Source code for openghg.standardise.surface._gcwerks

from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from pandas import DataFrame

from openghg.standardise.meta import dataset_formatter
from openghg.types import optionalPathType


def find_files(
    data_path: Union[str, Path], skip_str: Union[str, List[str]] = "sf6"
) -> List[Tuple[Path, Path]]:
    """A helper file to find GCWERKS data and precisions file in a given folder.
    It searches for .C files of the format macehead.19.C, looks for a precisions file
    named macehead.19.precions.C and if it exists creates a tuple for these files.

    Please note the limited scope of this function, it will only work with
    files that are named in the correct pattern.

    Args:
        data_path: Folder path to search
        skip_str: String or list of strings, if found in filename these files are skipped
    Returns:
        list: List of tuples
    """
    import re
    from pathlib import Path

    data_path = Path(data_path)

    files = data_path.glob("*.C")

    if not isinstance(skip_str, list):
        skip_str = [skip_str]

    data_regex = re.compile(r"[\w'-]+\.\d+.C")

    data_precision_tuples = []
    for file in files:
        data_match = data_regex.match(file.name)

        if data_match:
            prec_filepath = data_path / Path(Path(file).stem + ".precisions.C")
            filepath = data_path / data_match.group()

            if any(s in data_match.group() for s in skip_str):
                continue

            if prec_filepath.exists():
                data_precision_tuples.append((filepath, prec_filepath))

    data_precision_tuples.sort()

    return data_precision_tuples


