Source code for openghg.store._footprints

import logging
from collections import defaultdict
from pathlib import Path
from typing import DefaultDict, Dict, Literal, List, Optional, Tuple, Union, cast
import numpy as np
from openghg.store import DataSchema
from openghg.store.base import BaseStore
from xarray import Dataset

__all__ = ["Footprints"]

logger = logging.getLogger("openghg.store")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler


class Footprints(BaseStore):
    """This class is used to process footprints model output.

    All public methods are static. ``read_file`` is the entry point for
    standardising a footprint file and assigning it to Datasources;
    ``schema`` / ``validate_data`` describe and check the expected layout
    of a footprint Dataset.
    """

    _root = "Footprints"
    _uuid = "62db5bdf-c88d-4e56-97f4-40336d37f18c"
    # Key passed to load_metastore() to open the metadata store for this class
    _metakey = f"{_root}/uuid/{_uuid}/metastore"

    @staticmethod
    def read_data(binary_data: bytes, metadata: Dict, file_metadata: Dict) -> Optional[Dict]:
        """Read a footprint from binary data

        Args:
            binary_data: Footprint data
            metadata: Dictionary of metadata
            file_metadata: File metadata
        Returns:
            dict: UUIDs of Datasources data has been assigned to
        Raises:
            NotImplementedError: Always. Reading from binary data is not
                currently supported.
        """
        raise NotImplementedError("This branch doesn't currently support cloud.")

        # NOTE: the commented-out code below is the previous implementation,
        # retained for reference only. None of it is executed.

        # with TemporaryDirectory() as tmpdir:
        #     tmpdir_path = Path(tmpdir)
        #     try:
        #         filename = file_metadata["filename"]
        #     except KeyError:
        #         raise KeyError("We require a filename key for metadata read.")
        #     filepath = tmpdir_path.joinpath(filename)
        #     filepath.write_bytes(binary_data)
        #     return Footprints.read_file(filepath=filepath, **metadata)

    # @staticmethod
    # def read_data(binary_data: bytes, metadata: Dict, file_metadata: Dict) -> Dict:
    #     """Ready a footprint from binary data
    #
    #     Args:
    #         binary_data: Footprint data
    #         metadata: Dictionary of metadata
    #         file_metadat: File metadata
    #     Returns:
    #         dict: UUIDs of Datasources data has been assigned to
    #     """
    #     from openghg.store import assign_data, infer_date_range, load_metastore, datasource_lookup
    #
    #     fp = Footprints.load()
    #
    #     # Load in the metadata store
    #     metastore = load_metastore(key=fp._metakey)
    #
    #     sha1_hash = file_metadata["sha1_hash"]
    #     overwrite = metadata.get("overwrite", False)
    #
    #     if sha1_hash in fp._file_hashes and not overwrite:
    #         print(
    #             f"This file has been uploaded previously with the filename : {fp._file_hashes[sha1_hash]} - skipping."
    #         )
    #
    #     data_buf = BytesIO(binary_data)
    #     fp_data = open_dataset(data_buf)
    #
    #     fp_time = fp_data.time
    #     period = metadata.get("period")
    #     continuous = metadata["continous"]
    #     high_spatial_res = metadata["high_spatial_res"]
    #     species = metadata["species"]
    #     filename = file_metadata["filename"]
    #
    #     site = metadata["site"]
    #     domain = metadata["domain"]
    #     model = metadata["model"]
    #     height = metadata["height"]
    #
    #     start_date, end_date, period_str = infer_date_range(
    #         fp_time, filepath=filename, period=period, continuous=continuous
    #     )
    #
    #     metadata["start_date"] = str(start_date)
    #     metadata["end_date"] = str(end_date)
    #     metadata["time_period"] = period_str
    #
    #     metadata["max_longitude"] = round(float(fp_data["lon"].max()), 5)
    #     metadata["min_longitude"] = round(float(fp_data["lon"].min()), 5)
    #     metadata["max_latitude"] = round(float(fp_data["lat"].max()), 5)
    #     metadata["min_latitude"] = round(float(fp_data["lat"].min()), 5)
    #
    #     # TODO: Pull out links to underlying data format into a separate format function
    #     #  - high_spatial_res - data vars - "fp_low", "fp_high", coords - "lat_high", "lon_high"
    #     #  - high_time_res - data vars - "fp_HiTRes", coords - "H_back"
    #
    #     metadata["spatial_resolution"] = "standard_spatial_resolution"
    #
    #     if high_spatial_res:
    #         try:
    #             metadata["max_longitude_high"] = round(float(fp_data["lon_high"].max()), 5)
    #             metadata["min_longitude_high"] = round(float(fp_data["lon_high"].min()), 5)
    #             metadata["max_latitude_high"] = round(float(fp_data["lat_high"].max()), 5)
    #             metadata["min_latitude_high"] = round(float(fp_data["lat_high"].min()), 5)
    #
    #             metadata["spatial_resolution"] = "high_spatial_resolution"
    #         except KeyError:
    #             raise KeyError("Expected high spatial resolution. Unable to find lat_high or lon_high data.")
    #
    #     if species == "co2":
    #         # Expect co2 data to have high time resolution
    #         high_time_res = True
    #
    #     metadata["time_resolution"] = "standard_time_resolution"
    #
    #     if high_time_res:
    #         if "fp_HiTRes" in fp_data:
    #             metadata["time_resolution"] = "high_time_resolution"
    #         else:
    #             raise KeyError("Expected high time resolution. Unable to find fp_HiTRes data.")
    #
    #     metadata["heights"] = [float(h) for h in fp_data.height.values]
    #     # Do we also need to save all the variables we have available in this footprints?
    #     metadata["variables"] = list(fp_data.keys())
    #
    #     # if model_params is not None:
    #     #     metadata["model_parameters"] = model_params
    #
    #     # Set the attributes of this Dataset
    #     fp_data.attrs = {"author": "OpenGHG Cloud", "processed": str(timestamp_now())}
    #
    #     # This might seem longwinded now but will help when we want to read
    #     # more than one footprints at a time
    #     key = "_".join((site, domain, model, height))
    #
    #     footprint_data: DefaultDict[str, Dict[str, Union[Dict, Dataset]]] = defaultdict(dict)
    #     footprint_data[key]["data"] = fp_data
    #     footprint_data[key]["metadata"] = metadata
    #
    #     # These are the keys we will take from the metadata to search the
    #     # metadata store for a Datasource, they should provide as much detail as possible
    #     # to uniquely identify a Datasource
    #     required = ("site", "model", "height", "domain")
    #     lookup_results = datasource_lookup(metastore=metastore, data=footprint_data, required_keys=required)
    #
    #     data_type = "footprints"
    #     datasource_uuids: Dict[str, Dict] = assign_data(
    #         data_dict=footprint_data,
    #         lookup_results=lookup_results,
    #         overwrite=overwrite,
    #         data_type=data_type,
    #     )
    #
    #     fp.add_datasources(uuids=datasource_uuids, data=footprint_data, metastore=metastore)
    #
    #     # Record the file hash in case we see this file again
    #     fp._file_hashes[sha1_hash] = filename
    #
    #     fp.save()
    #
    #     metastore.close()
    #
    #     return datasource_uuids

    # def _store_data(data: Dataset, metadata: Dict):
    #     """ Takes an xarray Dataset
    #
    #     Args:
    #         data: xarray Dataset
    #         metadata: Metadata dict
    #     Returns:
    #
    #     """

    @staticmethod
    def read_file(
        filepath: Union[str, Path],
        site: str,
        domain: str,
        model: str,
        inlet: Optional[str] = None,
        height: Optional[str] = None,
        metmodel: Optional[str] = None,
        species: Optional[str] = None,
        network: Optional[str] = None,
        period: Optional[Union[str, tuple]] = None,
        chunks: Union[int, Dict, Literal["auto"], None] = None,
        continuous: bool = True,
        retrieve_met: bool = False,
        high_spatial_res: bool = False,
        high_time_res: bool = False,
        short_lifetime: bool = False,
        overwrite: bool = False,
        # model_params: Optional[Dict] = None,
    ) -> Optional[Dict]:
        """Reads footprints data files and returns the UUIDS of the Datasources
        the processed data has been assigned to

        Args:
            filepath: Path of file to load
            site: Site name
            domain: Domain of footprints
            model: Model used to create footprint (e.g. NAME or FLEXPART)
            inlet: Height above ground level in metres. Format 'NUMUNIT' e.g. "10m"
            height: Alias for inlet. One of height or inlet MUST be included.
            metmodel: Underlying meteorological model used (e.g. UKV)
            species: Species name. Only needed if footprint is for a specific species e.g. co2 (and not inert)
            network: Network name
            period: Period of measurements. Only needed if this can not be inferred from the time coords
            chunks: Chunking specification, forwarded unchanged to ``xarray.open_dataset``.
            continuous: Whether time stamps have to be continuous.
            retrieve_met: Whether to also download meteorological data for this footprints area.
                NOTE: this flag is accepted but not currently used by this method.
            high_spatial_res : Indicate footprints include both a low and high spatial resolution.
            high_time_res: Indicate footprints are high time resolution (include H_back dimension)
                           Note this will be set to True automatically if species="co2" (Carbon Dioxide).
            short_lifetime: Indicate footprint is for a short-lived species. Needs species input.
                            Note this will be set to True if species has an associated lifetime.
            overwrite: Overwrite any currently stored data
        Returns:
            dict: UUIDs of Datasources data has been assigned to, or None if
            the file has been seen before (matching hash) and ``overwrite`` is False.
        Raises:
            ValueError: If neither ``inlet`` nor ``height`` is given, or if
                ``short_lifetime`` is set without ``species``.
            KeyError: If ``high_spatial_res`` is set but the "lat_high" /
                "lon_high" data cannot be found in the file.
        """
        # from xarray import load_dataset
        import xarray as xr
        from openghg.store import (
            assign_data,
            datasource_lookup,
            infer_date_range,
            update_zero_dim,
            load_metastore,
        )
        from openghg.util import clean_string, format_inlet, hash_file, species_lifetime, timestamp_now

        filepath = Path(filepath)

        site = clean_string(site)
        network = clean_string(network)
        domain = clean_string(domain)

        # Make sure `inlet` OR the alias `height` is included
        # Note: from this point only `inlet` variable should be used.
        if inlet is None and height is None:
            raise ValueError("One of inlet (or height) must be specified as an input")
        elif inlet is None:
            inlet = height

        # Try to ensure inlet is 'NUM''UNIT' e.g. "10m"
        inlet = clean_string(inlet)
        inlet = format_inlet(inlet)
        inlet = cast(str, inlet)

        fp = Footprints.load()

        # Load in the metadata store
        metastore = load_metastore(key=fp._metakey)

        # Both the metastore and the opened dataset hold resources that must be
        # released on *every* exit path (early return for a duplicate file,
        # validation failure, success), hence the single try/finally below.
        fp_data: Optional[Dataset] = None
        try:
            file_hash = hash_file(filepath=filepath)
            if file_hash in fp._file_hashes and not overwrite:
                logger.warning(
                    f"This file has been uploaded previously with the filename : {fp._file_hashes[file_hash]} - skipping."
                )
                return None

            # Open the dataset; `chunks` is passed straight through to xarray
            fp_data = xr.open_dataset(filepath, chunks=chunks)

            if species == "co2":
                # Expect co2 data to have high time resolution
                if not high_time_res:
                    logger.info("Updating high_time_res to True for co2 data")
                    high_time_res = True

            if short_lifetime and not species:
                raise ValueError(
                    "When indicating footprint is for short lived species, 'species' input must be included"
                )
            elif not short_lifetime and species:
                lifetime = species_lifetime(species)
                if lifetime is not None:
                    # TODO: May want to add a check on length of lifetime here
                    logger.info("Updating short_lifetime to True since species has an associated lifetime")
                    short_lifetime = True

            # Checking against expected format for footprints
            # Based on configuration (some user defined, some inferred)
            Footprints.validate_data(
                fp_data,
                high_spatial_res=high_spatial_res,
                high_time_res=high_time_res,
                short_lifetime=short_lifetime,
            )

            # Need to read the metadata from the footprints and then store it
            # Do we need to chunk the footprints / will a Datasource store it correctly?
            metadata: Dict[str, Union[str, float, List[float]]] = {}

            metadata["data_type"] = "footprints"
            metadata["site"] = site
            metadata["domain"] = domain
            metadata["model"] = model

            # Include both inlet and height keywords for backwards compatibility
            metadata["inlet"] = inlet
            metadata["height"] = inlet

            if species is not None:
                metadata["species"] = clean_string(species)

            if network is not None:
                metadata["network"] = clean_string(network)

            if metmodel is not None:
                metadata["metmodel"] = clean_string(metmodel)

            # Check if time has 0-dimensions and, if so, expand this so time is 1D
            if "time" in fp_data.coords:
                fp_data = update_zero_dim(fp_data, dim="time")

            fp_time = fp_data["time"]

            start_date, end_date, period_str = infer_date_range(
                fp_time, filepath=filepath, period=period, continuous=continuous
            )

            metadata["start_date"] = str(start_date)
            metadata["end_date"] = str(end_date)
            metadata["time_period"] = period_str

            metadata["max_longitude"] = round(float(fp_data["lon"].max()), 5)
            metadata["min_longitude"] = round(float(fp_data["lon"].min()), 5)
            metadata["max_latitude"] = round(float(fp_data["lat"].max()), 5)
            metadata["min_latitude"] = round(float(fp_data["lat"].min()), 5)

            if high_spatial_res:
                try:
                    metadata["max_longitude_high"] = round(float(fp_data["lon_high"].max()), 5)
                    metadata["min_longitude_high"] = round(float(fp_data["lon_high"].min()), 5)
                    metadata["max_latitude_high"] = round(float(fp_data["lat_high"].max()), 5)
                    metadata["min_latitude_high"] = round(float(fp_data["lat_high"].min()), 5)

                    metadata["spatial_resolution"] = "high_spatial_resolution"
                except KeyError:
                    raise KeyError("Expected high spatial resolution. Unable to find lat_high or lon_high data.")
            else:
                metadata["spatial_resolution"] = "standard_spatial_resolution"

            if high_time_res:
                metadata["time_resolution"] = "high_time_resolution"
            else:
                metadata["time_resolution"] = "standard_time_resolution"

            metadata["heights"] = [float(h) for h in fp_data.height.values]
            # Do we also need to save all the variables we have available in this footprints?
            metadata["variables"] = list(fp_data.data_vars)

            # if model_params is not None:
            #     metadata["model_parameters"] = model_params

            # Set the attributes of this Dataset
            fp_data.attrs = {"author": "OpenGHG Cloud", "processed": str(timestamp_now())}

            # This might seem longwinded now but will help when we want to read
            # more than one footprints at a time
            key = "_".join((site, domain, model, inlet))

            footprint_data: DefaultDict[str, Dict[str, Union[Dict, Dataset]]] = defaultdict(dict)
            footprint_data[key]["data"] = fp_data
            footprint_data[key]["metadata"] = metadata

            # These are the keys we will take from the metadata to search the
            # metadata store for a Datasource, they should provide as much detail as possible
            # to uniquely identify a Datasource
            required = ("site", "model", "inlet", "domain")
            lookup_results = datasource_lookup(metastore=metastore, data=footprint_data, required_keys=required)

            data_type = "footprints"
            datasource_uuids: Dict[str, Dict] = assign_data(
                data_dict=footprint_data,
                lookup_results=lookup_results,
                overwrite=overwrite,
                data_type=data_type,
            )

            fp.add_datasources(uuids=datasource_uuids, data=footprint_data, metastore=metastore)

            # Record the file hash in case we see this file again
            fp._file_hashes[file_hash] = filepath.name

            fp.save()

            return datasource_uuids
        finally:
            # Same close order as the success path has always used:
            # dataset first, then the metastore.
            if fp_data is not None:
                fp_data.close()
            metastore.close()

    @staticmethod
    def schema(
        particle_locations: bool = True,
        high_spatial_res: bool = False,
        high_time_res: bool = False,
        short_lifetime: bool = False,
    ) -> DataSchema:
        """
        Define schema for footprint Dataset.

        The returned schema depends on what the footprint represents,
        indicated using the keywords.
        By default, this will include "fp" variable but this will be superseded
        if high_spatial_res or high_time_res are specified.

        Args:
            particle_locations: Include 4-directional particle location variables:
                - "particle_locations_[nesw]"
                and include associated additional dimensions ("height")
            high_spatial_res : Set footprint variables include high and low resolution options:
                - "fp_low"
                - "fp_high"
                and include associated additional dimensions ("lat_high", "lon_high").
            high_time_res : Set footprint variable to be high time resolution
                - "fp_HiTRes"
                and include associated dimensions ("H_back").
            short_lifetime: Include additional particle age parameters for short lived species:
                - "mean_age_particles_[nesw]"
        Returns:
            DataSchema: schema built from the selected data variables and dtypes
        """
        # Names of data variables and associated dimensions (as a tuple)
        data_vars: Dict[str, Tuple[str, ...]] = {}
        # Internal data types of data variables and coordinates
        dtypes = {
            "lat": np.floating,  # Covers np.float16, np.float32, np.float64 types
            "lon": np.floating,
            "time": np.datetime64,
        }

        # Each regional boundary (n, e, s, w) and the horizontal coordinate
        # its variables are defined along. Order matters only for keeping
        # the insertion order of the dictionaries stable.
        boundary_coords = (("n", "lon"), ("e", "lat"), ("s", "lon"), ("w", "lat"))

        if not high_time_res and not high_spatial_res:
            # Includes standard footprint variable
            data_vars["fp"] = ("time", "lat", "lon")
            dtypes["fp"] = np.floating

        if high_spatial_res:
            # Include options for high spatial resolution footprint
            # This includes footprint data on multiple resolutions
            data_vars["fp_low"] = ("time", "lat", "lon")
            data_vars["fp_high"] = ("time", "lat_high", "lon_high")

            dtypes["fp_low"] = np.floating
            dtypes["fp_high"] = np.floating

        if high_time_res:
            # Include options for high time resolution footprint (usually co2)
            # This includes a footprint data with an additional hourly back dimension
            data_vars["fp_HiTRes"] = ("time", "lat", "lon", "H_back")

            dtypes["fp_HiTRes"] = np.floating
            dtypes["H_back"] = np.number  # float or integer

        # Includes particle location directions - one for each regional boundary
        if particle_locations:
            dtypes["height"] = np.floating
            for direction, coord in boundary_coords:
                name = f"particle_locations_{direction}"
                data_vars[name] = ("time", coord, "height")
                dtypes[name] = np.floating

        # TODO: Could also add check for meteorological + other data
        # "pressure", "wind_speed", "wind_direction", "PBLH"
        # "release_lon", "release_lat"

        # Include options for short lifetime footprints (short-lived species)
        # This includes additional particle ages (allow calculation of decay based on particle lifetimes)
        if short_lifetime:
            for direction, coord in boundary_coords:
                name = f"mean_age_particles_{direction}"
                data_vars[name] = ("time", coord, "height")
                dtypes[name] = np.floating

        data_format = DataSchema(data_vars=data_vars, dtypes=dtypes)

        return data_format

    @staticmethod
    def validate_data(
        data: Dataset,
        particle_locations: bool = True,
        high_spatial_res: bool = False,
        high_time_res: bool = False,
        short_lifetime: bool = False,
    ) -> None:
        """
        Validate data against Footprint schema - definition from
        Footprints.schema(...) method.

        Args:
            data : xarray Dataset in expected format
            particle_locations: See Footprints.schema()
            high_spatial_res: See Footprints.schema()
            high_time_res: See Footprints.schema()
            short_lifetime: See Footprints.schema()

        Returns:
            None

            Raises a ValueError with details if the input data does not adhere
            to the Footprints schema.
        """
        data_schema = Footprints.schema(
            particle_locations=particle_locations,
            high_spatial_res=high_spatial_res,
            high_time_res=high_time_res,
            short_lifetime=short_lifetime,
        )
        data_schema.validate_data(data)