from pathlib import Path
from typing import cast
from collections.abc import MutableMapping
from datetime import datetime
import numpy as np
import pandas as pd
import xarray as xr
from openghg.util import load_internal_json
from openghg.types import pathType
import logging
logger = logging.getLogger("openghg.standardise.column._tccon")
def _filter_and_resample(ds: xr.Dataset, species: str, quality_filt: bool, resample: bool) -> xr.Dataset:
"""
Filter (if quality_filt = True) the data keeping those for which "extrapolation_flags_ak_x{species}" is equal to 2.
Then resample the data on an hourly scale.
Args:
ds: dataset with column concentrations
species: species name e.g. "ch4"
quality_filt: if True, filters the data keeping those for which "extrapolation_flags_ak_x{species}" is equal to 2.
resample: if True resamples the data at hourly scale.
Returns:
dataset resampled and filtered (if asked)
"""
if quality_filt:
logger.info(f"Applying filter based on variable 'extrapolation_flags_ak_x{species}'.")
ds = ds.where(abs(ds[f"extrapolation_flags_ak_x{species}"]) != 2)
ds = ds.dropna("time").sortby("time")
if not resample:
ds[f"x{species}_uncertainty"] = ds[f"x{species}_error"]
return ds
output = ds.resample(time="h").mean(dim="time")
output[f"x{species}_uncertainty"] = ds[f"x{species}_error"].resample(time="h").max(dim="time")
logger.warning(
"Not sure that we should resample at this stage (and also resample the uncertainty like that)."
)
del output[f"extrapolation_flags_ak_x{species}"]
output = output.dropna("time")
return output
def _define_var_attrs(ds: xr.Dataset, species: str, method: str) -> xr.Dataset:
"""
Add attributes to "x{species}_uncertainty" and "pressure_weights" variables, add global attribute "derivation_method" that keeps track of the method used to derive the pressure weights.
Estimate the min and max values of each variables and store them in the attributes.
Args:
ds: dataset with column concentrations
species: species name e.g. "ch4"
method: method used to calculate the integration_operator.
Options possible : "pressure_weight" and "integration_operator". method use to calculate the integration_operator.
Options possible : "pressure_weight" and "integration_operator".
See https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Calculating_comparison_quantities for description.
Returns:
dataset with updated attributes.
"""
ds[f"x{species}_uncertainty"].attrs = {
"long_name": f"uncertainty on the x{species} measurement",
"description": f"max of x{species}_error on resampling period",
"unit": ds[f"x{species}"].units,
}
if method == "pressure_weights":
ds["pressure_weights"].attrs["long_name"] = "pressure_weights derived using pressure weight method"
ds["pressure_weights"].attrs[
"description"
] = "see doc https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Using_pressure_weights"
ds.attrs["derivation_method"] = "pressure weight"
elif method == "integration_operator":
ds["pressure_weights"].attrs["long_name"] = "pressure_weights derived using integration_operator"
ds["pressure_weights"].attrs[
"description"
] = "see doc https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Using_the_integration_operator"
ds.attrs["derivation_method"] = "integration operator"
for var in ds.data_vars:
ds[var].attrs["vmin"] = f"{ds[var].values.min():.1f}"
ds[var].attrs["vmax"] = f"{ds[var].values.max():.1f}"
return ds
def _convert_prior_profile_to_dry(ds: xr.Dataset, species: list | str) -> None:
"""
Calculate (inplace) the dry "prior_h2o", "integration_operator" and "prior_{sp}" and ovewrite the ones that are wet.
Args:
ds: dataset with column concentrations
species: species name(s) e.g. "ch4"
"""
logger.warning(
f"According to the variables attributes, 'x{species}' is dry but the profiles 'prior_h2o' and 'prior_{species}' are wet, so we dry the profiles. Should check that with the TCCON team before starting to really use the data."
)
if isinstance(species, str):
species = [
species,
]
if ds["prior_h2o"].attrs["standard_name"] == "wet_atmosphere_mole_fraction_of_water":
h2o_attrs = ds["prior_h2o"].attrs
ds["prior_h2o"] = ds["prior_h2o"] / (1 - ds["prior_h2o"])
ds["prior_h2o"].attrs = {k: v.replace("wet", "dry") for k, v in h2o_attrs.items() if k != "note"}
elif ds["prior_h2o"].attrs["standard_name"] != "dry_atmosphere_mole_fraction_of_water":
raise ValueError("'standard_name' of 'prior_h2o' is not what expected. Please check.")
if "wet" in ds["integration_operator"].attrs["description"]:
io_attrs = ds["integration_operator"].attrs
ds["integration_operator"] = ds["integration_operator"] / (1 + ds["prior_h2o"])
ds["integration_operator"].attrs = {k: v.replace("wet", "dry") for k, v in io_attrs.items()}
elif "dry" in ds["integration_operator"].attrs["description"]:
logger.info("'integration_operator' already dried, skipping conversion from wet to dry.")
else:
raise ValueError("'description' of 'integration_operator' is not what expected. Please check.")
for sp in species:
if ds[f"prior_{sp}"].attrs["standard_name"][:32] == "wet_atmosphere_mole_fraction_of_":
sp_attrs = ds[f"prior_{sp}"].attrs
ds[f"prior_{sp}"] = ds[f"prior_{sp}"] * (1 + ds["prior_h2o"])
ds[f"prior_{sp}"].attrs = {k: v.replace("wet", "dry") for k, v in sp_attrs.items() if k != "note"}
elif ds[f"prior_{sp}"].attrs["standard_name"][:32] == "dry_atmosphere_mole_fraction_of_":
logger.info(f"Prior profile of {sp} already dried, skipping conversion from wet to dry.")
else:
raise ValueError(f"'standard_name' of 'prior_{sp}' is not what expected. Please check.")
def _reformat_convert_units(ds: xr.Dataset, species: list | str, final_units: str | dict) -> xr.Dataset:
"""
Convert f"prior_{sp}", f"prior_x{sp}", f"x{sp}", f"x{sp}_uncertainty", f"x{sp}_error" units to the one specified in final_units
Args:
ds: dataset with column concentrations
species: species name(s) e.g. "ch4"
final_units: dict with species as keys containing the targeted unit (e.g. ppb) for each species as values. If float, unit is applied to all.
Returns:
dataset with variables converted to desired units.
"""
if isinstance(species, str):
species = [
species,
]
if isinstance(final_units, str):
final_units = {sp: final_units for sp in species}
unit_converter = load_internal_json(filename="attributes.json")["unit_interpret"]
for sp in species:
for var in [f"prior_{sp}", f"prior_x{sp}", f"x{sp}", f"x{sp}_uncertainty", f"x{sp}_error"]:
with xr.set_options(keep_attrs=True):
ds[var] = (
ds[var]
* float(unit_converter[ds[var].attrs["units"]])
/ float(unit_converter[final_units[sp]])
)
ds[var].attrs["units"] = final_units[sp]
return ds
[docs]
def parse_tccon(
filepath: pathType,
species: str,
domain: str | None = None,
pressure_weights_method: str = "integration_operator",
quality_filt: bool = True,
resample: bool = True,
chunks: dict | None = None,
) -> dict:
"""
Parse and extract data from netcdf downloaded from tccon archive (https://tccondata.org/).
Args:
filepath: Path of observation file
species: Species name or synonym e.g. "ch4"
domain:
pressure_weights_method: method use to calculate the integration_operator.
Options possible : "pressure_weight" and "integration_operator".
See https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Calculating_comparison_quantities for description.
quality_filt: If True, filters data keeping data with extrapolation_flags_ak_x{species} equal to 2.
resample: If True, resample data at hourly scale.
chunks: Chunking schema to use when storing data. It expects a dictionary of dimension name and chunk size,
for example {"time": 100}. If None then a chunking schema will be set automatically by OpenGHG.
See documentation for guidance on chunking: https://docs.openghg.org/tutorials/local/Adding_data/Adding_ancillary_data.html#chunking.
To disable chunking pass in an empty dictionary.
Returns:
Dict : Dictionary of source_name : data, metadata, attributes
"""
filepath = Path(filepath)
if filepath.suffix.lower() != ".nc":
raise ValueError("Input file must be a .nc (netcdf) file.")
splitted_filename = filepath.name.split(".")
if splitted_filename[1:] != ["public", "qc", "nc"]:
raise ValueError(
"File should be of the ending by 'public.qc.nc' when downloaded directly from the tccon archive."
)
var_to_read = [
f"x{species}",
f"prior_x{species}",
f"prior_{species}",
f"ak_x{species}",
f"extrapolation_flags_ak_x{species}",
f"x{species}_error",
"integration_operator",
"long",
"lat",
"prior_h2o",
"ak_pressure",
"prior_gravity",
]
data = xr.open_dataset(filepath, decode_times=False)[var_to_read].chunk(
chunks if chunks is not None else {}
)
decode_times = pd.to_datetime(data.time.values, unit="s", origin="unix", utc=True)
data = data.assign_coords(time=decode_times.values.astype("datetime64[ns]"))
# Create metadata #
attributes = cast(MutableMapping, data.attrs)
attributes["file_start_date"] = datetime.strptime(splitted_filename[0][2:10], "%Y%m%d").strftime(
"%Y-%m-%d"
)
attributes["file_end_date"] = datetime.strptime(splitted_filename[0][11:19], "%Y%m%d").strftime(
"%Y-%m-%d"
)
site_tccon_shortname = splitted_filename[0][:2]
attributes["species"] = species
attributes["domain"] = domain
attributes["site"] = "T" + site_tccon_shortname.upper()
attributes["network"] = "TCCON"
attributes["platform"] = "column"
attributes["inlet"] = "column"
attributes["pressure_weights_method"] = pressure_weights_method
attributes["original_file_description"] = attributes["description"]
attributes["description"] = (
f"TCCON data standardised from {filepath.name}, with the pressure weights estimated via '{pressure_weights_method}'."
)
attributes["calibration_scale"] = data[f"x{species}"].attrs.get("wmo_or_analogous_scale", "unknown")
contact = attributes["contact"].split(" ")
if "@" in contact[-1]:
attributes["data_owner"] = (" ").join(contact[:-1])
attributes["data_owner_email"] = contact[-1]
else:
raise ValueError(
"Couldn't parse the data owner and data owner email, sorry, might have to update the code."
)
if data.long.values.std() > 1e-3 or data.lat.values.std() > 1e-3:
raise ValueError(
"Longitude and/or latitude seems to be changing over time. This situation is not currently being handled."
)
attributes["longitude"] = f"{data.long.values.mean():.3f}"
attributes["latitude"] = f"{data.lat.values.mean():.3f}"
logger.warning("Add a check here that the site is really in the domain")
# Prepare data #
# Align units
if data[f"prior_{species}"].units == "ppb" and data[f"prior_x{species}"].units == "ppm":
with xr.set_options(keep_attrs=True):
data[f"prior_{species}"] = data[f"prior_{species}"] * 1e-3
data[f"prior_{species}"].attrs["units"] = "ppm"
if data[f"prior_{species}"].units != data[f"prior_x{species}"].units:
raise ValueError(
f"'prior_{species}' and 'prior_x{species}' have different units, please update this part of code to correct that."
)
# Convert wet profile into dry
_convert_prior_profile_to_dry(data, species=species)
# Define integartion_operator
if pressure_weights_method == "integration_operator":
if attributes["file_format_version"][:4] == "2020" and attributes["data_revision"] == "R0":
raise ValueError(
f"A bug is affecting the 'integration_operator' variable in version 2020.R0 (see https://tccon-wiki.caltech.edu/Main/AuxiliaryDataGGG2020#Using_the_integration_operator, last access:2025/07/17). Therefore the 'pressure weights should be used instead of 'integration_operator' while standardising {filepath}."
)
elif pressure_weights_method == "pressure_weight":
# Derive pressure thickness
press = data.ak_pressure.values[:-1] - data.ak_pressure.values[1:]
press = np.concatenate([press, [data.ak_pressure.values[-1]]]) / data.ak_pressure.values[0]
data = data.assign({"dpj": (("prior_altitude"), press)})
# Derive pressure weight (hj), wet to dry conversion factor,
# dry mole fraction of water (fdry_h2o) and prior dry xch4
if data["prior_h2o"].attrs["standard_name"] != "dry_atmosphere_mole_fraction_of_water":
raise ValueError("Looks like the data haven't been dried..")
M_dryH2O, M_dryAir = 18.0153, 28.9647
data["hj"] = data["dpj"] / (
data["prior_gravity"] * M_dryAir * (1 + (data["prior_h2o"] * M_dryH2O / M_dryAir))
)
data["integration_operator"] = data["hj"] / data["hj"].sum(dim="prior_altitude")
# Clean dataset
data = data.drop_vars(["dpj", "hj"])
else:
raise ValueError(
f"pressure_weights_method = '{pressure_weights_method}' is not a valid option. Options available: 'pressure_weight' or 'integration_operator'."
)
data = data.drop_vars(["prior_gravity", "prior_h2o", "long", "lat"])
# Test coherency between dry and wet stuff
max_diff = (
(
abs(
data["prior_xch4"]
- (data["prior_ch4"] * data["integration_operator"]).sum(dim="prior_altitude")
)
/ data["prior_xch4"]
)
.max()
.values
)
if max_diff > 1e-6:
logger.warning(
f"Incoherencies between 'x{species}_prior' (supposed dry) and its recalculation from the derived integration operator and dried {species} profile (abs. rel. diff up to {100 * max_diff:.1f}% of 'x{species}_prior'). Is 'x{species}_prior' in tccon file really dry? Or have I misunderstood something?"
)
# Filter the data and resample to hourly
data = _filter_and_resample(data, species, quality_filt, resample)
# reformat units
final_units = {"ch4": "ppb"}
data = _reformat_convert_units(data, species, final_units=final_units)
# Rename variables
data = data.rename(
{
"integration_operator": "pressure_weights",
"ak_pressure": "pressure_levels",
f"prior_{species}": f"{species}_profile_apriori",
f"prior_x{species}": f"x{species}_apriori",
f"ak_x{species}": f"x{species}_averaging_kernel",
}
)
# Define attributes
data = _define_var_attrs(data, species, pressure_weights_method)
# Align dimensions
if all(data["ak_altitude"].values == data["prior_altitude"].values):
data["altitude"] = data["ak_altitude"]
lev_coord = np.arange(data["ak_altitude"].size)
for var in data.data_vars:
if "ak_altitude" in data[var].dims:
old_dim = "ak_altitude"
elif "prior_altitude" in data[var].dims:
old_dim = "prior_altitude"
else:
continue
new_var = data[var].rename({old_dim: "lev"})
new_var["lev"] = lev_coord
data[var] = new_var
data = data.drop_dims(["ak_altitude", "prior_altitude"])
data["lev"].attrs["short_description"] = "Number for each level within the vertically resolved data."
else:
raise ValueError("'ak_altitude' and ' prior_altitude' are different.")
# Define metadata
required_metadata = [
"species",
"domain",
"inlet",
"site",
"network",
"platform",
"longitude",
"latitude",
"data_owner",
"data_owner_email",
"file_start_date",
"file_end_date",
"file_format_version",
"data_revision",
"description",
"calibration_scale",
"pressure_weights_method",
]
metadata = {k: attributes[k] for k in required_metadata}
# Prepare dict to return
gas_data = {species: {"metadata": metadata, "data": data, "attributes": attributes}}
return gas_data