import logging
from pathlib import Path
from typing import Any, cast
from collections.abc import Hashable
import xarray as xr
from openghg.standardise.meta import dataset_formatter
from openghg.types import pathType
from openghg.util import check_and_set_null_variable, not_set_metadata_values
logger = logging.getLogger("openghg.standardise.surface")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler
[docs]
def parse_noaa(
    filepath: pathType,
    site: str,
    measurement_type: str | None,
    inlet: str | None = None,
    network: str = "NOAA",
    instrument: str | None = None,
    sampling_period: str | None = None,
    update_mismatch: str = "never",
    site_filepath: pathType | None = None,
    **kwarg: dict,
) -> dict:
    """Read NOAA data from raw text file or ObsPack NetCDF
    Args:
        filepath: Data filepath
        site: Three letter site code
        inlet: Inlet height (as value unit e.g. "10m")
        measurement_type: One of ("flask", "insitu", "pfp")
        network: Network, defaults to NOAA
        instrument: Instrument name
        sampling_period: Sampling period
        update_mismatch: This determines how mismatches between the internal data
            attributes and the supplied / derived metadata are handled.
            This includes the options:
                - "never" - don't update mismatches and raise an AttrMismatchError
                - "attributes" - update mismatches based on input attributes
                - "metadata" - update mismatches based on input metadata
        site_filepath: Alternative site info file (see openghg/openghg_defs repository for format).
            Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default.
    Returns:
        dict: Dictionary of data and metadata
    """
    if sampling_period is None:
        sampling_period = check_and_set_null_variable(sampling_period)
    sampling_period = str(sampling_period)
    valid_types = ("flask", "insitu", "pfp")
    if measurement_type is None:
        raise ValueError(
            f"measurement_type must be specified for source_format='noaa'. This must be one of {valid_types}"
        )
    elif measurement_type not in valid_types:
        raise ValueError(f"measurement_type is '{measurement_type}' but must be one of {valid_types}")
    file_extension = Path(filepath).suffix
    if file_extension == ".nc":
        return _read_obspack(
            filepath=filepath,
            site=site,
            inlet=inlet,
            measurement_type=measurement_type,
            instrument=instrument,
            sampling_period=sampling_period,
            update_mismatch=update_mismatch,
            site_filepath=site_filepath,
        )
    else:
        return _read_raw_file(
            filepath=filepath,
            site=site,
            inlet=inlet,
            measurement_type=measurement_type,
            instrument=instrument,
            sampling_period=sampling_period,
            update_mismatch=update_mismatch,
            site_filepath=site_filepath,
        ) 
def _standarise_variables(obspack_ds: xr.Dataset, species: str) -> xr.Dataset:
    """
    Converts data from NOAA ObsPack dataset into our standardised variables to be stored within the object store.
    The species is also needed so this name can be used to label the variables in the new dataset.
    Expects inputs with: "value", "value_std_dev" or "value_unc", "nvalue" as per NOAA ObsPack standard.
    Args:
        obspack_ds : Dataset derived from a netcdf file within the NOAA obspack
        species : Standardised species name (e.g. "ch4")
    Returns:
        Dataset : Standardised dataset with variables extracted and renamed
    Example output:
        For species = "ch4":
            xarray.Dataset("ch4":[...]
                           "ch4_variability":[...]
                           "ch4_number_of_observations": [...])
    """
    processed_ds = obspack_ds.copy()
    # Rename variables to match our internal standard
    # "value_std_dev" --> f"{species}_variability"
    # "value_unc" --> ??
    # TODO: Clarify what "value_unc" should be renamed to
    variable_names = {
        "value": species,
        "value_std_dev": f"{species}_variability",
        "value_unc": f"{species}_variability",  # May need to be updated
        "nvalue": f"{species}_number_of_observations",
    }
    to_extract = [name for name in variable_names.keys() if name in obspack_ds]
    # For the error variables we only want to take one set of values from the
    # obspack dataset but multiple variables may be available.
    # If multiple are present, combine these together and only extract one
    error_names = ["value_std_dev", "value_unc"]
    error_variables = [name for name in error_names if name in to_extract]
    if len(error_variables) > 1:
        main_ev = error_variables[0]  # Treat first item in the list at the one to keep
        history_attr = "history"
        processed_ds[main_ev].attrs[history_attr] = f"Merged {main_ev} variable from original file with "
        for ev in error_variables[1:]:
            # Combine details from additional additional error variable with main variable
            variable = processed_ds[main_ev]
            new_variable = processed_ds[ev]
            # Update Dataset and add details within attributes
            updated_variable = variable.combine_first(new_variable)
            processed_ds[main_ev] = updated_variable
            processed_ds[main_ev].attrs[history_attr] += f"{ev}, "
            # Remove this extra variables from the list of variables to extract from the dataset
            to_extract.remove(ev)
    # Create dictionary of names to convert obspack ds to our format
    name_dict = {name: key for name, key in variable_names.items() if name in to_extract}
    if not to_extract:
        wanted = variable_names.keys()
        raise ValueError(
            f"No valid data variables columns found in obspack dataset. We expect the following data variables in the passed NetCDF: {wanted}"
        )
    # Grab only the variables we want to keep and rename these
    processed_ds = processed_ds[to_extract]
    processed_ds = processed_ds.rename(name_dict)
    processed_ds = processed_ds.sortby("time")
    return processed_ds
