Source code for openghg.standardise.meta._attributes

from typing import Any, Dict, Hashable, List, Optional, Tuple, Union, cast
import logging
from xarray import Dataset
from openghg.types import optionalPathType

__all__ = [
    "assign_attributes",
    "get_attributes",
    "define_species_label",
    "assign_flux_attributes",
    "get_flux_attributes",
    "dataset_formatter",
    "data_variable_formatter",
]

logger = logging.getLogger("openghg.standardise")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler


def assign_attributes(
    data: Dict,
    site: Optional[str] = None,
    network: Optional[str] = None,
    sampling_period: Optional[Union[str, float, int]] = None,
    update_mismatch: str = "never",
    site_filepath: optionalPathType = None,
    species_filepath: optionalPathType = None,
) -> Dict:
    """Assign attributes to each site and species dataset. This ensures that the
    xarray Datasets produced are CF 1.8 compliant. Some of the attributes written
    to the Dataset are saved as metadata to the Datasource, allowing more detailed
    searching of data.

    If accessing underlying stored site or species definitions, these will be
    accessed from the openghg/openghg_defs repository by default.

    Args:
        data: Dictionary containing data, metadata and attributes
        site: Site code
        sampling_period: Number of seconds for which air sample is taken. Only for time variable attribute
        network: Network name
        update_mismatch: This determines how mismatches between the internal data
            "attributes" and the supplied / derived "metadata" are handled.
            This includes the options:
              - "never" - don't update mismatches and raise an AttrMismatchError
              - "from_source" / "attributes" - update mismatches based on input data (e.g. data attributes)
              - "from_definition" / "metadata" - update mismatches based on associated data (e.g. site_info.json)
        site_filepath: Alternative site info file
        species_filepath: Alternative species info file
    Returns:
        dict: Dictionary of combined data with correct attributes assigned to Datasets
    """
    from openghg.standardise.meta import sync_surface_metadata

    for _, gas_data in data.items():
        site_attributes = gas_data.get("attributes", {})
        species = gas_data["metadata"]["species"]

        if site is None:
            site = gas_data.get("metadata", {}).get("site")
        if network is None:
            network = gas_data.get("metadata", {}).get("network")

        units = gas_data.get("metadata", {}).get("units")
        scale = gas_data.get("metadata", {}).get("calibration_scale")

        if sampling_period is None:
            sampling_period = str(gas_data.get("metadata", {}).get("sampling_period", "NOT_SET"))

        gas_data["data"] = get_attributes(
            ds=gas_data["data"],
            species=species,
            site=site,
            network=network,
            units=units,
            scale=scale,
            global_attributes=site_attributes,
            sampling_period=sampling_period,
            site_filepath=site_filepath,
            species_filepath=species_filepath,
        )

        measurement_data = gas_data["data"]
        metadata = gas_data["metadata"]
        attrs = measurement_data.attrs

        metadata_aligned, attrs_aligned = sync_surface_metadata(
            metadata=metadata, attributes=attrs, update_mismatch=update_mismatch
        )

        gas_data["metadata"] = metadata_aligned
        gas_data["attributes"] = attrs_aligned
        measurement_data.attrs = gas_data["attributes"]

    return data
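
# Illustrative usage sketch (not part of the module): the ``data`` dict is assumed
# to have the shape produced by the surface standardisers, i.e. one entry per
# species with "data", "metadata" and (optionally) "attributes" keys. The site
# code "BSD", network "DECC" and dataset contents below are example values.
#
#   import xarray as xr
#   ds = xr.Dataset(...)  # must contain e.g. a "ch4" variable and a "time" dimension
#   data = {"ch4": {"data": ds, "metadata": {"species": "ch4", "site": "bsd"}}}
#   data = assign_attributes(data, site="BSD", network="DECC", sampling_period=60)
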
def get_attributes(
    ds: Dataset,
    species: str,
    site: str,
    network: Optional[str] = None,
    global_attributes: Optional[Dict[str, str]] = None,
    units: Optional[str] = None,
    scale: Optional[str] = None,
    sampling_period: Optional[Union[str, float, int]] = None,
    date_range: Optional[List[str]] = None,
    site_filepath: optionalPathType = None,
    species_filepath: optionalPathType = None,
) -> Dataset:
    """Writes attributes to an xarray.Dataset so that they conform with
    the CF Convention v1.8.

    Attributes of the xarray Dataset are modified, and variable names are changed.

    If accessing underlying stored site or species definitions, these will be
    accessed from the openghg/openghg_defs repository by default.

    Variable naming related to the species name is defined using the
    define_species_label() function.

    Args:
        ds: Should contain variables such as "ch4", "ch4 repeatability".
            Must have a "time" dimension.
        species: Species name. e.g. "CH4", "HFC-134a", "dCH4C13"
        site: Three-letter site code
        network: Network site is associated with
        global_attributes: Dictionary containing any info you want to
            add to the file header (e.g. {"Contact": "Contact_Name"})
        units: This routine will try to guess the units unless this is specified.
            Options are in unit_interpret
        scale: Calibration scale for species.
        sampling_period: Number of seconds for which air sample is taken.
            Only for time variable attribute
        date_range: Start and end date for output. If you only want an end date,
            just put a very early start date (e.g. ["1900-01-01", "2010-01-01"])
        site_filepath: Alternative site info file
        species_filepath: Alternative species info file
    """
    from openghg.util import load_internal_json, timestamp_now, get_species_info
    from pandas import Timestamp as pd_Timestamp

    if not isinstance(ds, Dataset):
        raise TypeError("This function only accepts xarray Datasets")

    # Load attributes files
    species_attrs = get_species_info()
    attributes_data = load_internal_json(filename="attributes.json")

    unit_interpret = attributes_data["unit_interpret"]
    unit_mol_fraction = attributes_data["unit_mol_fraction"]
    unit_non_standard = attributes_data["unit_non_standard"]

    # Extract both the label to use for the species and the key for attributes.
    # Typically species_label will be the lower case version of species_key.
    species_label, species_key = define_species_label(species, species_filepath)

    # Global attributes
    global_attributes_default = {
        "conditions_of_use": "Ensure that you contact the data owner at the outset of your project.",
        "source": "In situ measurements of air",
        "Conventions": "CF-1.8",
    }

    if global_attributes is not None:
        # TODO - for some reason mypy doesn't see a Dict[str,str] as a valid Mapping[Hashable, Any] type
        global_attributes.update(global_attributes_default)  # type: ignore
    else:
        global_attributes = global_attributes_default

    global_attributes["file_created"] = str(timestamp_now())
    global_attributes["processed_by"] = "OpenGHG_Cloud"
    global_attributes["species"] = species_label

    if scale is None:
        global_attributes["calibration_scale"] = "unknown"
    else:
        global_attributes["calibration_scale"] = scale

    if sampling_period is None:
        global_attributes["sampling_period"] = "NOT_SET"
    else:
        global_attributes["sampling_period"] = str(sampling_period)
        global_attributes["sampling_period_unit"] = "s"

    # 04/2023: Switched around global and site attributes so
    # global attributes now supersede site attributes.

    # Add some site attributes
    site_attributes = _site_info_attributes(site.upper(), network, site_filepath)
    ds.attrs.update(site_attributes)

    # Update the Dataset attributes
    ds.attrs.update(global_attributes)  # type: ignore

    # Species-specific attributes
    # Extract long name
    try:
        name = species_attrs[species_key]["name"]
    except KeyError:
        name = species_label

    # Extract units if not defined
    if units is None:
        try:
            units = species_attrs[species_key]["units"]
        except KeyError:
            units = ""

    # Define label based on units
    if units in unit_mol_fraction:
        sp_long = f"mole_fraction_of_{name}_in_air"
    else:
        sp_long = name

    ancillary_variables = []

    variable_names = cast(Dict[str, Any], ds.variables)
    # Write units as attributes to variables containing any of these
    match_words = ["variability", "repeatability", "stdev", "count"]

    for key in variable_names:
        key = key.lower()

        if species_label in key:
            # Standard name attribute
            # ds[key].attrs["standard_name"]=key.replace(species_label, sp_long)
            ds[key].attrs["long_name"] = key.replace(species_label, sp_long)

            # If units are required for variable, add attribute
            if key == species_label or any(word in key for word in match_words):
                if units in unit_interpret:
                    ds[key].attrs["units"] = unit_interpret[units]
                    # If units are non-standard, add details
                    if units in unit_non_standard:
                        ds[key].attrs["units_description"] = units
                elif units == "":
                    ds[key].attrs["units"] = unit_interpret["else"]
                else:
                    ds[key].attrs["units"] = units

            # Add to list of ancillary variables
            if key != species_label:
                ancillary_variables.append(key)

    # TODO - for the moment skip this step - check status of ancillary variables in standard
    # Write ancillary variable list
    # ds[species_label_lower].attrs["ancillary_variables"] = ", ".join(ancillary_variables)

    # Add quality flag attributes
    # NOTE - I've removed the whitespace before status_flag and integration_flag here
    variable_names = cast(Dict[str, Any], ds.variables)
    quality_flags = [key for key in variable_names if "status_flag" in key]

    for key in quality_flags:
        ds[key] = ds[key].astype(int)
        try:
            long_name = ds[species_label].attrs["long_name"]
        except KeyError:
            raise KeyError(key, quality_flags)

        ds[key].attrs = {
            "flag_meaning": "0 = unflagged, 1 = flagged",
            "long_name": f"{long_name} status_flag",
        }

    variable_names = cast(Dict[str, Any], ds.variables)
    # Add integration flag attributes
    integration_flags = [key for key in variable_names if "integration_flag" in key]

    for key in integration_flags:
        ds[key] = ds[key].astype(int)
        long_name = ds[species_label].attrs["long_name"]

        ds[key].attrs = {
            "flag_meaning": "0 = area, 1 = height",
            "standard_name": f"{long_name} integration_flag",
            "comment": "GC peak integration method (by height or by area). Does not indicate data quality",
        }

    first_year = pd_Timestamp(str(ds.time[0].values)).year

    ds.time.encoding = {"units": f"seconds since {str(first_year)}-01-01 00:00:00"}

    time_attributes: Dict[str, str] = {}
    time_attributes["label"] = "left"
    time_attributes["standard_name"] = "time"
    time_attributes["comment"] = (
        "Time stamp corresponds to beginning of sampling period. "
        + "Time since midnight UTC of reference date. "
        + "Note that sampling periods are approximate."
    )

    if sampling_period is not None:
        time_attributes["sampling_period_seconds"] = str(sampling_period)

    ds.time.attrs.update(time_attributes)

    # If a date range is specified, slice dataset
    if date_range:
        ds = ds.loc[dict(time=slice(*date_range))]

    return ds
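
# Illustrative usage sketch (not part of the module): calling get_attributes
# directly on a Dataset. The variable names, site and network codes below are
# example values.
#
#   ds = get_attributes(
#       ds=ds,  # contains e.g. "ch4" and "ch4 repeatability" variables
#       species="CH4",
#       site="BSD",
#       network="DECC",
#       sampling_period=60,
#       global_attributes={"Contact": "Contact_Name"},
#   )
#   # ds.attrs now includes "species", "calibration_scale", "file_created", etc.
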
def define_species_label(species: str, species_filepath: optionalPathType = None) -> Tuple[str, str]:
    """Define standardised label to use for observation datasets.
    This uses the data stored within the openghg_defs species info JSON file
    by default, with alternative names ('alt') defined within.

    Formatting:
     - species label will be all lower case
     - any spaces will be replaced with underscores
     - if species or synonym cannot be found, species name will be used
       but with any hyphens taken out
    (see also openghg.util.clean_string function)

    Note: Suggested naming for isotopologues should be d<species><isotope>, e.g.
    dCH4C13, or dCO2C14

    Args:
        species : Species name.
        species_filepath : Alternative species info file.
    Returns:
        str, str: Both the species label to be used exactly and the original
            attribute key needed to extract additional data from the species
            info attributes file.

    Example:
        >>> define_species_label("methane")
            ("ch4", "CH4")
        >>> define_species_label("radon")
            ("rn", "Rn")
        >>> define_species_label("cfc-11")
            ("cfc11", "CFC11")
        >>> define_species_label("CH4C13")
            ("dch4c13", "DCH4C13")
    """
    from openghg.util import clean_string, synonyms

    # Extract species label using synonyms function
    try:
        species_label = synonyms(
            species, lower=False, allow_new_species=False, species_filepath=species_filepath
        )
    except ValueError:
        species_underscore = species.replace(" ", "_")
        species_remove_dash = species_underscore.replace("-", "")
        species_label = clean_string(species_remove_dash)

    species_label_lower = species_label.lower()

    return species_label_lower, species_label
def _site_info_attributes(
    site: str, network: Optional[str] = None, site_filepath: optionalPathType = None
) -> Dict:
    """Reads site attributes from JSON.

    This uses the data stored within the openghg_defs site_info JSON file by default.

    Args:
        site: Site code
        network: Network name
        site_filepath: Alternative site info file
    Returns:
        dict: Dictionary of site attributes
    """
    from openghg.util import get_site_info

    site = site.upper()

    # Read site info file
    site_data = get_site_info(site_filepath)

    attributes_dict = {
        "longitude": "station_longitude",
        "latitude": "station_latitude",
        "long_name": "station_long_name",
        "height_station_masl": "station_height_masl",
    }

    attributes = {}
    if site in site_data:
        # Resolve the network inside the known-site branch so an unknown site is
        # logged below rather than raising a KeyError here.
        if network is None:
            network = next(iter(site_data[site]))
        else:
            network = network.upper()

        for attr in attributes_dict:
            try:
                if attr in site_data[site][network]:
                    attr_key = attributes_dict[attr]
                    attributes[attr_key] = site_data[site][network][attr]
            except KeyError:
                pass
    else:
        logger.info(
            f"We haven't seen site {site} before, please let us know so we can update our records."
            + "\nYou can help us by opening an issue on GitHub for our supplementary data: "
            + "https://github.com/openghg/openghg_defs"
        )
        # TODO - log not seen site message here
        # raise ValueError(f"Invalid site {site} passed. Please use a valid site code such as BSD for Bilsdale")

    return attributes
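
# Illustrative sketch (not part of the module): for a site/network pair present
# in the site_info data, _site_info_attributes returns a mapping of the renamed
# keys, e.g.
#
#   attrs = _site_info_attributes("BSD", network="DECC")
#   # -> {"station_longitude": ..., "station_latitude": ...,
#   #     "station_long_name": ..., "station_height_masl": ...}
#
# (the exact values come from the openghg_defs site_info data).
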
def assign_flux_attributes(
    data: Dict,
    species: Optional[str] = None,
    source: Optional[str] = None,
    domain: Optional[str] = None,
    units: str = "mol/m2/s",
    prior_info_dict: Optional[dict] = None,
) -> Dict:
    """Assign attributes for the input flux dataset within dictionary based on
    metadata and passed arguments.

    Args:
        data: Dictionary containing data, metadata and attributes
        species: Species name
        source: Source name
        domain: Domain name
        units: Unit values for the "flux" variable. Default = "mol/m2/s"
        prior_info_dict: Dictionary of additional 'prior' information about the
            emissions sources. Expected to be of the form, e.g.
                {"EDGAR": {"version": "v4.3.2",
                           "raw_resolution": "0.1 degree x 0.1 degree",
                           "reference": "http://edgar.jrc.ec.europa.eu/overview.php?v=432_GHG",
                           ...},
                 ...}
    Returns:
        dict: Same format as the input but with an updated "data" component (Dataset)
    """
    for flux_dict in data.values():
        flux_attributes = flux_dict.get("attributes", {})

        # Ensure values for these attributes have been specified either manually
        # or within metadata.
        attribute_values = {"species": species, "source": source, "domain": domain}
        metadata = flux_dict["metadata"]
        for attr, value in attribute_values.items():
            if value is None:
                try:
                    attribute_values[attr] = metadata[attr]
                except KeyError:
                    raise ValueError(f"Attribute {attr} must be specified.")

        input_attributes = cast(Dict[str, str], attribute_values)

        flux_dict["data"] = get_flux_attributes(
            ds=flux_dict["data"],
            units=units,
            prior_info_dict=prior_info_dict,
            global_attributes=flux_attributes,
            **input_attributes,
        )

    return data
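
# Illustrative usage sketch (not part of the module): the flux ``data`` dict
# mirrors the surface layout, with "flux", "lat" and "lon" present in each
# Dataset. The source/domain values below are example values.
#
#   data = {"ch4_waste": {"data": flux_ds, "metadata": {"species": "ch4"}}}
#   data = assign_flux_attributes(
#       data,
#       source="waste",
#       domain="EUROPE",
#       prior_info_dict={"EDGAR": {"version": "v4.3.2"}},
#   )
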
def get_flux_attributes(
    ds: Dataset,
    species: str,
    source: str,
    domain: str,
    units: str = "mol/m2/s",
    prior_info_dict: Optional[dict] = None,
    global_attributes: Optional[Dict[Hashable, Any]] = None,
) -> Dataset:
    """Assign additional attributes for the flux dataset.

    Args:
        ds: Should contain a "flux" variable
        species: Species name
        source: Source name
        domain: Domain name
        units: Unit values for the "flux" variable. Default = "mol/m2/s"
        prior_info_dict: Dictionary of additional 'prior' information about the
            emissions sources. Expected to be of the form, e.g.
                {"EDGAR": {"version": "v4.3.2",
                           "raw_resolution": "0.1 degree x 0.1 degree",
                           "reference": "http://edgar.jrc.ec.europa.eu/overview.php?v=432_GHG",
                           ...},
                 ...}
        global_attributes: Additional global attributes to write to dataset.
    Returns:
        Dataset: Input dataset with updated variable/coordinate and global attributes
    """
    # Example flux attributes (from files)
    # :title = "EDGAR 4.3.2 year 2004" ;
    # :author = "ag12733" ;
    # :date_created = "2018-07-16 13:10:57.346915" ;
    # :number_of_prior_files_used = 1L ;
    # :prior_file_1 = "EDGAR" ;
    # :prior_file_1_version = "/data/shared/Gridded_fluxes/N2O/EDGAR_v4.3.2/v432_N2O_TOTALS_nc/v432_N2O_2004.0.1x0.1.nc" ;
    # :prior_file_1_raw_resolution = "0.1 degree x 0.1 degree" ;
    # :prior_file_1_reference = "http://edgar.jrc.ec.europa.eu/overview.php?v=432_GHG" ;
    # :regridder_used = "acrg_grid.regrid.regrid_3D" ;

    from openghg.util import timestamp_now

    # Define species variable/coordinate attributes and assign
    flux_attrs = {"source": source, "units": units, "species": species}
    lat_attrs = {"long_name": "latitude", "units": "degrees_north", "notes": "centre of cell"}
    lon_attrs = {"long_name": "longitude", "units": "degrees_east", "notes": "centre of cell"}

    ds["flux"].attrs = flux_attrs
    ds["lat"].attrs = lat_attrs
    ds["lon"].attrs = lon_attrs

    # Define default values for global attributes
    global_attributes_default: Dict[Hashable, Any] = {
        "conditions_of_use": "Ensure that you contact the data owner at the outset of your project.",
        "Conventions": "CF-1.8",
    }

    if global_attributes is None:
        global_attributes = global_attributes_default
    else:
        global_attributes.update(global_attributes_default)

    # Extract any current attributes from the Dataset
    current_attributes = ds.attrs

    # Extract "title" from current attributes or define this.
    if "title" in current_attributes and "title" not in global_attributes:
        global_attributes["title"] = current_attributes["title"]
    else:
        global_attributes["title"] = f"{source} emissions/flux of {species} for {domain} domain"

    if "file_created" not in global_attributes:
        global_attributes["file_created"] = str(timestamp_now())
    if "processed_by" not in global_attributes:
        global_attributes["processed_by"] = "OpenGHG_Cloud"

    # define_species_label returns (label, key); only the label is needed here
    species_label = define_species_label(species)[0]
    global_attributes["species"] = species_label

    global_attributes["source"] = source
    global_attributes["domain"] = domain

    # Add any 'prior' information for flux / emissions databases.
    if prior_info_dict is not None:
        # For composite flux / emissions files this may contain > 1 prior input
        global_attributes["number_of_prior_files_used"] = len(prior_info_dict.keys())
        for i, source_key in enumerate(prior_info_dict.keys()):
            prior_number = i + 1
            label_start = f"prior_file_{prior_number}"
            global_attributes[label_start] = source_key

            for key, value in prior_info_dict[source_key].items():
                attr_key = f"{label_start}_{key}"
                global_attributes[attr_key] = value

    # Ensure keys which have been updated by OpenGHG are not overwritten
    # by current attributes.
    updated_keys = ["Conventions", "title", "file_created", "processed_by"]
    for key in updated_keys:
        if key in current_attributes:
            current_attributes.pop(key)

    global_attributes.update(current_attributes)

    ds.attrs = global_attributes

    return ds


def dataset_formatter(data: Dict) -> Dict:
    """Format the species/variable names in each dataset: whitespace is replaced
    with underscores and the species name is converted to lower case.

    Args:
        data: Dict containing dataset information (gas_data)
    Returns:
        dict: Dictionary of source_name : data, metadata, attributes
    """
    for _, gas_data in data.items():
        species = gas_data["metadata"]["species"]
        species_label, species_key = define_species_label(species)

        gas_data["data"] = data_variable_formatter(
            ds=gas_data["data"], species=species, species_label=species_label
        )
    return data


def data_variable_formatter(ds: Dataset, species: str, species_label: str) -> Dataset:
    """Format the variable names in the dataset: whitespace is replaced with
    underscores and the species data variable is converted to lower case.

    Args:
        ds: Should contain variables such as "ch4", "ch4 repeatability".
            Must have a "time" dimension.
        species: Species name
        species_label: Species label
    Returns:
        Dataset: Formatted xarray dataset
    """
    variable_names = cast(Dict[str, Any], ds.variables)
    to_underscores = {var: var.lower().replace(" ", "_") for var in variable_names}
    to_underscores.pop("time")  # Added to remove warning around resetting time index.
    ds = ds.rename(to_underscores)  # type: ignore

    species_lower = species.lower()
    species_search = species_lower.replace(" ", "_")

    variable_names = cast(Dict[str, Any], ds.variables)
    matched_keys = [var for var in variable_names if species_search in var]

    # If we don't have any variables to rename, raise an error
    if not matched_keys:
        raise NameError(f"Cannot find species {species_search} in Dataset variables")

    species_rename = {}
    for var in matched_keys:
        species_rename[var] = var.replace(species_search, species_label)

    ds = ds.rename(species_rename)

    return ds
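
# Illustrative sketch (not part of the module): data_variable_formatter renames
# e.g. "CH4" -> "ch4" and "CH4 repeatability" -> "ch4_repeatability", so a
# dataset with those variables could be normalised like this (variable names
# are example values):
#
#   ds = data_variable_formatter(ds=ds, species="CH4", species_label="ch4")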