from __future__ import annotations
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
import numpy as np
from xarray import Dataset
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
if TYPE_CHECKING:
from openghg.store import DataSchema
from openghg.store.base import BaseStore
__all__ = ["FluxTimeseries"]

logger = logging.getLogger("openghg.store")
# The level must be set on the logger itself as well as on any handler:
# records below the logger's own level are dropped before handlers see them.
logger.setLevel(logging.DEBUG)
class FluxTimeseries(BaseStore):
    """This class is used to process one-dimensional (flux) timeseries data."""

    _data_type = "flux_timeseries"

    # Legacy object-store keys (inactive - kept for reference only):
    #   _root = "FluxTimeseries"
    #   _uuid = "099b597b-0598-4efa-87dd-472dfe027f5d8"
    #   _metakey = f"{_root}/uuid/{_uuid}/metastore"
    # NOTE(review): the uuid above is not well-formed (its last group has 13 hex
    # digits); confirm these keys are obsolete before reviving them.

    def read_data(self, binary_data: bytes, metadata: Dict, file_metadata: Dict) -> Optional[Dict]:
        """Read flux timeseries data supplied as raw bytes.

        The bytes are written to a file inside a temporary directory and then
        processed by ``read_file`` while that directory still exists.

        Args:
            binary_data: Raw contents of the flux timeseries file
            metadata: Keyword arguments forwarded to ``read_file``
            file_metadata: File metadata; must contain a "filename" key
        Returns:
            dict: UUIDs of Datasources data has been assigned to
        Raises:
            KeyError: If ``file_metadata`` has no "filename" key
            ValueError: If the filename has no usable final path component
        """
        try:
            filename = file_metadata["filename"]
        except KeyError:
            raise KeyError("We require a filename key for metadata read.") from None

        # Keep only the final path component: joinpath() with an absolute path or a
        # name containing ".." would otherwise place the file outside the temp dir.
        safe_name = Path(filename).name
        if safe_name in ("", ".", ".."):
            raise ValueError(f"Invalid filename in file metadata: {filename!r}")

        with TemporaryDirectory() as tmpdir:
            filepath = Path(tmpdir).joinpath(safe_name)
            filepath.write_bytes(binary_data)
            # read_file must run inside the with-block, before tmpdir is removed.
            return self.read_file(filepath=filepath, **metadata)

    def read_file(
        self,
        filepath: Union[str, Path],
        species: str,
        source: str,
        region: str,
        domain: Optional[str] = None,
        database: Optional[str] = None,
        database_version: Optional[str] = None,
        model: Optional[str] = None,
        source_format: str = "crf",
        period: Optional[Union[str, tuple]] = None,
        continuous: bool = True,
        if_exists: str = "auto",
        save_current: str = "auto",
        overwrite: bool = False,
        force: bool = False,
        compressor: Optional[Any] = None,
        filters: Optional[Any] = None,
        optional_metadata: Optional[Dict] = None,
    ) -> dict:
        """Read a one-dimensional flux timeseries file and store its data.

        Args:
            filepath: Path of flux timeseries / emissions timeseries file
            species: Species name
            source: Source of the emissions data, e.g. "energy", "anthro"
            region: Region/Country of the CRF data
            domain: Geographic domain (optional, default None); the region is
                used to identify the area instead.
            database: Name of database source for this input (if relevant)
            database_version: Name of database version (if relevant)
            model: Model name (if relevant)
            source_format: Type of data being input, e.g. "crf"; must match a
                registered standardise parser for this data type.
            period: Period of measurements. Only needed if this can not be inferred from the time coords
                If specified, should be one of:
                    - "yearly", "monthly"
                    - suitable pandas Offset Alias
                    - tuple of (value, unit) as would be passed to pandas.Timedelta function
            continuous: Whether time stamps have to be continuous.
            if_exists: What to do if existing data is present.
                - "auto" - checks new and current data for timeseries overlap
                   - adds data if no overlap
                   - raises DataOverlapError if there is an overlap
                - "new" - just include new data and ignore previous
                - "combine" - replace and insert new data into current timeseries
            save_current: Whether to save data in current form and create a new version.
                - "auto" - this will depend on if_exists input ("auto" -> False), (other -> True)
                - "y" / "yes" - Save current data exactly as it exists as a separate (previous) version
                - "n" / "no" - Allow current data to updated / deleted
            overwrite: Deprecated. When True (and if_exists is "auto") this behaves like if_exists="new".
            force: Force adding of data even if this is identical to data stored.
                When True (and if_exists is "auto") if_exists is set to "new".
            compressor: A custom compressor to use. If None, this will default to
                `Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE)`.
                See https://zarr.readthedocs.io/en/stable/api/codecs.html for more information on compressors.
            filters: Filters to apply to the data on storage, this defaults to no filtering. See
                https://zarr.readthedocs.io/en/stable/tutorial.html#filters for more information on picking filters.
            optional_metadata: Additional tags used to distinguish the added data,
                e.g. {"project": "paris", "baseline": "Intem"}
        Returns:
            dict: Dictionary of datasource UUIDs data assigned to; empty if the
                file's hash has already been seen (and force is not set).
        Raises:
            ValueError: If source_format is not a known parser for this data type.
        """
        # Get initial values which exist within this function scope using locals
        # MUST be at the top of the function
        fn_input_parameters = locals().copy()

        from openghg.store.spec import define_standardise_parsers
        from openghg.util import (
            clean_string,
            load_standardise_parser,
            check_if_need_new_version,
            split_function_inputs,
        )

        species = clean_string(species)
        source = clean_string(source)
        region = clean_string(region)
        if domain:
            domain = clean_string(domain)

        # Specify any additional metadata to be added
        additional_metadata: Dict = {}

        if overwrite and if_exists == "auto":
            logger.warning(
                "Overwrite flag is deprecated in preference to `if_exists` (and `save_current`) inputs. "
                "See documentation for details of these inputs and options."
            )
            if_exists = "new"

        # Making sure new version will be created by default if force keyword is included.
        if force and if_exists == "auto":
            if_exists = "new"

        new_version = check_if_need_new_version(if_exists, save_current)

        filepath = Path(filepath)

        standardise_parsers = define_standardise_parsers()[self._data_type]
        try:
            source_format = standardise_parsers[source_format.upper()].value
        except KeyError:
            raise ValueError(f"Unknown data type {source_format} selected.") from None

        # Load the data retrieve object
        parser_fn = load_standardise_parser(data_type=self._data_type, source_format=source_format)

        # Re-read the (now cleaned) parameter values, keeping only the original
        # function inputs - this is why fn_input_parameters was captured first.
        fn_current_parameters = locals().copy()
        fn_input_parameters = {key: fn_current_parameters[key] for key in fn_input_parameters}

        _, unseen_hashes = self.check_hashes(filepaths=filepath, force=force)
        if not unseen_hashes:
            return {}

        filepath = next(iter(unseen_hashes.values()))

        # Define parameters to pass to the parser function and remaining keys
        parser_input_parameters, additional_input_parameters = split_function_inputs(
            fn_input_parameters, parser_fn
        )

        flux_timeseries_data = parser_fn(**parser_input_parameters)

        # Checking against expected format for FluxTimeseries
        for split_data in flux_timeseries_data.values():
            FluxTimeseries.validate_data(split_data["data"])

        # Check to ensure no required keys are being passed through optional_metadata dict
        self.check_info_keys(optional_metadata)
        if optional_metadata is not None:
            additional_metadata.update(optional_metadata)

        # Mop up and add additional keys to metadata which weren't passed to the parser
        flux_timeseries_data = self.update_metadata(
            flux_timeseries_data, additional_input_parameters, additional_metadata
        )

        datasource_uuids = self.assign_data(
            data=flux_timeseries_data,
            if_exists=if_exists,
            new_version=new_version,
            data_type=self._data_type,
            compressor=compressor,
            filters=filters,
        )

        # Record the file hash in case we see this file again
        self.store_hashes(unseen_hashes)

        return datasource_uuids

    @staticmethod
    def validate_data(data: Dataset) -> None:
        """Validate input data against the FluxTimeseries schema.

        The schema is defined by the FluxTimeseries.schema() method.

        Args:
            data: xarray Dataset in expected format
        Returns:
            None
        Raises:
            ValueError: If the input data does not match the FluxTimeseries schema.
        """
        data_schema = FluxTimeseries.schema()
        data_schema.validate_data(data)

    @staticmethod
    def schema() -> DataSchema:
        """Define the schema for a one-dimensional FluxTimeseries Dataset.

        Expected data variables:
            - "flux_timeseries"
                - expected dimensions: ("time",)

        Expected data types are also included: "time" as np.datetime64 and
        "flux_timeseries" as np.floating.

        Returns:
            DataSchema: Contains schema for FluxTimeseries.
        """
        from openghg.store import DataSchema

        data_vars: Dict[str, Tuple[str, ...]] = {"flux_timeseries": ("time",)}
        dtypes = {
            "time": np.datetime64,
            "flux_timeseries": np.floating,
        }

        return DataSchema(data_vars=data_vars, dtypes=dtypes)