def _split_inlets(obspack_ds: xr.Dataset, attributes: dict, metadata: dict, inlet: str | None = None) -> dict:
    """
    Splits the overall dataset by different inlet values, if present. The expected dataset input should be from the NOAA ObsPack.
    Args:
        obspack_ds : Dataset derived from a netcdf file within the NOAA obspack
        attributes: Attributes extracted from the NOAA obspack. Should contain at least "species" and "measurement_type"
        metadata: Metadata to store alongside standardised data
    Returns:
        Dict: gas data containing "data", "metadata", "attributes" for each inlet
    Example output:
        {"ch4": {"data": xr.Dataset(...), "attributes": {...}, "metadata": {...}}}
        or
        {"ch4_40m": {"data": xr.Dataset(...), "attributes": {...}, "metadata": {...}}, "ch4_60m": {...}, ...}
    """
    from openghg.util import format_inlet
    orig_attrs = obspack_ds.attrs
    species = attributes["species"]
    measurement_type = attributes["measurement_type"]
    height_var = "intake_height"
    # Check whether the input data contains different inlet height values for each data point ("intake_height" data variable)
    # If so we need to select the data for each inlet and indicate this is a separate Datasource
    # Each data key is labelled based on the species and the inlet (if needed)
    gas_data: dict[str, dict] = {}
    if height_var in obspack_ds.data_vars:
        if inlet is not None:
            # TODO: Add to logging?
            logger.warning(
                f"Ignoring inlet value of {inlet} since file has each data point has an associated height (contains 'intake_height' variable)"
            )
        # Group dataset by the height values
        # Note: could use ds.groupby_bins(...) if necessary if there are lots of small height differences to group these
        obspack_ds_grouped = obspack_ds.groupby(height_var)
        num_groups = len(obspack_ds_grouped.groups)
        # For each group standardise and store with id based on species and inlet height
        for ht, obspack_ds_ht in obspack_ds_grouped:
            # Creating id keys of the form "<species>_<inlet>" e.g. "ch4_40m" or "co_12.5m"
            inlet_str = format_inlet(str(ht), key_name="inlet")
            inlet_magl_str = format_inlet(str(ht), key_name="inlet_height_magl")
            if num_groups > 1:
                id_key = f"{species}_{inlet_str}"
            else:
                id_key = f"{species}"
            # Extract wanted variables and convert to standardised names
            standarised_ds = _standarise_variables(obspack_ds_ht, species)
            gas_data[id_key] = {}
            gas_data[id_key]["data"] = standarised_ds
            # Add inlet details to attributes and metadata
            attrs_copy = attributes.copy()
            meta_copy = metadata.copy()
            attrs_copy["inlet"] = inlet_str
            attrs_copy["inlet_height_magl"] = inlet_magl_str
            meta_copy["inlet"] = inlet_str
            meta_copy["inlet_height_magl"] = inlet_magl_str
            gas_data[id_key]["metadata"] = meta_copy
            gas_data[id_key]["attributes"] = attrs_copy
    else:
        try:
            inlet_value = orig_attrs["dataset_intake_ht"]
        except KeyError:
            inlet_from_file = None
        else:
            inlet_from_file = format_inlet(str(inlet_value))
        if measurement_type == "flask":
            inlet_from_file = "flask"
        # Check inlet from file against any provided inlet
        if inlet is None and inlet_from_file:
            inlet = inlet_from_file
        elif inlet is not None and inlet_from_file:
            if inlet != inlet_from_file:
                logger.warning(
                    f"Provided inlet {inlet} does not match inlet derived from the input file: {inlet_from_file}"
                )
        else:
            raise ValueError(
                "Unable to derive inlet from NOAA file. Please pass as an input. If flask data pass 'flask' as inlet."
            )
        id_key = f"{species}"
        if inlet != "flask":
            inlet_magl_str = format_inlet(inlet, key_name="inlet_height_magl")
        else:
            inlet_magl_str = "NA"
        metadata["inlet"] = inlet
        metadata["inlet_height_magl"] = inlet_magl_str
        attributes["inlet"] = inlet
        attributes["inlet_height_magl"] = inlet_magl_str
        standardised_ds = _standarise_variables(obspack_ds, species)
        gas_data[id_key] = {"data": standardised_ds, "metadata": metadata, "attributes": attributes}
    return gas_data
