Source code for openghg.store._boundary_conditions

from __future__ import annotations

import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, DefaultDict, Dict, Optional, Tuple, Union

import numpy as np
from xarray import Dataset

if TYPE_CHECKING:
    from openghg.store import DataSchema

from openghg.store.base import BaseStore

__all__ = ["BoundaryConditions"]

logger = logging.getLogger("openghg.store")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler


class BoundaryConditions(BaseStore):
    """This class is used to process boundary condition data"""

    _root = "BoundaryConditions"
    _uuid = "4e787366-be91-4fc5-ad1b-4adcb213d478"
    _metakey = f"{_root}/uuid/{_uuid}/metastore"
    def save(self) -> None:
        """Save the object to the object store

        Returns:
            None
        """
        from openghg.objectstore import get_bucket, set_object_from_json

        bucket = get_bucket()
        obs_key = f"{BoundaryConditions._root}/uuid/{BoundaryConditions._uuid}"

        self._stored = True
        set_object_from_json(bucket=bucket, key=obs_key, data=self.to_data())
    @staticmethod
    def read_data(binary_data: bytes, metadata: Dict, file_metadata: Dict) -> Optional[Dict]:
        """Read boundary conditions from binary data

        Args:
            binary_data: Boundary conditions data
            metadata: Dictionary of metadata
            file_metadata: File metadata
        Returns:
            dict: UUIDs of Datasources data has been assigned to
        """
        with TemporaryDirectory() as tmpdir:
            tmpdir_path = Path(tmpdir)

            try:
                filename = file_metadata["filename"]
            except KeyError:
                raise KeyError("We require a filename key for metadata read.")

            filepath = tmpdir_path.joinpath(filename)
            filepath.write_bytes(binary_data)

            return BoundaryConditions.read_file(filepath=filepath, **metadata)
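    # A minimal sketch of the binary route (the filename and metadata values
    # below are illustrative assumptions, not taken from the OpenGHG docs);
    # the metadata dict is unpacked into read_file, so it must carry the
    # keyword arguments that read_file expects:
    #
    #     payload = Path("n2o_EUROPE_2015.nc").read_bytes()  # hypothetical file
    #     uuids = BoundaryConditions.read_data(
    #         binary_data=payload,
    #         metadata={"species": "n2o", "bc_input": "CAMS", "domain": "EUROPE"},
    #         file_metadata={"filename": "n2o_EUROPE_2015.nc"},
    #     )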
    @staticmethod
    def read_file(
        filepath: Union[str, Path],
        species: str,
        bc_input: str,
        domain: str,
        period: Optional[Union[str, tuple]] = None,
        continuous: bool = True,
        overwrite: bool = False,
    ) -> Optional[Dict]:
        """Read boundary conditions file

        Args:
            filepath: Path of boundary conditions file
            species: Species name
            bc_input: Input used to create boundary conditions. For example:
                - a model name such as "MOZART" or "CAMS"
                - a description such as "UniformAGAGE" (uniform values based on AGAGE average)
            domain: Region for boundary conditions
            period: Period of measurements. Only needed if this cannot be inferred from the time coords.
                If specified, should be one of:
                    - "yearly", "monthly"
                    - suitable pandas Offset Alias
                    - tuple of (value, unit) as would be passed to the pandas.Timedelta function
            continuous: Whether time stamps have to be continuous.
            overwrite: Should this data overwrite currently stored data.
        Returns:
            dict: Dictionary of datasource UUIDs data assigned to
        """
        from collections import defaultdict
        from openghg.store import (
            assign_data,
            datasource_lookup,
            infer_date_range,
            update_zero_dim,
            load_metastore,
        )
        from openghg.util import clean_string, hash_file, timestamp_now
        from xarray import open_dataset

        species = clean_string(species)
        bc_input = clean_string(bc_input)
        domain = clean_string(domain)

        filepath = Path(filepath)

        bc_store = BoundaryConditions.load()

        # Load in the metadata store
        metastore = load_metastore(key=bc_store._metakey)

        file_hash = hash_file(filepath=filepath)
        if file_hash in bc_store._file_hashes and not overwrite:
            logger.warning(
                "This file has been uploaded previously with the filename : "
                f"{bc_store._file_hashes[file_hash]} - skipping."
            )
            return None

        bc_data = open_dataset(filepath)

        # Some attributes are numpy types we can't serialise to JSON so convert them
        # to their native types here
        attrs = {}
        for key, value in bc_data.attrs.items():
            try:
                attrs[key] = value.item()
            except AttributeError:
                attrs[key] = value

        author_name = "OpenGHG Cloud"
        bc_data.attrs["author"] = author_name

        metadata = {}
        metadata.update(attrs)

        metadata["species"] = species
        metadata["domain"] = domain
        metadata["bc_input"] = bc_input
        metadata["author"] = author_name
        metadata["processed"] = str(timestamp_now())

        # Check if time has 0-dimensions and, if so, expand this so time is 1D
        if "time" in bc_data.coords:
            bc_data = update_zero_dim(bc_data, dim="time")

        # Currently ACRG boundary conditions are split by month or year
        bc_time = bc_data["time"]

        start_date, end_date, period_str = infer_date_range(
            bc_time, filepath=filepath, period=period, continuous=continuous
        )

        # Checking against expected format for boundary conditions
        BoundaryConditions.validate_data(bc_data)
        data_type = "boundary_conditions"

        metadata["start_date"] = str(start_date)
        metadata["end_date"] = str(end_date)
        metadata["data_type"] = data_type

        metadata["max_longitude"] = round(float(bc_data["lon"].max()), 5)
        metadata["min_longitude"] = round(float(bc_data["lon"].min()), 5)
        metadata["max_latitude"] = round(float(bc_data["lat"].max()), 5)
        metadata["min_latitude"] = round(float(bc_data["lat"].min()), 5)
        metadata["min_height"] = round(float(bc_data["height"].min()), 5)
        metadata["max_height"] = round(float(bc_data["height"].max()), 5)

        metadata["input_filename"] = filepath.name
        metadata["time_period"] = period_str

        key = "_".join((species, bc_input, domain))

        boundary_conditions_data: DefaultDict[str, Dict[str, Union[Dict, Dataset]]] = defaultdict(dict)
        boundary_conditions_data[key]["data"] = bc_data
        boundary_conditions_data[key]["metadata"] = metadata

        required_keys = ("species", "bc_input", "domain")
        lookup_results = datasource_lookup(
            metastore=metastore, data=boundary_conditions_data, required_keys=required_keys
        )

        datasource_uuids = assign_data(
            data_dict=boundary_conditions_data,
            lookup_results=lookup_results,
            overwrite=overwrite,
            data_type=data_type,
        )

        bc_store.add_datasources(uuids=datasource_uuids, data=boundary_conditions_data, metastore=metastore)

        # Record the file hash in case we see this file again
        bc_store._file_hashes[file_hash] = filepath.name

        bc_store.save()
        metastore.close()

        return datasource_uuids
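    # A usage sketch for the file route (the path and argument values are
    # assumptions for illustration only; "monthly" matches one of the period
    # options documented above):
    #
    #     uuids = BoundaryConditions.read_file(
    #         filepath="ch4_EUROPE_201301.nc",  # hypothetical file
    #         species="ch4",
    #         bc_input="MOZART",
    #         domain="EUROPE",
    #         period="monthly",
    #     )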
    @staticmethod
    def schema() -> DataSchema:
        """
        Define schema for boundary conditions Dataset.

        Includes volume mole fractions for each time on the vertical plane at
        each cardinal (north, south, east, west) edge of the defined domain:
            - "vmr_n", "vmr_s"
                - expected dimensions: ("time", "height", "lon")
            - "vmr_e", "vmr_w"
                - expected dimensions: ("time", "height", "lat")

        Expected data types for all variables and coordinates are also included.

        Returns:
            DataSchema: Contains schema for BoundaryConditions.
        """
        from openghg.store import DataSchema

        data_vars: Dict[str, Tuple[str, ...]] = {
            "vmr_n": ("time", "height", "lon"),
            "vmr_e": ("time", "height", "lat"),
            "vmr_s": ("time", "height", "lon"),
            "vmr_w": ("time", "height", "lat"),
        }

        dtypes = {
            "lat": np.floating,
            "lon": np.floating,
            "height": np.floating,
            "time": np.datetime64,
            "vmr_n": np.floating,
            "vmr_e": np.floating,
            "vmr_s": np.floating,
            "vmr_w": np.floating,
        }

        data_format = DataSchema(data_vars=data_vars, dtypes=dtypes)

        return data_format
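    # A sketch of a Dataset that should satisfy this schema (coordinate sizes
    # are arbitrary assumptions; variable and coordinate names follow the
    # data_vars and dtypes defined in schema() above):
    #
    #     import numpy as np
    #     import pandas as pd
    #     import xarray as xr
    #
    #     time = pd.date_range("2015-01-01", periods=2, freq="MS")
    #     height, lat, lon = np.arange(3.0), np.arange(4.0), np.arange(5.0)
    #     ds = xr.Dataset(
    #         {
    #             "vmr_n": (("time", "height", "lon"), np.zeros((2, 3, 5))),
    #             "vmr_s": (("time", "height", "lon"), np.zeros((2, 3, 5))),
    #             "vmr_e": (("time", "height", "lat"), np.zeros((2, 3, 4))),
    #             "vmr_w": (("time", "height", "lat"), np.zeros((2, 3, 4))),
    #         },
    #         coords={"time": time, "height": height, "lat": lat, "lon": lon},
    #     )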
    @staticmethod
    def validate_data(data: Dataset) -> None:
        """
        Validate input data against the BoundaryConditions schema - definition
        from the BoundaryConditions.schema() method.

        Args:
            data: xarray Dataset in expected format

        Returns:
            None

            Raises a ValueError with details if the input data does not adhere
            to the BoundaryConditions schema.
        """
        data_schema = BoundaryConditions.schema()
        data_schema.validate_data(data)
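# Validation usage sketch, assuming `ds` is built as in the schema example
# above; a Dataset missing any "vmr_*" variable or using the wrong dtype
# should raise a ValueError:
#
#     BoundaryConditions.validate_data(ds)                    # passes silently if valid
#     BoundaryConditions.validate_data(ds.drop_vars("vmr_n")) # raises ValueError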