Source code for openghg.standardise.column._tccon

from pathlib import Path
from typing import cast
from collections.abc import MutableMapping
from datetime import datetime
import numpy as np
import xarray as xr

from openghg.util import load_internal_json
from openghg.types import pathType

import logging

logger = logging.getLogger("openghg.standardise.column._tccon")


def _filter_and_resample(ds: xr.Dataset, species: str, quality_filt: bool, resample: bool) -> xr.Dataset:
    """
    Filter the data (if quality_filt is True), removing points for which the absolute
    value of "extrapolation_flags_ak_x{species}" is equal to 2, then optionally resample
    the data to an hourly scale.
    Args:
        ds: dataset with column concentrations
        species: species name e.g. "ch4"
        quality_filt: if True, removes data for which abs("extrapolation_flags_ak_x{species}") is equal to 2.
        resample: if True, resamples the data to an hourly scale.
    Returns:
        dataset filtered and resampled (where requested)
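
    Example:
        A minimal sketch, assuming ds is an xarray Dataset of TCCON column data for CH4:

        >>> hourly = _filter_and_resample(ds, species="ch4", quality_filt=True, resample=True)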
    """
    if quality_filt:
        logger.info(f"Applying filter based on variable 'extrapolation_flags_ak_x{species}'.")
        ds = ds.where(abs(ds[f"extrapolation_flags_ak_x{species}"]) != 2)
    ds = ds.dropna("time").sortby("time")

    if not resample:
        ds[f"x{species}_uncertainty"] = ds[f"x{species}_error"]
        return ds

    # Keep variable attributes (in particular "units") through the resampling reductions
    with xr.set_options(keep_attrs=True):
        output = ds.resample(time="h").mean(dim="time")
        output[f"x{species}_uncertainty"] = ds[f"x{species}_error"].resample(time="h").max(dim="time")
    logger.warning(
        "Resampling at this stage, and taking the per-period maximum of the error as the uncertainty, may not be appropriate and should be reviewed."
    )
    del output[f"extrapolation_flags_ak_x{species}"]
    output = output.dropna("time")

    return output


def _define_var_attrs(ds: xr.Dataset, species: str, method: str) -> xr.Dataset:
    """
    Add attributes to the "x{species}_uncertainty" and "pressure_weights" variables, and add a global attribute "derivation_method" that keeps track of the method used to derive the pressure weights.
    Estimate the min and max values of each variable and store them in the attributes.
    Args:
        ds: dataset with column concentrations
        species: species name e.g. "ch4"
        method: method used to calculate the integration operator.
            Possible options: "pressure_weight" and "integration_operator".
            See https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Calculating_comparison_quantities for a description.
    Returns:
        dataset with updated attributes.
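
    Example:
        A minimal sketch; after the call the derivation method is recorded globally:

        >>> ds = _define_var_attrs(ds, species="ch4", method="integration_operator")
        >>> ds.attrs["derivation_method"]
        'integration operator'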
    """
    ds[f"x{species}_uncertainty"].attrs = {
        "long_name": f"uncertainty on the x{species} measurement",
        "description": f"max of x{species}_error on resampling period",
        "unit": ds[f"x{species}"].units,
    }

    if method == "pressure_weights":
        ds["pressure_weights"].attrs["long_name"] = "pressure_weights derived using pressure weight method"
        ds["pressure_weights"].attrs[
            "description"
        ] = "see doc https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Using_pressure_weights"
        ds.attrs["derivation_method"] = "pressure weight"

    elif method == "integration_operator":
        ds["pressure_weights"].attrs["long_name"] = "pressure_weights derived using integration_operator"
        ds["pressure_weights"].attrs[
            "description"
        ] = "see doc https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Using_the_integration_operator"
        ds.attrs["derivation_method"] = "integration operator"

    for var in ds.data_vars:
        ds[var].attrs["vmin"] = f"{ds[var].values.min():.1f}"
        ds[var].attrs["vmax"] = f"{ds[var].values.max():.1f}"

    return ds


def _convert_prior_profile_to_dry(ds: xr.Dataset, species: list | str) -> None:
    """
    Calculate (in place) the dry "prior_h2o", "integration_operator" and "prior_{sp}" variables and overwrite the wet ones.
    Args:
        ds: dataset with column concentrations
        species: species name(s) e.g. "ch4"
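
    Notes:
        The conversions applied below follow from the definitions of wet and dry
        mole fractions (h2o_dry is the dry mole fraction of water):
            h2o_dry = h2o_wet / (1 - h2o_wet)
            integration_operator_dry = integration_operator_wet / (1 + h2o_dry)
            prior_sp_dry = prior_sp_wet * (1 + h2o_dry)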
    """
    logger.warning(
        f"According to the variable attributes, 'x{species}' is dry but the profiles 'prior_h2o' and 'prior_{species}' are wet, so the profiles are dried here. This should be confirmed with the TCCON team before the data are used in earnest."
    )

    if isinstance(species, str):
        species = [species]

    if ds["prior_h2o"].attrs["standard_name"] == "wet_atmosphere_mole_fraction_of_water":
        h2o_attrs = ds["prior_h2o"].attrs
        ds["prior_h2o"] = ds["prior_h2o"] / (1 - ds["prior_h2o"])
        ds["prior_h2o"].attrs = {k: v.replace("wet", "dry") for k, v in h2o_attrs.items() if k != "note"}
    elif ds["prior_h2o"].attrs["standard_name"] != "dry_atmosphere_mole_fraction_of_water":
        raise ValueError("'standard_name' of 'prior_h2o' is not what expected. Please check.")

    if "wet" in ds["integration_operator"].attrs["description"]:
        io_attrs = ds["integration_operator"].attrs
        ds["integration_operator"] = ds["integration_operator"] / (1 + ds["prior_h2o"])
        ds["integration_operator"].attrs = {k: v.replace("wet", "dry") for k, v in io_attrs.items()}
    elif "dry" in ds["integration_operator"].attrs["description"]:
        logger.info("'integration_operator' already dried, skipping conversion from wet to dry.")
    else:
        raise ValueError("'description' of 'integration_operator' is not what expected. Please check.")

    for sp in species:
        if ds[f"prior_{sp}"].attrs["standard_name"].startswith("wet_atmosphere_mole_fraction_of_"):
            sp_attrs = ds[f"prior_{sp}"].attrs
            ds[f"prior_{sp}"] = ds[f"prior_{sp}"] * (1 + ds["prior_h2o"])
            ds[f"prior_{sp}"].attrs = {k: v.replace("wet", "dry") for k, v in sp_attrs.items() if k != "note"}
        elif ds[f"prior_{sp}"].attrs["standard_name"].startswith("dry_atmosphere_mole_fraction_of_"):
            logger.info(f"Prior profile of {sp} already dried, skipping conversion from wet to dry.")
        else:
            raise ValueError(f"'standard_name' of 'prior_{sp}' is not as expected. Please check.")


def _reformat_convert_units(ds: xr.Dataset, species: list | str, final_units: str | dict) -> xr.Dataset:
    """
    Convert f"prior_{sp}", f"prior_x{sp}", f"x{sp}", f"x{sp}_uncertainty", f"x{sp}_error" units to the one specified in final_units
    Args:
        ds: dataset with column concentrations
        species: species name(s) e.g. "ch4"
        final_units: dict with species as keys containing the targeted unit (e.g. ppb) for each species as values. If float, unit is applied to all.
    Returns:
        dataset with variables converted to desired units.
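
    Example:
        A minimal sketch, converting the CH4 variables to ppb:

        >>> ds = _reformat_convert_units(ds, species=["ch4"], final_units={"ch4": "ppb"})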
    """
    if isinstance(species, str):
        species = [species]
    if isinstance(final_units, str):
        final_units = {sp: final_units for sp in species}

    unit_converter = load_internal_json(filename="attributes.json")["unit_interpret"]

    for sp in species:
        for var in [f"prior_{sp}", f"prior_x{sp}", f"x{sp}", f"x{sp}_uncertainty", f"x{sp}_error"]:
            with xr.set_options(keep_attrs=True):
                ds[var] = (
                    ds[var]
                    * float(unit_converter[ds[var].attrs["units"]])
                    / float(unit_converter[final_units[sp]])
                )
            ds[var].attrs["units"] = final_units[sp]
    return ds


def parse_tccon(
    filepath: pathType,
    species: str,
    domain: str | None = None,
    pressure_weights_method: str = "integration_operator",
    quality_filt: bool = True,
    resample: bool = True,
    chunks: dict | None = None,
) -> dict:
    """
    Parse and extract data from a netcdf file downloaded from the tccon archive (https://tccondata.org/).
    Args:
        filepath: Path of observation file
        species: Species name or synonym e.g. "ch4"
        domain: Domain name stored in the output metadata, if applicable.
        pressure_weights_method: method used to calculate the integration_operator.
            Possible options: "pressure_weight" and "integration_operator".
            See https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Calculating_comparison_quantities for a description.
        quality_filt: If True, removes data for which abs(extrapolation_flags_ak_x{species}) is equal to 2.
        resample: If True, resample data to an hourly scale.
        chunks: Chunking schema to use when storing data. It expects a dictionary of dimension name and chunk size,
            for example {"time": 100}. If None then a chunking schema will be set automatically by OpenGHG.
            See documentation for guidance on chunking:
            https://docs.openghg.org/tutorials/local/Adding_data/Adding_ancillary_data.html#chunking.
            To disable chunking pass in an empty dictionary.
    Returns:
        Dict : Dictionary of source_name : data, metadata, attributes
    """
    filepath = Path(filepath)
    if filepath.suffix.lower() != ".nc":
        raise ValueError("Input file must be a .nc (netcdf) file.")

    splitted_filename = filepath.name.split(".")
    if splitted_filename[1:] != ["public", "qc", "nc"]:
        raise ValueError(
            "File name should end with 'public.qc.nc' when downloaded directly from the tccon archive."
        )

    var_to_read = [
        f"x{species}",
        f"prior_x{species}",
        f"prior_{species}",
        f"ak_x{species}",
        f"extrapolation_flags_ak_x{species}",
        f"x{species}_error",
        "integration_operator",
        "long",
        "lat",
        "prior_h2o",
        "ak_pressure",
        "prior_gravity",
    ]
    data = xr.open_dataset(filepath)[var_to_read].chunk(chunks if chunks is not None else {})

    # Create metadata #
    attributes = cast(MutableMapping, data.attrs)
    attributes["file_start_date"] = datetime.strptime(splitted_filename[0][2:10], "%Y%m%d").strftime(
        "%Y-%m-%d"
    )
    attributes["file_end_date"] = datetime.strptime(splitted_filename[0][11:19], "%Y%m%d").strftime(
        "%Y-%m-%d"
    )
    site_tccon_shortname = splitted_filename[0][:2]

    attributes["species"] = species
    attributes["domain"] = domain
    attributes["site"] = "T" + site_tccon_shortname.upper()
    attributes["network"] = "TCCON"
    attributes["platform"] = "site"
    attributes["inlet"] = "column"
    attributes["pressure_weights_method"] = pressure_weights_method
    attributes["original_file_description"] = attributes["description"]
    attributes["description"] = (
        f"TCCON data standardised from {filepath.name}, with the pressure weights estimated via '{pressure_weights_method}'."
    )
    attributes["calibration_scale"] = data[f"x{species}"].attrs.get("wmo_or_analogous_scale", "unknown")

    contact = attributes["contact"].split(" ")
    if "@" in contact[-1]:
        attributes["data_owner"] = " ".join(contact[:-1])
        attributes["data_owner_email"] = contact[-1]
    else:
        raise ValueError(
            "Couldn't parse the data owner and data owner email; the 'contact' attribute format may need updated handling."
        )

    if data.long.values.std() > 1e-3 or data.lat.values.std() > 1e-3:
        raise ValueError(
            "Longitude and/or latitude seem to be changing over time. This situation is not currently handled."
        )
) attributes["longitude"] = f"{data.long.values.mean():.3f}" attributes["latitude"] = f"{data.lat.values.mean():.3f}" logger.warning("Add a check here that the site is really in the domain") # Prepare data # # Align units if data[f"prior_{species}"].units == "ppb" and data[f"prior_x{species}"].units == "ppm": with xr.set_options(keep_attrs=True): data[f"prior_{species}"] = data[f"prior_{species}"] * 1e-3 data[f"prior_{species}"].attrs["units"] = "ppm" if data[f"prior_{species}"].units != data[f"prior_x{species}"].units: raise ValueError( f"'prior_{species}' and 'prior_x{species}' have different units, please update this part of code to correct that." ) # Convert wet profile into dry _convert_prior_profile_to_dry(data, species=species) # Define integartion_operator if pressure_weights_method == "integration_operator": if attributes["file_format_version"][:4] == "2020" and attributes["data_revision"] == "R0": raise ValueError( f"A bug is affecting the 'integration_operator' variable in version 2020.R0 (see https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Using_the_integration_operator, last access:2025/07/17). Therefore the 'pressure weights should be used instead of 'integration_operator' while standardising {filepath}." ) elif pressure_weights_method == "pressure_weight": # Derive pressure thickness press = data.ak_pressure.values[:-1] - data.ak_pressure.values[1:] press = np.concatenate([press, [data.ak_pressure.values[-1]]]) / data.ak_pressure.values[0] data = data.assign({"dpj": (("prior_altitude"), press)}) # Derive pressure weight (hj), wet to dry conversion factor, # dry mole fraction of water (fdry_h2o) and prior dry xch4 if data["prior_h2o"].attrs["standard_name"] != "dry_atmosphere_mole_fraction_of_water": raise ValueError("Looks like the data haven't been dried..") M_dryH2O, M_dryAir = 18.0153, 28.9647 data["hj"] = data["dpj"] / ( data["prior_gravity"] * M_dryAir * (1 + (data["prior_h2o"] * M_dryH2O / M_dryAir)) ) data["integration_operator"] = data["hj"] / data["hj"].sum(dim="prior_altitude") # Clean dataset data = data.drop_vars(["dpj", "hj"]) else: raise ValueError( f"pressure_weights_method = '{pressure_weights_method}' is not a valid option. Options available: 'pressure_weight' or 'integration_operator'." ) data = data.drop_vars(["prior_gravity", "prior_h2o", "long", "lat"]) # Test coherency between dry and wet stuff max_diff = ( ( abs( data["prior_xch4"] - (data["prior_ch4"] * data["integration_operator"]).sum(dim="prior_altitude") ) / data["prior_xch4"] ) .max() .values ) if max_diff > 1e-6: logger.warning( f"Incoherencies between 'x{species}_prior' (supposed dry) and its recalculation from the derived integration operator and dried {species} profile (abs. rel. diff up to {100 * max_diff:.1f}% of 'x{species}_prior'). Is 'x{species}_prior' in tccon file really dry? Or have I misunderstood something?" 
    # Filter the data and resample to hourly
    data = _filter_and_resample(data, species, quality_filt, resample)

    # Reformat units
    # NOTE: target units are currently only defined for "ch4"
    final_units = {"ch4": "ppb"}
    data = _reformat_convert_units(data, species, final_units=final_units)

    # Rename variables
    data = data.rename(
        {
            "integration_operator": "pressure_weights",
            "ak_pressure": "pressure_levels",
            f"prior_{species}": f"{species}_profile_apriori",
            f"prior_x{species}": f"x{species}_apriori",
            f"ak_x{species}": f"x{species}_averaging_kernel",
        }
    )

    # Define attributes
    data = _define_var_attrs(data, species, pressure_weights_method)

    # Align dimensions
    if all(data["ak_altitude"].values == data["prior_altitude"].values):
        data["altitude"] = data["ak_altitude"]
        lev_coord = np.arange(data["ak_altitude"].size)
        # Iterate over a static list of names since variables are reassigned in the loop
        for var in list(data.data_vars):
            if "ak_altitude" in data[var].dims:
                old_dim = "ak_altitude"
            elif "prior_altitude" in data[var].dims:
                old_dim = "prior_altitude"
            else:
                continue
            new_var = data[var].rename({old_dim: "lev"})
            new_var["lev"] = lev_coord
            data[var] = new_var
        data = data.drop_dims(["ak_altitude", "prior_altitude"])
        data["lev"].attrs["short_description"] = "Number for each level within the vertically resolved data."
    else:
        raise ValueError("'ak_altitude' and 'prior_altitude' are different.")

    # Define metadata
    required_metadata = [
        "species",
        "domain",
        "inlet",
        "site",
        "network",
        "platform",
        "longitude",
        "latitude",
        "data_owner",
        "data_owner_email",
        "file_start_date",
        "file_end_date",
        "file_format_version",
        "data_revision",
        "description",
        "calibration_scale",
        "pressure_weights_method",
    ]
    metadata = {k: attributes[k] for k in required_metadata}

    # Prepare dict to return
    gas_data = {species: {"metadata": metadata, "data": data, "attributes": attributes}}

    return gas_data
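
# A minimal usage sketch. The file name below is hypothetical but follows the
# "<2-letter site code><YYYYMMDD start>_<YYYYMMDD end>.public.qc.nc" pattern
# parsed above:
#
#     gas_data = parse_tccon("bi20200101_20201231.public.qc.nc", species="ch4")
#     data = gas_data["ch4"]["data"]
#     metadata = gas_data["ch4"]["metadata"]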