def _read_obspack(
    filepath: pathType,
    site: str,
    sampling_period: str,
    measurement_type: str,
    inlet: str | None = None,
    instrument: str | None = None,
    update_mismatch: str = "never",
    site_filepath: pathType | None = None,
) -> dict[str, dict]:
    """Read NOAA ObsPack NetCDF files
    Args:
        filepath: Path to file
        site: Three letter site code
        sampling_period: Sampling period
        measurement_type: One of flask, insitu or pfp
        inlet: Inlet height, if no height use measurement type e.g. flask
        instrument: Instrument name
        update_mismatch: This determines how mismatches between the internal data
            "attributes" and the supplied / derived "metadata" are handled.
            This includes the options:
              - "never" - don't update mismatches and raise an AttrMismatchError
              - "from_source" / "attributes" - update mismatches based on input data (e.g. data attributes)
              - "from_definition" / "metadata" - update mismatches based on associated data (e.g. site_info.json)
        site_filepath: Alternative site info file (see openghg/openghg_defs repository for format).
            Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default.
    Returns:
        dict: Dictionary of results
    """
    from openghg.standardise.meta import assign_attributes
    from openghg.util import clean_string
    with xr.open_dataset(filepath) as temp:
        obspack_ds = temp
        orig_attrs = temp.attrs
    # Want to find and drop any duplicate time values for the original dataset
    # Using xarray directly we have to do in a slightly convoluted way as this is not well built
    # into the xarray workflow yet - https://github.com/pydata/xarray/pull/5239
    # - can use da.drop_duplicates() but only on one variable at a time and not on the whole Dataset
    # This method keeps attributes for each of the variables including units
    # The dimension within the original dataset is called "obs" and has no associated coordinates
    # Extract time from original Dataset (dimension is "obs")
    time = obspack_ds.time
    # To keep associated "obs" dimension, need to assign coordinate values to this (just 0, len(obs))
    time = time.assign_coords(obs=obspack_ds.obs)
    # Make "time" the primary dimension (while retaining "obs") and add "time" values as coordinates
    time = time.swap_dims(dims_dict={"obs": "time"})
    time = time.assign_coords(time=time)
    # Drop any duplicate time values and extract the associated "obs" values
    # TODO: Work out what to do with duplicates - may be genuine multiple measurements
    time_unique = time.drop_duplicates(dim="time", keep="first")
    obs_unique = time_unique.obs
    # Use these obs values to filter the original dataset to remove any repeated times
    processed_ds = obspack_ds.sel(obs=obs_unique)
    processed_ds = processed_ds.set_coords(["time"])
    # Estimate sampling period using metadata and midpoint time
    not_set_values = not_set_metadata_values()
    if sampling_period in not_set_values:
        sampling_period_estimate = _estimate_sampling_period(obspack_ds)
    else:
        sampling_period_estimate = -1.0
    species = clean_string(obspack_ds.attrs["dataset_parameter"])
    network = "NOAA"
    try:
        # Extract units attribute from value data variable
        units = processed_ds["value"].units
    except (KeyError, AttributeError):
        logger.warning("Unable to extract units from 'value' within input dataset")
        units = "NA"
    metadata = {}
    metadata["site"] = site
    metadata["network"] = network
    metadata["measurement_type"] = measurement_type
    metadata["species"] = species
    metadata["units"] = units
    metadata["sampling_period"] = sampling_period
    metadata["dataset_source"] = "noaa_obspack"
    metadata["data_type"] = "surface"
    # Add additional sampling_period_estimate if sampling_period is not set
    if sampling_period_estimate >= 0.0:
        metadata["sampling_period_estimate"] = str(
            sampling_period_estimate
        )  # convert to string to keep consistent with "sampling_period"
    # Define not_set value to use as a default
    not_set_value = not_set_values[0]
    # Add instrument if present
    if instrument is not None:
        metadata["instrument"] = instrument
    else:
        metadata["instrument"] = orig_attrs.get("instrument", not_set_value)
    # Add data owner details, station position and calibration scale, if present
    metadata["data_owner"] = orig_attrs.get("provider_1_name", not_set_value)
    metadata["data_owner_email"] = orig_attrs.get("provider_1_email", not_set_value)
    metadata["station_longitude"] = orig_attrs.get("site_longitude", not_set_value)
    metadata["station_latitude"] = orig_attrs.get("site_latitude", not_set_value)
    metadata["calibration_scale"] = orig_attrs.get("dataset_calibration_scale", not_set_value)
    # Create attributes with copy of metadata values
    attributes = cast(dict[Hashable, Any], metadata.copy())  # Cast to match xarray attributes type
    # TODO: At the moment all attributes from the NOAA ObsPack are being copied
    # plus any variables we're adding - decide if we want to reduce this
    attributes.update(orig_attrs)
    # expected_keys = {
    #     "site",
    #     "species",
    #     "inlet",
    #     "instrument",
    #     "sampling_period",
    #     "calibration_scale",
    #     "station_longitude",
    #     "station_latitude",
    # }
    gas_data = _split_inlets(processed_ds, attributes, metadata, inlet=inlet)
    gas_data = dataset_formatter(data=gas_data)
    gas_data = assign_attributes(
        data=gas_data,
        site=site,
        network=network,
        update_mismatch=update_mismatch,
        site_filepath=site_filepath,
    )
    return gas_data
