Source code for openghg.standardise.surface._crds

from pathlib import Path

from openghg.standardise.meta import dataset_formatter
from openghg.types import pathType
from openghg.util import (
    clean_string,
    find_duplicate_timestamps,
    format_inlet,
    get_site_info,
    load_internal_json,
)

import pandas as pd
from pandas import Timedelta


def parse_crds(
    filepath: pathType,
    site: str,
    network: str,
    inlet: str | None = None,
    instrument: str | None = None,
    sampling_period: str | float | int | None = None,
    drop_duplicates: bool = True,
    update_mismatch: str = "never",
    site_filepath: pathType | None = None,
    **kwargs: dict,
) -> dict:
    """Parses a CRDS data file and creates a dictionary of xarray Datasets
    ready for storage in the object store.

    Args:
        filepath: Path to file
        site: Three letter site code
        network: Network name
        inlet: Inlet height
        instrument: Instrument name
        sampling_period: Sampling period in seconds
        drop_duplicates: Drop measurements at duplicate timestamps, keeping the first.
        update_mismatch: This determines how mismatches between the internal data
            "attributes" and the supplied / derived "metadata" are handled.
            This includes the options:
                - "never" - don't update mismatches and raise an AttrMismatchError
                - "from_source" / "attributes" - update mismatches based on input data (e.g. data attributes)
                - "from_definition" / "metadata" - update mismatches based on associated data (e.g. site_info.json)
        site_filepath: Alternative site info file (see openghg/openghg_defs repository for format).
            Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default.
    Returns:
        dict: Dictionary of gas data
    """
    from openghg.standardise.meta import assign_attributes

    if not isinstance(filepath, Path):
        filepath = Path(filepath)

    inlet = format_inlet(inlet)

    # This may seem like an almost pointless function as this is all we do
    # but it makes it a lot easier to test assign_attributes
    gas_data = _read_data(
        filepath=filepath,
        site=site,
        network=network,
        inlet=inlet,
        instrument=instrument,
        sampling_period=sampling_period,
        drop_duplicates=drop_duplicates,
    )

    gas_data = dataset_formatter(data=gas_data)

    # Ensure the data is CF compliant
    gas_data = assign_attributes(
        data=gas_data,
        site=site,
        sampling_period=sampling_period,
        update_mismatch=update_mismatch,
        site_filepath=site_filepath,
    )

    return gas_data
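
# A minimal usage sketch (illustrative only; the file name follows the expected
# {site}.{instrument}.{sampling period}.{height}.dat convention and the network
# name is an assumed example):
#
#     gas_data = parse_crds(
#         filepath="bsd.picarro.1minute.108m.dat",
#         site="bsd",
#         network="DECC",
#         inlet="108m",
#     )
#     # gas_data is keyed by species (e.g. "ch4") and each entry holds
#     # "metadata", "data" (an xarray Dataset) and "attributes".
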

def _get_raw_dataframe(filepath: Path, drop_duplicates: bool = True) -> pd.DataFrame:
    """Read raw CRDS data into a dataframe with combined data for all gases.

    Args:
        filepath: Path to raw data
        drop_duplicates: If True, drop duplicate times.
    Returns:
        pd.DataFrame: Combined data for all gases
    Raises:
        ValueError: If duplicate times are found and `drop_duplicates` is False.
    """
    # parse with multiindex for columns
    combined_df = pd.read_csv(filepath, header=[0, 1], skiprows=1, sep=r"\s+")

    # parse dates
    datetime_series = (
        combined_df.pop(("-", "date")).astype(str)
        + " "
        + combined_df.pop(("-", "time")).astype(str).str.zfill(6)
    )
    combined_df["time"] = pd.to_datetime(datetime_series, format="%y%m%d %H%M%S")
    combined_df = combined_df.set_index("time")

    # drop duplicate times
    dupes = find_duplicate_timestamps(data=combined_df)

    if dupes:
        if not drop_duplicates:
            raise ValueError(f"Duplicate dates detected: {dupes}")
        combined_df = combined_df.loc[~combined_df.index.duplicated(keep="first")]

    return combined_df


def _read_data(
    filepath: Path,
    site: str,
    network: str,
    inlet: str | None = None,
    instrument: str | None = None,
    sampling_period: str | float | int | None = None,
    site_filepath: pathType | None = None,
    drop_duplicates: bool = True,
) -> dict:
    """Read the datafile passed in and extract the data we require.

    Args:
        filepath: Path to file
        site: Three letter site code
        network: Network name
        inlet: Inlet height
        instrument: Instrument name
        sampling_period: Sampling period in seconds
        site_filepath: Alternative site info file (see openghg/openghg_defs repository for format).
            Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default.
        drop_duplicates: Drop measurements at duplicate timestamps, keeping the first.
    Returns:
        dict: Dictionary of gas data
    """
    split_fname = filepath.stem.split(".")

    site = site.lower()

    try:
        site_fname = clean_string(split_fname[0])
        inlet_fname = clean_string(split_fname[3])
    except IndexError:
        raise ValueError(
            "Error reading metadata from filename, we expect a filename of the form hfd.picarro.1minute.100m.dat"
        )

    if site_fname != site:
        raise ValueError("Site mismatch between passed site code and that read from filename.")

    if "m" not in inlet_fname:
        raise ValueError("No inlet found, we expect filenames such as: bsd.picarro.1minute.108m.dat")

    if inlet is not None and inlet != inlet_fname:
        raise ValueError("Inlet mismatch between passed inlet and that read from filename.")
    else:
        inlet = inlet_fname

    # get data
    data = _get_raw_dataframe(filepath, drop_duplicates=drop_duplicates)

    # get metadata
    metadata = _read_metadata(filepath)

    # add port and type from raw data, then drop these columns
    metadata["type"] = data[("-", "type")].iloc[0]
    metadata["port"] = str(data[("-", "port")].iloc[0])

    drop_cols = [col for col in data.columns if col[0] == "-"]
    data = data.drop(columns=drop_cols)

    if network is not None:
        metadata["network"] = network

    if sampling_period is not None:
        sampling_period = float(sampling_period)

        # Compare against the value extracted from the file name
        file_sampling_period = Timedelta(seconds=float(metadata["sampling_period"]))
        given_sampling_period = Timedelta(seconds=sampling_period)

        comparison_seconds = abs(given_sampling_period - file_sampling_period).total_seconds()
        tolerance_seconds = 1

        if comparison_seconds > tolerance_seconds:
            raise ValueError(
                f"Input sampling period {sampling_period} does not match the value "
                f"extracted from the file name of {metadata['sampling_period']} seconds."
            )

    # Read the scale from JSON
    # I'll leave this here for the possible future movement from class to functions
    network_metadata = load_internal_json(filename="process_gcwerks_parameters.json")
    crds_metadata = network_metadata["CRDS"]

    # This dictionary is used to store the gas data and its associated metadata
    combined_data = {}

    # get gases from first level of column multi-index
    gases = set([col[0] for col in data.columns])

    for gas in gases:
        gas_data = data.loc[:, gas]

        species = gas.lower()

        try:
            gas_data.columns = [species, f"{species}_variability", f"{species}_number_of_observations"]
        except ValueError as e:
            raise ValueError("CRDS data should have three columns per gas.") from e

        gas_data = gas_data.dropna(axis="index", how="any")
        gas_data[f"{species}_number_of_observations"] = gas_data[f"{species}_number_of_observations"].astype(
            int
        )

        # Here we can convert the Dataframe to a Dataset and then write the attributes
        gas_data = gas_data.to_xarray()

        site_attributes = _get_site_attributes(
            site=site, inlet=inlet, crds_metadata=crds_metadata, site_filepath=site_filepath
        )

        scale = crds_metadata["default_scales"].get(species.upper(), "NA")

        # Create a copy of the metadata dict
        species_metadata = metadata.copy()
        species_metadata["species"] = clean_string(species)
        species_metadata["inlet"] = format_inlet(inlet, key_name="inlet")
        species_metadata["calibration_scale"] = scale
        species_metadata["long_name"] = site_attributes["long_name"]
        species_metadata["data_type"] = "surface"

        # Make sure metadata keys are included in attributes
        site_attributes.update(species_metadata)

        combined_data[species] = {
            "metadata": species_metadata,
            "data": gas_data,
            "attributes": site_attributes,
        }

    return combined_data


def _read_metadata(filepath: Path) -> dict:
    """Parse CRDS files and create a metadata dict.

    Args:
        filepath: Data filepath
    Returns:
        dict: Dictionary containing metadata
    """
    # Split the filename to get the site and resolution
    split_filename = str(filepath.name).split(".")

    if len(split_filename) < 4:
        raise ValueError(
            "Error reading metadata from filename. The expected format is "
            "{site}.{instrument}.{sampling period}.{height}.dat"
        )

    site = split_filename[0]
    instrument = split_filename[1]
    sampling_period_str = split_filename[2]
    inlet = split_filename[3]

    if sampling_period_str == "1minute":
        sampling_period = "60.0"
    elif sampling_period_str == "hourly":
        sampling_period = "3600.0"
    else:
        raise ValueError("Unable to read sampling period from filename.")

    metadata = {}
    metadata["site"] = site
    metadata["instrument"] = instrument
    metadata["sampling_period"] = str(sampling_period)
    metadata["inlet"] = format_inlet(inlet, key_name="inlet")

    return metadata


def _get_site_attributes(
    site: str,
    inlet: str,
    crds_metadata: dict,
    site_filepath: pathType | None = None,
) -> dict:
    """Get site specific attributes for writing to Datasets.

    Args:
        site: Site name
        inlet: Inlet height, example: 108m
        crds_metadata: General CRDS metadata
        site_filepath: Alternative site info file (see openghg/openghg_defs repository for format).
            Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default.
    Returns:
        dict: Dictionary of attributes
    """
    try:
        site_attributes: dict = crds_metadata["sites"][site.upper()]
        global_attributes: dict = site_attributes["global_attributes"]
    except KeyError:
        raise ValueError(f"Unable to read attributes for site: {site}")

    # TODO - we need to combine the metadata
    full_site_metadata = get_site_info(site_filepath)

    attributes = global_attributes.copy()

    try:
        metadata = full_site_metadata[site.upper()]
    except KeyError:
        pass
    else:
        network_key = next(iter(metadata))
        site_metadata = metadata[network_key]

        attributes["station_latitude"] = str(site_metadata["latitude"])
        attributes["station_longitude"] = str(site_metadata["longitude"])
        attributes["station_long_name"] = site_metadata["long_name"]
        attributes["station_height_masl"] = site_metadata["height_station_masl"]

    attributes["inlet_height_magl"] = format_inlet(inlet, key_name="inlet_height_magl")
    attributes["comment"] = crds_metadata["comment"]
    attributes["long_name"] = site_attributes["gcwerks_site_name"]

    return attributes
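
# For reference, a sketch of the metadata dict that _read_metadata derives from a
# file name such as bsd.picarro.1minute.108m.dat (values shown are illustrative):
#
#     {
#         "site": "bsd",
#         "instrument": "picarro",
#         "sampling_period": "60.0",
#         "inlet": "108m",
#     }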