from __future__ import annotations
from pathlib import Path
from typing import Any
import numpy as np
from xarray import Dataset
import logging
from openghg.store import DataSchema
from openghg.store.base import BaseStore
from openghg.util import load_standardise_parser, split_function_inputs
logger = logging.getLogger("openghg.store")
logger.setLevel(logging.DEBUG) # Have to set level for logger as well as handler
__all__ = ["EulerianModel"]
# TODO: Currently built around these keys but will probably need more unique distinguishers for different setups
# model name
# species
# date (start_date)
# ...
# setup (included as option for now)
class EulerianModel(BaseStore):
"""This class is used to process Eulerian model data"""
_data_type = "eulerian_model"
_root = "EulerianModel"
_uuid = "63ff2365-3ba2-452a-a53d-110140805d06"
_metakey = f"{_root}/uuid/{_uuid}/metastore"
def read_file(
self,
filepath: str | Path | list[str | Path],
model: str,
species: str,
source_format: str = "openghg",
start_date: str | None = None,
end_date: str | None = None,
setup: str | None = None,
tag: str | list | None = None,
if_exists: str = "auto",
save_current: str = "auto",
overwrite: bool = False,
force: bool = False,
compressor: Any | None = None,
filters: Any | None = None,
chunks: dict | None = None,
info_metadata: dict | None = None,
) -> list[dict]:
"""Read Eulerian model output
Args:
filepath: Path of Eulerian model species output
model: Eulerian model name
species: Species name
source_format: Data format, for example openghg (internal format)
start_date: Start date (inclusive) associated with model run
end_date: End date (exclusive) associated with model run
setup: Additional setup details for run
            tag: Special tagged values to add to the Datasource. If the tag key
                already exists these will be appended to the current list of values.
if_exists: What to do if existing data is present.
- "auto" - checks new and current data for timeseries overlap
- adds data if no overlap
- raises DataOverlapError if there is an overlap
- "new" - just include new data and ignore previous
- "combine" - replace and insert new data into current timeseries
save_current: Whether to save data in current form and create a new version.
- "auto" - this will depend on if_exists input ("auto" -> False), (other -> True)
- "y" / "yes" - Save current data exactly as it exists as a separate (previous) version
- "n" / "no" - Allow current data to updated / deleted
overwrite: Deprecated. This will use options for if_exists="new".
force: Force adding of data even if this is identical to data stored.
compressor: A custom compressor to use. If None, this will default to
`Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE)`.
See https://zarr.readthedocs.io/en/stable/api/codecs.html for more information on compressors.
filters: Filters to apply to the data on storage, this defaults to no filtering. See
https://zarr.readthedocs.io/en/stable/tutorial.html#filters for more information on picking filters.
chunks: Chunking schema to use when storing data. It expects a dictionary of dimension name and chunk size,
for example {"time": 100}. If None then a chunking schema will be set automatically by OpenGHG.
See documentation for guidance on chunking: https://docs.openghg.org/tutorials/local/Adding_data/Adding_ancillary_data.html#chunking.
To disable chunking pass in an empty dictionary.
            info_metadata: Additional tags to describe the data, e.g. {"comment": "Quality checks have been applied"}.
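
        Returns:
            list[dict]: Details of the Datasources the data was assigned to.

        Example (an illustrative sketch; the bucket, path, model and species
        values are hypothetical):
            >>> store = EulerianModel(bucket=bucket)
            >>> results = store.read_file(
            ...     filepath="/data/GEOSChem.SpeciesConc.2015.nc",
            ...     model="GEOSChem",
            ...     species="ch4",
            ... )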
"""
# TODO: As written, this currently includes some light assumptions that we're dealing with GEOSChem SpeciesConc format.
# May need to split out into multiple modules (like with ObsSurface) or into separate retrieve functions as needed.
# Get initial values which exist within this function scope using locals
# MUST be at the top of the function
fn_input_parameters = locals().copy()
from openghg.store.spec import define_standardise_parsers
from openghg.util import (
clean_string,
check_if_need_new_version,
)
model = clean_string(model)
species = clean_string(species)
start_date = clean_string(start_date)
end_date = clean_string(end_date)
setup = clean_string(setup)
# Specify any additional metadata to be added
additional_metadata = {}
if overwrite and if_exists == "auto":
            logger.warning(
                "The `overwrite` flag is deprecated in favour of the `if_exists` (and `save_current`) inputs. "
                "See the documentation for details of these inputs and options."
            )
if_exists = "new"
        # Make sure a new version will be created by default if the force keyword is included.
if force and if_exists == "auto":
if_exists = "new"
new_version = check_if_need_new_version(if_exists, save_current)
standardise_parsers = define_standardise_parsers()[self._data_type]
try:
source_format = standardise_parsers[source_format.upper()].value
except KeyError:
raise ValueError(f"Unknown data type {source_format} selected.")
        # Load the standardisation parser function for this source format
parser_fn = load_standardise_parser(data_type=self._data_type, source_format=source_format)
_, unseen_hashes = self.check_hashes(filepaths=filepath, force=force)
if not unseen_hashes:
return [{}]
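        # Note: only the first unseen file is standardised per call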
filepath = next(iter(unseen_hashes.values()))
if chunks is None:
chunks = {}
# Get current parameter values and filter to only include function inputs
fn_current_parameters = locals().copy() # Make a copy of parameters passed to function
fn_input_parameters = {key: fn_current_parameters[key] for key in fn_input_parameters}
fn_input_parameters["filepath"] = filepath
# Define parameters to pass to the parser function and remaining keys
parser_input_parameters, additional_input_parameters = split_function_inputs(
fn_input_parameters, parser_fn
)
# Call appropriate standardisation function with input parameters
eulerian_model_data = parser_fn(**parser_input_parameters)
# # TODO: Add schema and validate methods so this can be checked against expected format
# for split_data in eulerian_model_data.values():
# em_data = split_data["data"]
# EulerianModel.validate_data(em_data)
# Check to ensure no required keys are being passed through info_metadata dict
self.check_info_keys(info_metadata)
if info_metadata is not None:
additional_metadata.update(info_metadata)
# Mop up and add additional keys to metadata which weren't passed to the parser
model_data = self.update_metadata(
eulerian_model_data, additional_input_parameters, additional_metadata
)
data_type = "eulerian_model"
datasource_uuids = self.assign_data(
data=model_data,
if_exists=if_exists,
new_version=new_version,
data_type=data_type,
compressor=compressor,
filters=filters,
)
# TODO: MAY NEED TO ADD BACK IN OR CAN DELETE
# update_keys = ["start_date", "end_date", "latest_version"]
# model_data = update_metadata(
# data_dict=model_data, uuid_dict=datasource_uuids, update_keys=update_keys
# )
# em_store.add_datasources(
# uuids=datasource_uuids, data=model_data, metastore=metastore, update_keys=update_keys
# )
logger.info(f"Completed processing: {filepath.name}.")
# Record the file hash in case we see this file again
self.store_hashes(unseen_hashes)
return datasource_uuids
@staticmethod
def schema() -> DataSchema:
"""
Define schema for Eulerian model Dataset.
        At present this doesn't check the data variables, but it does check
        that "lat", "lon" and "time" are included with appropriate dtypes.
        Returns:
            DataSchema: Placeholder schema for EulerianModel data.
        TODO: Decide on data_vars checks as we build up the use of this data_type
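
        Example (a minimal sketch of a conforming Dataset; the variable name
        and values are hypothetical):
            >>> import numpy as np
            >>> import xarray as xr
            >>> ds = xr.Dataset(
            ...     {"ch4": (("time", "lat", "lon"), np.zeros((1, 2, 2)))},
            ...     coords={
            ...         "time": np.array(["2015-01-01"], dtype="datetime64[ns]"),
            ...         "lat": np.array([-45.0, 45.0]),
            ...         "lon": np.array([-90.0, 90.0]),
            ...     },
            ... )
            >>> EulerianModel.schema().validate_data(ds)  # no error raised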
"""
data_vars: dict[str, tuple[str, ...]] = {}
dtypes = {"lat": np.floating, "lon": np.floating, "time": np.datetime64}
data_format = DataSchema(data_vars=data_vars, dtypes=dtypes)
return data_format
@staticmethod
def validate_data(data: Dataset) -> None:
"""
Validate input data against EulerianModel schema - definition from
EulerianModel.schema() method.
Args:
data : xarray Dataset in expected format
Returns:
None
Raises a ValueError with details if the input data does not adhere
to the EulerianModel schema.
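
        Example (illustrative; `ds` is a Dataset such as the one constructed
        in the EulerianModel.schema() docstring):
            >>> EulerianModel.validate_data(ds)  # raises ValueError if non-conformant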
"""
data_schema = EulerianModel.schema()
data_schema.validate_data(data)
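
# Illustrative usage sketch. The values below are hypothetical and, in normal
# use, this class is driven via OpenGHG's high-level standardise functions
# rather than instantiated directly:
#
#     from openghg.standardise import standardise_eulerian
#
#     results = standardise_eulerian(
#         filepath="/data/GEOSChem.SpeciesConc.2015.nc",
#         model="GEOSChem",
#         species="ch4",
#     )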