def _read_raw_file(
    filepath: pathType,
    site: str,
    sampling_period: str,
    measurement_type: str,
    inlet: str | None = None,
    instrument: str | None = None,
    update_mismatch: str = "never",
    site_filepath: pathType | None = None,
) -> dict:
    """Reads NOAA data files and returns a dictionary of processed
    data and metadata.
    Args:
        filepath: Path of file to load
        site: Site name
        sampling_period: Sampling period
        measurement_type: One of flask, insitu or pfp
        inlet: Inlet height, if no height use measurement type e.g. flask
        instrument: Instrument name
        update_mismatch: This determines how mismatches between the internal data
            "attributes" and the supplied / derived "metadata" are handled.
            This includes the options:
              - "never" - don't update mismatches and raise an AttrMismatchError
              - "from_source" / "attributes" - update mismatches based on input data (e.g. data attributes)
              - "from_definition" / "metadata" - update mismatches based on associated data (e.g. site_info.json)
        site_filepath: Alternative site info file (see openghg/openghg_defs repository for format).
            Otherwise will use the data stored within openghg_defs/data/site_info JSON file by default.
    Returns:
        list: UUIDs of Datasources data has been assigned to
    """
    from openghg.standardise.meta import assign_attributes
    # TODO: Added this for now to make sure inlet is specified but may be able to remove
    # if this can be derived from the data format.
    if inlet is None:
        raise ValueError("Inlet must be specified to derive data from NOAA raw (txt) files.")
    filepath = Path(filepath)
    filename = filepath.name
    species = filename.split("_")[0].lower()
    source_name = filepath.stem
    source_name = source_name.split("-")[0]
    gas_data = _read_raw_data(
        filepath=filepath,
        inlet=inlet,
        species=species,
        measurement_type=measurement_type,
        sampling_period=sampling_period,
    )
    gas_data = dataset_formatter(data=gas_data)
    gas_data = assign_attributes(
        data=gas_data, site=site, network="NOAA", update_mismatch=update_mismatch, site_filepath=site_filepath
    )
    return gas_data