[docs] def parse_gcwerks( filepath: Union[str, Path], precision_filepath: Union[str, Path], site: str, network: str, inlet: Optional[str] = None, instrument: Optional[str] = None, sampling_period: Optional[str] = None, measurement_type: Optional[str] = None, update_mismatch: str = "never", site_filepath: optionalPathType = None, ) -> Dict: """Reads a GC data file by creating a GC object and associated datasources Args: filepath: Path of data file precision_filepath: Path of precision file site: Three letter code or name for site instrument: Instrument name network: Network name update_mismatch: This determines how mismatches between the internal data "attributes" and the supplied / derived "metadata" are handled. This includes the options: - "never" - don't update mismatches and raise an AttrMismatchError - "from_source" / "attributes" - update mismatches based on input data (e.g. data attributes) - "from_definition" / "metadata" - update mismatches based on associated data (e.g. site_info.json) site_filepath: Alternative site info file (see openghg/openghg_defs repository for format). Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default. Returns: dict: Dictionary of source_name : UUIDs """ from pathlib import Path from openghg.standardise.meta import assign_attributes from openghg.util import clean_string, load_internal_json filepath = Path(filepath) precision_filepath = Path(precision_filepath) # Do some setup for processing # Load site data gcwerks_data = load_internal_json(filename="process_gcwerks_parameters.json") gc_params = gcwerks_data["GCWERKS"] network = clean_string(network) # We don't currently do anything with inlet here as it's always read from data # or taken from process_gcwerks_parameters.json if inlet is not None: inlet = clean_string(inlet) if instrument is not None: instrument = clean_string(instrument) # Check if the site code passed matches that read from the filename site = _check_site( filepath=filepath, site_code=site, gc_params=gc_params, ) # If we're not passed the instrument name and we can't find it raise an error if instrument is None: instrument = _check_instrument(filepath=filepath, gc_params=gc_params, should_raise=True) else: fname_instrument = _check_instrument(filepath=filepath, gc_params=gc_params, should_raise=False) if fname_instrument is not None and instrument != fname_instrument: raise ValueError( f"Mismatch between instrument passed as argument {instrument} and instrument read from filename {fname_instrument}" ) instrument = str(instrument) gas_data = _read_data( filepath=filepath, precision_filepath=precision_filepath, site=site, instrument=instrument, network=network, sampling_period=sampling_period, gc_params=gc_params, ) gas_data = dataset_formatter(data=gas_data) # Assign attributes to the data for CF compliant NetCDFs gas_data = assign_attributes( data=gas_data, site=site, update_mismatch=update_mismatch, site_filepath=site_filepath ) return gas_data
def _check_site(filepath: Path, site_code: str, gc_params: Dict) -> str: """Check if the site passed in matches that in the filename Args: filepath: Path to data file site: Site code gc_params: Dictionary of GCWERKS parameters Returns: str: Site code """ from re import findall site_data = gc_params["sites"] name_code_conversion = {value["gcwerks_site_name"]: site_code for site_code, value in site_data.items()} site_code = site_code.lower() site_name = findall(r"[\w']+", str(filepath.name))[0].lower() if len(site_code) > 3: raise ValueError("Please pass in a 3 letter site code as the site argument.") try: confirmed_code = name_code_conversion[site_name].lower() except KeyError: raise ValueError(f"Cannot match {site_name} to a site code.") if site_code != confirmed_code: raise ValueError( f"Mismatch between code reasd from filename: {confirmed_code} and that given: {site_code}" ) return site_code def _check_instrument(filepath: Path, gc_params: Dict, should_raise: bool = False) -> Union[str, None]: """Ensure we have the correct instrument or translate an instrument suffix to an instrument name. Args: instrument_suffix: Instrument suffix such as md should_raise: Should we raise if we can't find a valid instrument gc_params: GCWERKS parameters Returns: str: Instrument name """ from re import findall instrument: str = findall(r"[\w']+", str(filepath.name))[1].lower() try: if instrument in gc_params["instruments"]: return instrument else: try: instrument = gc_params["suffix_to_instrument"][instrument] except KeyError: if "medusa" in instrument: instrument = "medusa" else: raise KeyError(f"Invalid instrument {instrument}") except KeyError: if should_raise: raise else: return None return instrument def _read_data( filepath: Path, precision_filepath: Path, site: str, instrument: str, network: str, gc_params: Dict, sampling_period: Optional[str] = None, ) -> Dict: """Read data from the data and precision files Args: filepath: Path of data file precision_filepath: Path of precision file site: Name of site instrument: Instrument name network: Network name gc_params: GCWERKS parameters sampling_period: Period over which the measurement was samplied. Returns: dict: Dictionary of gas data keyed by species """ from pandas import Series from pandas import Timedelta as pd_Timedelta from pandas import read_csv # Read header header = read_csv(filepath, skiprows=2, nrows=2, header=None, sep=r"\s+") # Read the data in and automatically create a datetime column from the 5 columns # Dropping the yyyy', 'mm', 'dd', 'hh', 'mi' columns here data = read_csv( filepath, skiprows=4, sep=r"\s+", parse_dates={"Datetime": [1, 2, 3, 4, 5]}, date_format="%Y %m %d %H %M", index_col="Datetime", ) if data.empty: raise ValueError("Cannot process empty file.") # This metadata will be added to when species are split and attributes are written metadata: Dict[str, str] = { "instrument": instrument, "site": site, "network": network, } extracted_sampling_period = _get_sampling_period(instrument=instrument, gc_params=gc_params) metadata["sampling_period"] = extracted_sampling_period if sampling_period is not None: # Compare input to definition within json file file_sampling_period_td = pd_Timedelta(seconds=float(extracted_sampling_period)) sampling_period_td = pd_Timedelta(seconds=float(sampling_period)) comparison_seconds = abs(sampling_period_td - file_sampling_period_td).total_seconds() tolerance_seconds = 1 if comparison_seconds > tolerance_seconds: raise ValueError( f"Input sampling period {sampling_period} does not match to value " f"extracted from the file name of {metadata['sampling_period']} seconds." ) units = {} scale = {} flag_columns: List[Series] = [] species = [] columns_renamed = {} for column in data.columns: if "Flag" in column: # Location of this column in a range (0, n_columns-1) col_loc = data.columns.get_loc(column) # Get name of column before this one for the gas name gas_name = data.columns[col_loc - 1] # Add it to the dictionary for renaming later columns_renamed[column] = gas_name + "_flag" # Create 2 new series based on the flag columns status_flag = (data[column].str[0] != "-").astype(int).rename(f"{gas_name} status_flag") integration_flag = (data[column].str[1] != "-").astype(int).rename(f"{gas_name} integration_flag") flag_columns.extend((status_flag, integration_flag)) col_shift = 4 units[gas_name] = header.iloc[1, col_loc + col_shift] scale[gas_name] = header.iloc[0, col_loc + col_shift] if units[gas_name] == "--": units[gas_name] = "NA" if scale[gas_name] == "--": scale[gas_name] = "NA" species.append(gas_name) data = data.join(flag_columns) # Rename columns to include the gas this flag represents data = data.rename(columns=columns_renamed, inplace=False) precision, precision_species = _read_precision(filepath=precision_filepath) # Check if the index is sorted if not precision.index.is_monotonic_increasing: precision = precision.sort_index() for sp in species: try: precision_index = precision_species.index(sp) * 2 + 1 except ValueError: raise ValueError(f"Cannot find {sp} in precisions file.") data[sp + " repeatability"] = ( precision[precision_index].astype(float).reindex_like(data, method="pad") ) # Apply timestamp correction, because GCwerks currently outputs the centre of the sampling period data["new_time"] = data.index - pd_Timedelta(seconds=int(metadata["sampling_period"]) / 2.0) data = data.set_index("new_time", inplace=False, drop=True) data.index.name = "time" gas_data = _split_species( data=data, site=site, species=species, instrument=instrument, metadata=metadata, units=units, scale=scale, gc_params=gc_params, ) return gas_data def _read_precision(filepath: Path) -> Tuple[DataFrame, List]: """Read GC precision file Args: filepath: Path of precision file Returns: tuple (Pandas.DataFrame, list): Precision DataFrame and list of species in precision data """ from pandas import read_csv # Read precision species precision_header = read_csv(filepath, skiprows=3, nrows=1, header=None, sep=r"\s+") precision_species = precision_header.values[0][1:].tolist() precision = read_csv( filepath, skiprows=5, header=None, sep=r"\s+", index_col=0, parse_dates={"Datetime": [0]}, date_format="%y%m%d", ) # Drop any duplicates from the index precision = precision.loc[~precision.index.duplicated(keep="first")] return precision, precision_species def _split_species( data: DataFrame, site: str, instrument: str, species: List, metadata: Dict, units: Dict, scale: Dict, gc_params: Dict, ) -> Dict: """Splits the species into separate dataframe into sections to be stored within individual Datasources Args: data: DataFrame of raw data site: Name of site from which this data originates instrument: Name of instrument species: List of species contained in data metadata: Dictionary of metadata units: Dictionary of units for each species scale: Dictionary of scales for each species gc_params: GCWERKS parameter dictionary Returns: dict: Dataframe of gas data and metadata """ from fnmatch import fnmatch from addict import Dict as aDict from openghg.util import format_inlet from openghg.standardise.meta import define_species_label # Read inlets from the parameters expected_inlets = _get_inlets(site_code=site, gc_params=gc_params) try: data_inlets = data["Inlet"].unique().tolist() except KeyError: raise KeyError( "Unable to read inlets from data, please ensure this data is of the GC type expected by this retrieve module" ) combined_data = aDict() for spec in species: # Skip this species if the data is all NaNs if data[spec].isnull().all(): continue # Here inlet is the inlet in the data and inlet_label is the label we want to use as metadata for inlet, inlet_label in expected_inlets.items(): inlet_label = format_inlet(inlet_label) # Create a copy of metadata for local modification spec_metadata = metadata.copy() spec_metadata["units"] = units[spec] spec_metadata["calibration_scale"] = scale[spec] # If we've only got a single inlet if inlet == "any" or inlet == "air": spec_data = data[ [ spec, spec + " repeatability", spec + " status_flag", spec + " integration_flag", "Inlet", ] ] spec_data = spec_data.dropna(axis="index", how="any") spec_metadata["inlet"] = inlet_label elif "date" in inlet: dates = inlet.split("_")[1:] data_sliced = data.loc[dates[0] : dates[1]] spec_data = data_sliced[ [ spec, spec + " repeatability", spec + " status_flag", spec + " integration_flag", "Inlet", ] ] spec_data = spec_data.dropna(axis="index", how="any") spec_metadata["inlet"] = inlet_label else: # Find the inlet matching_inlets = [i for i in data_inlets if fnmatch(i, inlet)] if not matching_inlets: continue # Only set the label in metadata when we have the correct label spec_metadata["inlet"] = inlet_label # There should only be one matching label select_inlet = matching_inlets[0] # Take only data for this inlet from the dataframe inlet_data = data.loc[data["Inlet"] == select_inlet] spec_data = inlet_data[ [ spec, spec + " repeatability", spec + " status_flag", spec + " integration_flag", "Inlet", ] ] spec_data = spec_data.dropna(axis="index", how="any") # Now we drop the inlet column spec_data = spec_data.drop("Inlet", axis="columns") # Check that the Dataframe has something in it if spec_data.empty: continue attributes = _get_site_attributes( site=site, inlet=inlet_label, instrument=instrument, gc_params=gc_params ) attributes = attributes.copy() # We want an xarray Dataset spec_data = spec_data.to_xarray() # Create a standardised / cleaned species label comp_species = define_species_label(spec)[0] # Add the cleaned species name to the metadata and alternative name if present spec_metadata["species"] = comp_species spec_metadata["data_type"] = "surface" if comp_species != spec.lower() and comp_species != spec.upper(): spec_metadata["species_alt"] = spec # Rename variables so they have lowercase and alphanumeric names to_rename = {} for var in spec_data.variables: if spec in var: new_name = var.replace(spec, comp_species) to_rename[var] = new_name spec_data = spec_data.rename(to_rename) # As a single species may have measurements from multiple inlets we # use the species and inlet as a key data_key = f"{comp_species}_{inlet_label}" combined_data[data_key]["metadata"] = spec_metadata combined_data[data_key]["data"] = spec_data combined_data[data_key]["attributes"] = attributes to_return: Dict = combined_data.to_dict() return to_return def _get_sampling_period(instrument: str, gc_params: Dict) -> str: """Process the suffix from the filename to get the correct instrument name then retrieve the sampling period of that instrument from metadata. Args: instrument: Instrument name Returns: str: Precision of instrument in seconds """ instrument = instrument.lower() try: sampling_period = str(gc_params["sampling_period"][instrument]) except KeyError: raise ValueError( f"Invalid instrument: {instrument}\nPlease select one of {gc_params['sampling_period'].keys()}\n" ) return sampling_period def _get_inlets(site_code: str, gc_params: Dict) -> Dict: """Get the inlets we expect to be at this site and create a mapping dictionary so we get consistent labelling. Args: site: Site code gc_params: GCWERKS parameters Returns: dict: Mapping dictionary of inlet and required inlet label """ site = site_code.upper() site_params = gc_params["sites"] # Create a mapping of inlet to match to the inlet label inlets = site_params[site]["inlets"] try: inlet_labels = site_params[site]["inlet_label"] except KeyError: inlet_labels = inlets mapping_dict = {k: v for k, v in zip(inlets, inlet_labels)} return mapping_dict def _get_site_attributes(site: str, inlet: str, instrument: str, gc_params: Dict) -> Dict[str, str]: """Gets the site specific attributes for writing to Datsets Args: site: Site code inlet: Inlet height in metres instrument: Instrument name gc_params: GCWERKS parameters Returns: dict: Dictionary of attributes """ from openghg.util import format_inlet site = site.upper() instrument = instrument.lower() attributes: Dict[str, str] = gc_params["sites"][site]["global_attributes"] attributes["inlet_height_magl"] = format_inlet(inlet, key_name="inlet_height_magl") try: attributes["comment"] = gc_params["comment"][instrument] except KeyError: valid_instruments = list(gc_params["comment"].keys()) raise KeyError(f"Invalid instrument {instrument} passed, valid instruments : {valid_instruments}") return attributes