Source code for openghg.store._flux_timeseries

from __future__ import annotations

import logging
from pathlib import Path
from tempfile import TemporaryDirectory
import numpy as np
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from openghg.store import DataSchema

from openghg.store.base import BaseStore

__all__ = ["FluxTimeseries"]

logger = logging.getLogger("openghg.store")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler


class FluxTimeseries(BaseStore):
    """This class is used to process one-dimensional timeseries data"""

    _data_type = "flux_timeseries"
    _root = "FluxTimeseries"
    _uuid = "099b597b-0598-4efa-87dd-472dfe027f5d8"
    _metakey = f"{_root}/uuid/{_uuid}/metastore"
    def read_raw_data(self, binary_data: bytes, metadata: dict, file_metadata: dict) -> list[dict] | None:
        """Read a flux timeseries file from binary data.

        Args:
            binary_data: Flux timeseries data
            metadata: Dictionary of metadata
            file_metadata: File metadata
        Returns:
            dict: UUIDs of Datasources data has been assigned to
        """
        with TemporaryDirectory() as tmpdir:
            tmpdir_path = Path(tmpdir)

            try:
                filename = file_metadata["filename"]
            except KeyError:
                raise KeyError("We require a filename key for metadata read.")

            filepath = tmpdir_path.joinpath(filename)
            filepath.write_bytes(binary_data)

            return self.standardise_and_store(filepath=filepath, **metadata)
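    # Illustrative usage sketch (not part of the module): `read_raw_data` expects
    # the raw bytes of a flux timeseries file plus a `file_metadata` dict that
    # carries at least a "filename" key; the remaining metadata is forwarded to
    # `standardise_and_store`. The store instance `fts` and the filename below
    # are hypothetical.
    #
    #     binary_data = Path("emissions_timeseries.nc").read_bytes()
    #     fts.read_raw_data(
    #         binary_data=binary_data,
    #         metadata={"species": "co2", "source": "anthro", "region": "uk"},
    #         file_metadata={"filename": "emissions_timeseries.nc"},
    #     )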
    # def read_data(
    #     self,
    #     filepath: pathType,
    #     species: str,
    #     source: str,
    #     region: str,
    #     domain: str | None = None,
    #     database: str | None = None,
    #     database_version: str | None = None,
    #     model: str | None = None,
    #     source_format: str = "crf",
    #     period: str | tuple | None = None,
    #     tag: str | list | None = None,
    #     continuous: bool = True,
    #     if_exists: str = "auto",
    #     save_current: str = "auto",
    #     overwrite: bool = False,
    #     force: bool = False,
    #     compressor: Any | None = None,
    #     filters: Any | None = None,
    #     info_metadata: dict | None = None,
    # ) -> list[dict]:
    #     """Read a one-dimensional timeseries file
    #
    #     Args:
    #         filepath: Path of flux timeseries / emissions timeseries file
    #         species: Species name
    #         source: Source of the emissions data, e.g. "energy", "anthro"; default is 'anthro'.
    #         region: Region/Country of the CRF data
    #         domain: Geographic domain, default is 'None'. Region is used instead to identify the area.
    #         database: Name of database source for this input (if relevant)
    #         database_version: Name of database version (if relevant)
    #         model: Model name (if relevant)
    #         source_format: Type of data being input e.g. openghg (internal format)
    #         period: Period of measurements. Only needed if this can not be inferred from the time coords.
    #             If specified, should be one of:
    #             - "yearly", "monthly"
    #             - suitable pandas Offset Alias
    #             - tuple of (value, unit) as would be passed to pandas.Timedelta function
    #         tag: Special tagged values to add to the Datasource. This will be added to any
    #             current values if the tag key already exists in a list.
    #         continuous: Whether time stamps have to be continuous.
    #         if_exists: What to do if existing data is present.
    #             - "auto" - checks new and current data for timeseries overlap
    #                 - adds data if no overlap
    #                 - raises DataOverlapError if there is an overlap
    #             - "new" - just include new data and ignore previous
    #             - "combine" - replace and insert new data into current timeseries
    #         save_current: Whether to save data in current form and create a new version.
    #             - "auto" - this will depend on if_exists input ("auto" -> False), (other -> True)
    #             - "y" / "yes" - Save current data exactly as it exists as a separate (previous) version
    #             - "n" / "no" - Allow current data to be updated / deleted
    #         overwrite: Deprecated. This will use options for if_exists="new".
    #         force: Force adding of data even if this is identical to data stored.
    #         compressor: A custom compressor to use. If None, this will default to
    #             `Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE)`.
    #             See https://zarr.readthedocs.io/en/stable/api/codecs.html for more information on compressors.
    #         filters: Filters to apply to the data on storage, this defaults to no filtering. See
    #             https://zarr.readthedocs.io/en/stable/tutorial.html#filters for more information on picking filters.
    #         info_metadata: Allows additional tags to be passed in to describe the data,
    #             e.g. {"comment": "Quality checks have been applied"}
    #     Returns:
    #         dict: Dictionary of datasource UUIDs data assigned to
    #     """
    #     # Get initial values which exist within this function scope using locals
    #     # MUST be at the top of the function
    #     fn_input_parameters = locals().copy()
    #
    #     from openghg.store.spec import define_standardise_parsers
    #     from openghg.util import (
    #         clean_string,
    #         load_standardise_parser,
    #         check_if_need_new_version,
    #         split_function_inputs,
    #     )
    #
    #     species = clean_string(species)
    #     source = clean_string(source)
    #     region = clean_string(region)
    #     if domain:
    #         domain = clean_string(domain)
    #
    #     # Specify any additional metadata to be added
    #     additional_metadata = {}
    #
    #     if overwrite and if_exists == "auto":
    #         logger.warning(
    #             "Overwrite flag is deprecated in preference to `if_exists` (and `save_current`) inputs."
    #             " See documentation for details of these inputs and options."
    #         )
    #         if_exists = "new"
    #
    #     # Making sure new version will be created by default if force keyword is included.
    #     if force and if_exists == "auto":
    #         if_exists = "new"
    #
    #     new_version = check_if_need_new_version(if_exists, save_current)
    #
    #     filepath = Path(filepath)
    #
    #     standardise_parsers = define_standardise_parsers()[self._data_type]
    #
    #     try:
    #         source_format = standardise_parsers[source_format.upper()].value
    #     except KeyError:
    #         raise ValueError(f"Unknown data type {source_format} selected.")
    #
    #     # Load the data retrieve object
    #     parser_fn = load_standardise_parser(data_type=self._data_type, source_format=source_format)
    #
    #     # Get current parameter values and filter to only include function inputs
    #     fn_current_parameters = locals().copy()  # Make a copy of parameters passed to function
    #     fn_input_parameters = {key: fn_current_parameters[key] for key in fn_input_parameters}
    #
    #     _, unseen_hashes = self.check_hashes(filepaths=filepath, force=force)
    #
    #     if not unseen_hashes:
    #         return [{}]
    #
    #     filepath = next(iter(unseen_hashes.values()))
    #
    #     # Define parameters to pass to the parser function and remaining keys
    #     parser_input_parameters, additional_input_parameters = split_function_inputs(
    #         fn_input_parameters, parser_fn
    #     )
    #
    #     flux_timeseries_data = parser_fn(**parser_input_parameters)
    #
    #     # Checking against expected format for Flux
    #     for mdd in flux_timeseries_data:
    #         FluxTimeseries.validate_data(mdd.data)
    #
    #     # Check to ensure no required keys are being passed through info_metadata dict
    #     self.check_info_keys(info_metadata)
    #     if info_metadata is not None:
    #         additional_metadata.update(info_metadata)
    #
    #     # Mop up and add additional keys to metadata which weren't passed to the parser
    #     flux_timeseries_data = self.update_metadata(
    #         flux_timeseries_data, additional_input_parameters, additional_metadata
    #     )
    #
    #     data_type = "flux_timeseries"
    #     datasource_uuids = self.assign_data(
    #         data=flux_timeseries_data,
    #         if_exists=if_exists,
    #         new_version=new_version,
    #         data_type=data_type,
    #         compressor=compressor,
    #         filters=filters,
    #     )
    #
    #     # Record the file hash in case we see this file again
    #     self.store_hashes(unseen_hashes)
    #
    #     return datasource_uuids
    def format_inputs(self, **kwargs: Any) -> dict:
        """
        Apply appropriate formatting for expected inputs for FluxTimeseries.

        Expected inputs will typically be defined within the
        openghg.standardise.standardise_flux_timeseries() function.

        Args:
            kwargs: Set of keyword arguments. Selected keywords will be
                appropriately formatted.
        Returns:
            dict: Formatted parameters for this data type.
        """
        from openghg.util import clean_string, synonyms

        params = kwargs.copy()

        # Apply clean string formatting
        params["species"] = clean_string(params.get("species"))
        params["source"] = clean_string(params.get("source"))
        params["region"] = clean_string(params.get("region"))
        params["domain"] = clean_string(params.get("domain"))

        # Apply individual formatting as appropriate
        # - apply synonyms substitution for species
        species = params.get("species")
        if species is not None:
            params["species"] = synonyms(species)

        return params
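    # Illustrative sketch (hypothetical values, assuming an initialised store
    # instance `fts`): clean_string tidies the string inputs and synonyms maps
    # a recognised species alias onto its canonical name.
    #
    #     params = fts.format_inputs(species="CO2", source="Anthro", region="UK")
    #     # expected result, under those assumptions:
    #     # params == {"species": "co2", "source": "anthro", "region": "uk"}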
    @staticmethod
    def schema() -> DataSchema:  # type: ignore[override]
        """
        Define schema for one-dimensional timeseries (FluxTimeseries) Dataset.

        Includes observation for each time of the defined domain:
            - "flux_timeseries"
                - expected dimensions: ("time")

        Expected data types for all variables and coordinates also included.

        Returns:
            DataSchema: Contains schema for FluxTimeseries.
        """
        from openghg.store import DataSchema

        data_vars: dict[str, tuple[str, ...]] = {"flux_timeseries": ("time",)}
        dtypes = {
            "time": np.datetime64,
            "flux_timeseries": np.floating,
        }

        data_format = DataSchema(data_vars=data_vars, dtypes=dtypes)

        return data_format
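# Illustrative sketch (not part of the module): the schema above can be checked
# against a minimal xarray Dataset. `DataSchema.validate_data` is assumed to
# raise on a mismatched variable name, dimension, or dtype.
#
#     import pandas as pd
#     import xarray as xr
#
#     times = pd.date_range("2012-01-01", periods=3, freq="YS")
#     ds = xr.Dataset(
#         {"flux_timeseries": ("time", np.array([1.0, 1.2, 0.9]))},
#         coords={"time": times},
#     )
#     FluxTimeseries.schema().validate_data(ds)  # silent if compliant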