Source code for openghg.standardise.flux_timeseries._crf

import numpy as np
from pathlib import Path
from openghg.store import infer_date_range


[docs] def parse_crf( filepath: Path, species: str, source: str = "anthro", region: str = "UK", domain: str | None = None, data_type: str = "flux_timeseries", database: str | None = None, database_version: str | None = None, model: str | None = None, period: str | tuple | None = None, continuous: bool = True, ) -> dict: """ Parse CRF emissions data from the specified file. Args: filepath: Path to the '.xlsx' file containing CRF emissions data. species: Name of species source: Source of the emissions data, e.g. "energy", "anthro", default is 'anthro'. region: Region/Country of the CRF data domain: Geographic domain, default is 'None'. Instead region is used to identify area data_type: Type of data, default is 'flux_timeseries'. database: Database name if applicable. database_version: Version of the database if applicable. model: Model name if applicable. period: Period of measurements. Only needed if this can not be inferred from the time coords If specified, should be one of: - "yearly", "monthly" - suitable pandas Offset Alias - tuple of (value, unit) as would be passed to pandas.Timedelta function continuous: Whether time stamps have to be continuous. Returns: Dict: Parsed flux timeseries data in dictionary format. """ import pandas as pd from openghg.util import timestamp_now # Dictionary of species corresponding to sheet names sheet_selector = {"ch4": "Table10s3", "co2": "Table10s1", "n2o": "Table10s4", "hfc": "Table10s5"} # Creating dataframe based on species name if species.lower() in sheet_selector: dataframe = pd.read_excel(filepath, sheet_name=sheet_selector[species.lower()], skiprows=4) else: raise ValueError(f"Species {species} is incorrect. Please select from {list(sheet_selector.keys())}") if species.lower() == "co2" or species.lower() == "hfc": dataframe = dataframe.iloc[1] else: dataframe = dataframe.iloc[49] dataframe = pd.DataFrame(dataframe).iloc[2:-1] dataframe = dataframe.rename(columns={dataframe.columns[0]: "flux_timeseries"}).astype(np.floating) dataframe.index = pd.to_datetime(dataframe.index, format="%Y") metadata = {} metadata["species"] = species if domain is not None: metadata["domain"] = domain metadata["source"] = source optional_keywords = {"database": database, "database_version": database_version, "model": model} for key, value in optional_keywords.items(): if value is not None: metadata[key] = value author_name = "OpenGHG Cloud" metadata["author"] = author_name metadata["data_type"] = data_type metadata["processed"] = str(timestamp_now()) metadata["source_format"] = "crf" dataframe = dataframe.rename_axis("time") dataarray = dataframe.to_xarray() dataarray = dataarray.assign_coords(time=dataarray.time) start_date, end_date, period_str = infer_date_range( dataarray.time, filepath=filepath, period=period, continuous=continuous ) metadata["start-date"] = str(start_date) metadata["end-date"] = str(end_date) metadata["period"] = str(period_str) metadata["region"] = region key = "_".join((species, source, region)) flux_timeseries_data: dict[str, dict] = {} flux_timeseries_data[key] = {} flux_timeseries_data[key]["data"] = dataarray flux_timeseries_data[key]["metadata"] = metadata return flux_timeseries_data