def _read_raw_data(
    filepath: Path,
    species: str,
    inlet: str,
    sampling_period: str,
    measurement_type: str = "flask",
) -> dict:
    """Separates the gases stored in the dataframe in
    separate dataframes and returns a dictionary of gases
    with an assigned UUID as gas:UUID and a list of the processed
    dataframes
    Args:
        filepath: Path of datafile
        species: Species string such as CH4, CO
        measurement_type: Type of measurements e.g. flask
    Returns:
        dict: Dictionary containing attributes, data and metadata keys
    """
    from openghg.util import clean_string, get_site_info, load_internal_json, read_header
    from pandas import read_csv
    header = read_header(filepath=filepath)
    column_names = header[-1][14:].split()
    # Number of header lines to skip
    n_skip = len(header)
    date_cols = [
        "sample_year",
        "sample_month",
        "sample_day",
        "sample_hour",
        "sample_minute",
        "sample_seconds",
    ]
    data = read_csv(
        filepath,
        skiprows=n_skip,
        names=column_names,
        sep=r"\s+",
        skipinitialspace=True,
        parse_dates={"time": date_cols},
        date_format="%Y %m %d %H %M %S",
        index_col="time",
    )
    # Drop duplicates
    data = data.loc[~data.index.duplicated(keep="first")]
    # Check if the index is sorted
    if not data.index.is_monotonic_increasing:
        data = data.sort_index()
    # Read the site code from the Dataframe
    site = str(data["sample_site_code"][0]).upper()
    site_data = get_site_info()
    # If this isn't a site we recognize try and read it from the filename
    if site not in site_data:
        site = str(filepath.name).split("_")[1].upper()
        if site not in site_data:
            raise ValueError(f"The site {site} is not recognized.")
    if species is not None:
        # If we're passed a species ensure that it is in fact the correct species
        data_species = str(data["parameter_formula"].values[0]).lower()
        passed_species = species.lower()
        if data_species != passed_species:
            raise ValueError(
                f"Mismatch between passed species ({passed_species}) and species read from data ({data_species})"
            )
    species = species.upper()
    # add 0/1 variable for second part of analysis flag
    data[species + "_selection_flag"] = (data["analysis_flag"].str[1] != ".").apply(int)
    # filter data by first part of analysis flag
    flag = data["analysis_flag"].str[0] == "."
    data = data[flag]
    data = data[
        [
            "sample_latitude",
            "sample_longitude",
            "sample_altitude",
            "analysis_value",
            "analysis_uncertainty",
            species + "_selection_flag",
        ]
    ]
    rename_dict = {
        "analysis_value": species,
        "analysis_uncertainty": species + "_repeatability",
        "sample_longitude": "longitude",
        "sample_latitude": "latitude",
        "sample_altitude": "altitude",
    }
    data = data.rename(columns=rename_dict, inplace=False)
    data = data.to_xarray()
    # TODO  - this could do with a better name
    noaa_params = load_internal_json(filename="attributes.json")["NOAA"]
    site_attributes = noaa_params["global_attributes"]
    site_attributes["inlet_height_magl"] = "NA"
    site_attributes["instrument"] = noaa_params["instrument"][species.upper()]
    site_attributes["sampling_period"] = sampling_period
    metadata = {}
    metadata["species"] = clean_string(species)
    metadata["site"] = site
    metadata["measurement_type"] = measurement_type
    metadata["network"] = "NOAA"
    metadata["inlet"] = inlet
    metadata["sampling_period"] = sampling_period
    metadata["instrument"] = noaa_params["instrument"][species.upper()]
    metadata["data_type"] = "surface"
    metadata["source_format"] = "noaa"
    combined_data = {
        species.lower(): {
            "metadata": metadata,
            "data": data,
            "attributes": site_attributes,
        }
    }
    return combined_data
def _estimate_sampling_period(obspack_ds: xr.Dataset, min_estimate: float = 10.0) -> float:
    """
    Estimate the sampling period for the NOAA data using either the "data_selection_tag"
    attribute (this sometimes contains useful information such as "HourlyData") or by using
    the midpoint_time within the data itself.
    Note: midpoint_time often seems to match start_time implying instantaneous measurement
    or that this value has not been correctly included.
    If the estimate is less than `min_estimate` the estimate sampling period will be set to
    this value.
    Args:
        obspack_ds : Dataset of raw obs pack file opened as an xarray Dataset
        min_estimate : Minimum sampling period estimate to use in seconds.
    Returns:
        int: Seconds for the estimated sampling period.
    """
    # Check useful attributes
    data_selection = obspack_ds.attrs["dataset_selection_tag"]
    hourly_s = 60 * 60
    daily_s = hourly_s * 24
    weekly_s = daily_s * 7
    monthly_s = daily_s * 28  # approx
    yearly_s = daily_s * 365  # approx
    sampling_period_estimate = 0.0  # seconds
    frequency_keywords = {
        "hourly": hourly_s,
        "daily": daily_s,
        "weekly": weekly_s,
        "monthly": monthly_s,
        "yearly": yearly_s,
    }
    for freq, time_s in frequency_keywords.items():
        if freq in data_selection.lower():
            sampling_period_estimate = time_s
    if not sampling_period_estimate:
        if "start_time" in obspack_ds and "midpoint_time" in obspack_ds:
            half_sample_time = (obspack_ds["midpoint_time"] - obspack_ds["start_time"]).values
            half_sample_time_s = half_sample_time.astype("timedelta64[s]").mean().astype(float)
            sampling_period_estimate = round(half_sample_time_s * 2, 1)
    if sampling_period_estimate < min_estimate:
        sampling_period_estimate = min_estimate
    return sampling_period_estimate