Source code for openghg.cloud._call
"""
Call OpenGHG serverless functions
"""
import os
from pathlib import Path
from typing import Dict, Optional, Tuple
import msgpack
import requests
from openghg.types import FunctionError
from openghg.util import compress, hash_bytes
[docs]
def create_file_package(filepath: Path, obs_type: str) -> Tuple[bytes, Dict]:
    """Build the compressed payload and metadata for a file to be sent
    to the serverless function

    The whole file is read into memory, hashed with ``hash_bytes`` and
    compressed with ``compress``.

    Args:
        filepath: Path of data
        obs_type: Observation type (stored lowercased in the metadata)
    Returns:
        tuple: Compressed data as bytes and dictionary of file metadata
    Raises:
        NotImplementedError: If the file is larger than the 300 MB in-memory limit
    """
    bytes_per_mb = 1e6
    max_size_mb = 300

    # Check the size on disk first so an oversized file is never read into memory
    size_mb = filepath.stat().st_size / bytes_per_mb
    if size_mb > max_size_mb:
        raise NotImplementedError("We can't handle this size of file yet.")

    raw = filepath.read_bytes()
    digest = hash_bytes(data=raw)
    payload = compress(data=raw)

    package_metadata = {
        "sha1_hash": digest,
        "filename": filepath.name,
        "compressed": True,
        "obs_type": obs_type.lower(),
    }

    return payload, package_metadata
[docs]
def create_post_dict(
    function_name: str,
    data: bytes,
    metadata: Dict,
    file_metadata: Dict,
    precision_data: Optional[Dict] = None,
    precision_file_metadata: Optional[Dict] = None,
) -> Dict:
    """Assemble the dictionary that is POSTed to the remote function

    Args:
        function_name: Function name
        data: Compressed data
        metadata: Metadata dictionary
        file_metadata: File metadata
        precision_data: GCWERKS precision data
        precision_file_metadata: GCWERKS precision file metadata
    Returns:
        dict: Dictionary ready for function call
    Raises:
        ValueError: If precision data is given without precision file metadata
    """
    payload: Dict = {
        "function": function_name,
        "data": data,
        "metadata": metadata,
        "file_metadata": file_metadata,
    }

    # Without precision data there is nothing more to add; precision file
    # metadata passed on its own is not included in the payload.
    if precision_data is None:
        return payload

    if precision_file_metadata is None:
        raise ValueError("We need both precision data and file metadata.")

    payload["precision_data"] = precision_data
    payload["precision_file_metadata"] = precision_file_metadata

    return payload
[docs]
def call_function(data: Dict) -> Dict:
    """Calls an OpenGHG serverless function and returns its response

    The data is packed with msgpack and POSTed to the URL given by
    _get_function_url, authorised with the key from _get_auth_key.

    Args:
        data: Data to POST. Must be a dictionary created using the create_post_dict function.
    Returns:
        dict: Dictionary containing the response status code ("status"), the
        response headers ("headers") and the msgpack-unpacked response
        body ("content").
    Raises:
        FunctionError: If the function URL or auth key is not set, or if the
        response status code is not 200
    """
    fn_url = _get_function_url()
    auth_key = _get_auth_key()

    headers = {
        "Content-Type": "application/octet-stream",
        "authorization": auth_key,
    }

    packed_data = msgpack.packb(data)
    response = requests.post(url=fn_url, data=packed_data, headers=headers)

    if response.status_code != 200:
        raise FunctionError(f"Function call error: {str(response.content)}")

    return {
        "status": response.status_code,
        # Return the headers sent back by the function, not the request
        # headers built above, which contain the auth key.
        "headers": dict(response.headers),
        "content": msgpack.unpackb(response.content),
    }
def _get_function_url() -> str:
"""Get the URl for the required service
Args:
service_name: Service name
Returns:
str: Service / function URL
"""
try:
return os.environ["OPENGHG_FN_URL"]
except KeyError:
raise FunctionError("No OPENGHG_FN_URL environment variable set for function URLs")
def _get_auth_key() -> str:
"""Get the authentication key from the local environmen
This offers very limited control over calling of the functions for now.
It will be replaced by whatever user authentication system we end up using.
Returns:
str: Authentication key for serverless Fn
"""
try:
return os.environ["AUTH_KEY"]
except KeyError:
raise FunctionError("A valid AUTH_KEY secret must be set.")