"""
The ModelScenario class allows users to collate related data sources and calculate
modelled output based on this data. The types of data currently included are:
- Timeseries observation data (ObsData)
- Fixed domain sensitivity maps known as footprints (FootprintData)
- Fixed domain flux maps (FluxData) - multiple maps can be included and
referenced by source name
- Fixed domain vertical curtains at each boundary (BoundaryConditionsData)
A ModelScenario instance can be created by searching the object store manually
and providing these outputs:
>>> obs = get_obs_surface(site, species, inlet, ...)
>>> footprint = get_footprint(site, domain, inlet, ...)
>>> flux = get_flux(species, source, domain, ...)
>>> bc = get_bc(species, domain, bc_input, ...)
>>> model = ModelScenario(obs=obs, footprint=footprint, flux=flux, bc=bc)
A ModelScenario instance can also be created using keywords to search the object store:
>>> model = ModelScenario(site,
species,
inlet,
network,
domain,
sources=sources,
bc_input=bc_input,
start_date=start_date,
end_date=end_date)
A ModelScenario instance can also be initialised and then populated after creation:
>>> model = ModelScenario()
>>> model.add_obs(obs=obs)
>>> model.add_footprint(site, inlet, domain, ...)
>>> model.add_flux(species, domain, sources, ...)
>>> model.add_bc(species, domain, bc_input, ...)
Once created, methods can be called on ModelScenario which will combine these
data sources and cache the outputs (if requested) to make for quicker calculation.
>>> modelled_obs = model.calc_modelled_obs()
>>> modelled_baseline = model.calc_modelled_baseline()
>>> combined_data = model.footprints_data_merge()
If some input types needed for these operations are missing, the user will be alerted
on which data types are missing.
"""
import logging
from rich.progress import track
from typing import Any, Dict, List, Literal, Optional, Sequence, Tuple, Union, cast
import numpy as np
import pandas as pd
from openghg.dataobjects import BoundaryConditionsData, FluxData, FootprintData, ObsData
from openghg.retrieve import (
get_obs_surface,
get_bc,
get_flux,
get_footprint,
search_surface,
search_bc,
search_flux,
search_footprints,
)
from openghg.util import synonyms
from openghg.types import SearchError
from pandas import Timestamp
from xarray import DataArray, Dataset
__all__ = ["ModelScenario", "combine_datasets", "stack_datasets", "calc_dim_resolution", "match_dataset_dims"]
# TODO: Really with the emissions, they shouldn't need to match against a domain
# We should be able to grab global/bigger area emissions and cut that down
# to whichever area our LPDM model covers.
# TODO: Add static methods for different ways of creating the class
# e.g. from_existing_data(), from_search(), empty() , ...
ParamType = Union[List[Dict[str, Optional[str]]], Dict[str, Optional[str]]]
methodType = Optional[Literal["nearest", "pad", "ffill", "backfill", "bfill"]]
logger = logging.getLogger("openghg.analyse")
logger.setLevel(logging.INFO) # Have to set level for logger as well as handler
[docs]
class ModelScenario:
"""
This class stores together observation data with ancillary data and allows
operations to be performed combining these inputs.
"""
def __bool__(self) -> bool:
return bool(self.obs) or bool(self.footprint) or bool(self.fluxes) or bool(self.bc)
[docs]
def __init__(
self,
site: Optional[str] = None,
species: Optional[str] = None,
inlet: Optional[str] = None,
height: Optional[str] = None,
network: Optional[str] = None,
domain: Optional[str] = None,
model: Optional[str] = None,
met_model: Optional[str] = None,
fp_inlet: Union[str, list, None] = None,
source: Optional[str] = None,
sources: Optional[Union[str, Sequence]] = None,
bc_input: Optional[str] = None,
start_date: Union[str, Timestamp, None] = None,
end_date: Union[str, Timestamp, None] = None,
obs: Optional[ObsData] = None,
footprint: Optional[FootprintData] = None,
flux: Union[FluxData, Dict[str, FluxData], None] = None,
bc: Optional[BoundaryConditionsData] = None,
store: Optional[str] = None,
):
"""
Create a ModelScenario instance based on a set of keywords to be
or directly supplied objects. This can be created as an empty class to be
populated.
The keywords are related to observation, footprint and flux data
which may be available within the object store. The combination of these supplied
will be used to extract the relevant data. Related keywords are as follows:
- Observation data: site, species, inlet, network, start_date, end_data
- Footprint data: site, inlet, domain, model, met_model, species, start_date, end_date
- Flux data: species, sources, domain, start_date, end_date
Args:
site: Site code e.g. "TAC"
species: Species code e.g. "ch4"
inlet: Inlet value e.g. "10m"
height: Alias for inlet.
network: Network name e.g. "AGAGE"
domain: Domain name e.g. "EUROPE"
model: Model name used in creation of footprint e.g. "NAME"
met_model: Name of met model used in creation of footprint e.g. "UKV"
fp_inlet: Specify footprint release height options if this doesn't match to site value.
sources: Emissions sources
bc_input: Input keyword for boundary conditions e.g. "mozart" or "cams"
start_date: Start of date range to use. Note for flux this may not be applied
end_date: End of date range to use. Note for flux this may not be applied
obs: Supply ObsData object directly (e.g. from get_obs...() functions)
footprint: Supply FootprintData object directly (e.g. from get_footprint() function)
flux: Supply FluxData object directly (e.g. from get_flux() function)
store: Name of object store to retrieve data from.
Returns:
None
Sets up instance of class with associated values.
TODO: For obs, footprint, flux should we also allow Dataset input and turn
these into the appropriate class?
"""
self.obs: Optional[ObsData] = None
self.footprint: Optional[FootprintData] = None
self.fluxes: Optional[Dict[str, FluxData]] = None
self.bc: Optional[BoundaryConditionsData] = None
if species is not None:
species = synonyms(species)
# Add observation data (directly or through keywords)
self.add_obs(
site=site,
species=species,
inlet=inlet,
height=height,
network=network,
start_date=start_date,
end_date=end_date,
obs=obs,
store=store,
)
# Make sure obs data is present, make sure inputs match to metadata
if self.obs is not None:
obs_metadata = self.obs.metadata
site = obs_metadata["site"]
species = obs_metadata["species"]
inlet = obs_metadata["inlet"]
logger.info("Updating any inputs based on observation data")
logger.info(f"site: {site}, species: {species}, inlet: {inlet}")
# Add footprint data (directly or through keywords)
self.add_footprint(
site=site,
inlet=inlet,
height=height,
domain=domain,
model=model,
met_model=met_model,
fp_inlet=fp_inlet,
start_date=start_date,
end_date=end_date,
species=species,
footprint=footprint,
store=store,
)
# Add flux data (directly or through keywords)
self.add_flux(
species=species,
domain=domain,
source=source,
sources=sources,
start_date=start_date,
end_date=end_date,
flux=flux,
store=store,
)
# Add boundary conditions (directly or through keywords)
self.add_bc(
species=species,
bc_input=bc_input,
domain=domain,
start_date=start_date,
end_date=end_date,
bc=bc,
store=store,
)
# Initialise attributes used for caching
self.scenario: Optional[Dataset] = None
self.modelled_obs: Optional[DataArray] = None
self.modelled_baseline: Optional[DataArray] = None
self.flux_stacked: Optional[Dataset] = None
# TODO: Check species, site etc. values align between inputs?
def _get_data(self, keywords: ParamType, data_type: str) -> Any:
"""
Use appropriate get function to search for data in object store.
"""
get_functions = {
"obs_surface": get_obs_surface,
"footprint": get_footprint,
"flux": get_flux,
"boundary_conditions": get_bc,
}
search_functions = {
"obs_surface": search_surface,
"footprint": search_footprints,
"flux": search_flux,
"boundary_conditions": search_bc,
}
get_fn = get_functions[data_type]
search_fn = search_functions.get(data_type)
if isinstance(keywords, dict):
keywords = [keywords]
data = None
num_checks = len(keywords)
for i, keyword_set in enumerate(keywords):
try:
data = get_fn(**keyword_set) # type:ignore
except (SearchError, AttributeError):
num = i + 1
if num == num_checks:
logger.warning(f"Unable to add {data_type} data based on keywords supplied.")
logger.warning(" Inputs -")
for key, value in keyword_set.items():
logger.info(f" {key}: {value}")
if search_fn is not None:
data_search = search_fn(**keyword_set) # type:ignore
logger.info("---- Search results ---")
logger.info(f"Number of results returned: {len(data_search)}")
logger.info(data_search.results)
print("\n")
data = None
else:
logger.info(f"Adding {data_type} to model scenario")
break
return data
[docs]
def add_obs(
self,
site: Optional[str] = None,
species: Optional[str] = None,
inlet: Optional[str] = None,
height: Optional[str] = None,
network: Optional[str] = None,
start_date: Optional[Union[str, Timestamp]] = None,
end_date: Optional[Union[str, Timestamp]] = None,
obs: Optional[ObsData] = None,
store: Optional[str] = None,
) -> None:
"""
Add observation data based on keywords or direct ObsData object.
"""
from openghg.util import clean_string, format_inlet
# Search for obs data based on keywords
if site is not None and obs is None:
site = clean_string(site)
if height is not None and inlet is None:
inlet = height
inlet = clean_string(inlet)
inlet = format_inlet(inlet)
# search for obs based on suitable keywords - site, species, inlet
obs_keywords = {
"site": site,
"species": species,
"inlet": inlet,
"network": network,
"start_date": start_date,
"end_date": end_date,
"store": store,
}
obs = self._get_data(obs_keywords, data_type="obs_surface")
self.obs = obs
# Add keywords to class for convenience
if self.obs is not None:
self.site = self.obs.metadata["site"]
self.species = self.obs.metadata["species"]
self.inlet = self.obs.metadata["inlet"]
[docs]
def add_flux(
self,
species: Optional[str] = None,
domain: Optional[str] = None,
source: Optional[str] = None,
sources: Optional[Union[str, Sequence]] = None,
start_date: Optional[Union[str, Timestamp]] = None,
end_date: Optional[Union[str, Timestamp]] = None,
flux: Optional[Union[FluxData, Dict[str, FluxData]]] = None,
store: Optional[str] = None,
) -> None:
"""
Add flux data based on keywords or direct FluxData object.
Can add flux datasets for multiple sources.
"""
if self.fluxes is not None:
# Check current species in any flux data
if species is not None:
current_flux_1 = list(self.fluxes.values())[0]
current_species = current_flux_1.metadata["species"]
if species != current_species:
raise ValueError(
f"New data must match current species {current_species} in ModelScenario. Input value: {species}"
)
if species is not None and flux is None:
flux = {}
if sources is None and source is not None:
sources = [source]
elif sources is None or isinstance(sources, str):
sources = [sources]
for name in sources:
flux_keywords_1 = {"species": species, "source": name, "domain": domain, "store": store}
# For CO2 we need additional emissions data before a start_date to
# match to high time resolution footprints.
# For now, just extract all data
if species == "co2":
flux_keywords = [flux_keywords_1]
else:
flux_keywords_2 = flux_keywords_1.copy()
flux_keywords_1["start_date"] = start_date
flux_keywords_1["end_date"] = end_date
flux_keywords = [flux_keywords_1, flux_keywords_2]
# TODO: Add something to allow for e.g. global domain or no domain
flux_source = self._get_data(flux_keywords, data_type="flux")
# TODO: May need to update this check if flux_source is empty FluxData() object
if flux_source is not None:
# try to infer source name from FluxData metadata
if name is None and len(sources) == 1:
try:
name = flux_source.metadata["source"]
except KeyError:
raise ValueError(
"Flux 'source' could not be inferred from metadata/attributes. Please specify the source(s) of the flux."
)
flux[name] = flux_source
elif flux is not None:
if not isinstance(flux, dict):
try:
name = flux.metadata["source"]
except KeyError:
raise ValueError(
"Flux 'source' could not be inferred from `flux` metadata/attributes. Please specify the source(s) of the flux in the `FluxData` metadata.."
)
else:
flux = {name: flux}
# TODO: Make this so flux.anthro can be called etc. - link in some way
if self.fluxes is not None:
if flux:
self.fluxes.update(flux)
else:
# Flux can be None or empty dict.
if flux:
self.fluxes = flux
if self.fluxes is not None:
if not hasattr(self, "species"):
flux_values = list(self.fluxes.values())
flux_1 = flux_values[0]
self.species = flux_1.metadata["species"]
self.flux_sources = list(self.fluxes.keys())
[docs]
def add_bc(
self,
species: Optional[str] = None,
bc_input: Optional[str] = None,
domain: Optional[str] = None,
start_date: Optional[Union[str, Timestamp]] = None,
end_date: Optional[Union[str, Timestamp]] = None,
bc: Optional[BoundaryConditionsData] = None,
store: Optional[str] = None,
) -> None:
"""
Add boundary conditions data based on keywords or direct BoundaryConditionsData object.
"""
# Search for boundary conditions data based on keywords
# - domain, species, bc_input
if domain is not None and bc is None:
bc_keywords = {
"species": species,
"domain": domain,
"bc_input": bc_input,
"start_date": start_date,
"end_date": end_date,
"store": store,
}
bc = self._get_data(bc_keywords, data_type="boundary_conditions")
self.bc = bc
def _check_data_is_present(self, need: Optional[Union[str, Sequence]] = None) -> None:
"""
Check whether correct data types have been included. This should
be used by functions to check whether they can perform the requested
operation with the data types available.
Args:
need (list) : Names of objects needed for the function being called.
Should be one or more of "obs", "footprint", "fluxes" (or "flux")
Returns:
None
Raises ValueError is necessary data is missing.
"""
if need is None:
need = ["obs", "footprint"]
elif isinstance(need, str):
need = [need]
need = ["fluxes" if value == "flux" else value for value in need] # Make sure attributes match
missing = []
for attr in need:
value = getattr(self, attr)
if value is None:
missing.append(attr)
logger.error(f"Must have {attr} data linked to this ModelScenario to run this function")
logger.error("Include this by using the add function, with appropriate inputs:")
logger.error(" ModelScenario.add_{attr}(...)")
if missing:
raise ValueError(f"Missing necessary {' and '.join(missing)} data.")
def _get_platform(self) -> Optional[str]:
"""
Find the platform for a site, if present.
This will access the "site_info.json" file from openghg_defs dependency to
find this information.
"""
from openghg.util import get_site_info
try:
site = self.site
site_upper = site.upper()
except AttributeError:
return None
else:
site_data = get_site_info()
try:
site_details = site_data[site_upper]
except KeyError:
return None
else:
platform: str = site_details.get("platform")
return platform
def _align_obs_footprint(self, resample_to: str = "coarsest", platform: Optional[str] = None) -> Tuple:
"""
Slice and resample obs and footprint data to align along time
This slices the date to the smallest time frame
spanned by both the footprint and obs, using the sliced start date
The time dimension is resampled based on the resample_to input using the mean.
The resample_to options are:
- "coarsest" - resample to the coarsest resolution between obs and footprints
- "obs" - resample to observation data frequency
- "footprint" - resample to footprint data frequency
- a valid resample period e.g. "2H"
Args:
resample_to: Resample option to use: either data based or using a valid pandas resample period.
platform: Observation platform used to decide whether to resample
Returns:
tuple: Two xarray.Dataset with aligned time dimensions
"""
# Check data is present (not None) and cast to correct type
self._check_data_is_present(need=["obs", "footprint"])
obs = cast(ObsData, self.obs)
footprint = cast(FootprintData, self.footprint)
obs_data = obs.data
footprint_data = footprint.data
resample_keyword_choices = ("obs", "footprint", "coarsest")
# Check whether resample has been requested by specifying a specific period rather than a keyword
if resample_to in resample_keyword_choices:
force_resample = False
else:
force_resample = True
if platform is not None:
platform = platform.lower()
# Do not apply resampling for "satellite" (but have re-included "flask" for now)
if platform == "satellite":
return obs_data, footprint_data
# Whether sampling period is present or we need to try to infer this
infer_sampling_period = False
# Get the period of measurements in time
obs_attributes = obs_data.attrs
if "averaged_period" in obs_attributes:
obs_data_period_s = float(obs_attributes["averaged_period"])
elif "sampling_period" in obs_attributes:
sampling_period = obs_attributes["sampling_period"]
if sampling_period == "NOT_SET":
infer_sampling_period = True
elif sampling_period == "multiple":
# If we have a varying sampling_period, make sure we always resample to footprint
obs_data_period_s = 1.0
else:
obs_data_period_s = float(sampling_period)
elif "sampling_period_estimate" in obs_attributes:
estimate = obs_attributes["sampling_period_estimate"]
logger.warning(f"Using estimated sampling period of {estimate}s for observational data")
obs_data_period_s = float(estimate)
else:
infer_sampling_period = True
if infer_sampling_period:
# Attempt to derive sampling period from frequency of data
obs_data_period_s = np.nanmedian(
(obs_data.time.data[1:] - obs_data.time.data[0:-1]) / 1e9
).astype("float32")
obs_data_period_s_min = np.diff(obs_data.time.data).min() / 1e9
obs_data_period_s_max = np.diff(obs_data.time.data).max() / 1e9
max_diff = (obs_data_period_s_max - obs_data_period_s_min).astype(float)
# Check if the periods differ by more than 1 second
if max_diff > 1.0:
raise ValueError("Sample period can be not be derived from observations")
estimate = f"{obs_data_period_s:.1f}"
logger.warning(f"Sampling period was estimated (inferred) from data frequency: {estimate}s")
obs.data.attrs["sampling_period_estimate"] = estimate
# TODO: Check regularity of the data - will need this to decide is resampling
# is appropriate or need to do checks on a per time point basis
obs_data_period_ns = obs_data_period_s * 1e9
obs_data_timeperiod = pd.Timedelta(obs_data_period_ns, unit="ns")
# Derive the footprints period from the frequency of the data
footprint_data_period_ns = np.nanmedian(
(footprint_data.time.data[1:] - footprint_data.time.data[0:-1]).astype("int64")
)
footprint_data_timeperiod = pd.Timedelta(footprint_data_period_ns, unit="ns")
# If resample_to is set to "coarsest", check whether "obs" or "footprint" have lower resolution
if resample_to == "coarsest":
if obs_data_timeperiod >= footprint_data_timeperiod:
resample_to = "obs"
elif obs_data_timeperiod < footprint_data_timeperiod:
resample_to = "footprint"
# Here we want timezone naive Timestamps
# Add sampling period to end date to make sure resample includes these values when matching
obs_startdate = Timestamp(obs_data.time[0].values)
obs_enddate = Timestamp(obs_data.time[-1].values) + obs_data_timeperiod
footprint_startdate = Timestamp(footprint_data.time[0].values)
footprint_enddate = Timestamp(footprint_data.time[-1].values) + footprint_data_timeperiod
start_date = max(obs_startdate, footprint_startdate)
end_date = min(obs_enddate, footprint_enddate)
# Ensure lower range is covered for obs
start_obs_slice = start_date - pd.Timedelta("1ns")
# Ensure extra buffer is added for footprint based on fp timeperiod.
# This is to ensure footprint can be forward-filled to obs (in later steps)
start_footprint_slice = start_date - (footprint_data_timeperiod - pd.Timedelta("1ns"))
# Subtract very small time increment (1 nanosecond) to make this an exclusive selection
end_slice = end_date - pd.Timedelta("1ns")
obs_data = obs_data.sel(time=slice(start_obs_slice, end_slice))
footprint_data = footprint_data.sel(time=slice(start_footprint_slice, end_slice))
if obs_data.time.size == 0 or footprint_data.time.size == 0:
raise ValueError("Obs data and Footprint data don't overlap")
# Only non satellite datasets with different periods need to be resampled
timeperiod_diff_s = np.abs(obs_data_timeperiod - footprint_data_timeperiod).total_seconds()
tolerance = 1e-9 # seconds
if timeperiod_diff_s >= tolerance or force_resample:
offset = pd.Timedelta(
hours=start_date.hour + start_date.minute / 60.0 + start_date.second / 3600.0
)
offset = cast(pd.Timedelta, offset)
if resample_to == "obs":
resample_period = str(round(obs_data_timeperiod / np.timedelta64(1, "h"), 5)) + "H"
footprint_data = footprint_data.resample(
indexer={"time": resample_period}, offset=offset
).mean()
elif resample_to == "footprint":
resample_period = str(round(footprint_data_timeperiod / np.timedelta64(1, "h"), 5)) + "H"
obs_data = obs_data.resample(indexer={"time": resample_period}, offset=offset).mean()
else:
resample_period = resample_to
footprint_data = footprint_data.resample(
indexer={"time": resample_period}, offset=offset
).mean()
obs_data = obs_data.resample(indexer={"time": resample_period}, offset=offset).mean()
return obs_data, footprint_data
def _clean_sources_input(self, sources: Optional[Union[str, List]] = None) -> List:
"""
Check sources input and make sure this is a list. If None, this will extract
all sources from self.fluxes.
"""
self._check_data_is_present(need=["fluxes"])
flux_dict = cast(Dict[str, FluxData], self.fluxes)
if sources is None:
sources = list(flux_dict.keys())
elif isinstance(sources, str):
sources = [sources]
return sources
[docs]
def combine_flux_sources(
self, sources: Optional[Union[str, List]] = None, cache: bool = True, recalculate: bool = False
) -> Dataset:
"""
Combine together flux sources on the time dimension. This will align to
the time of the highest frequency flux source both for time range and frequency.
Args:
sources : Names of sources to combine. Should already be attached to ModelScenario.
cache : Cache this data after calculation. Default = True
Returns:
Dataset: All flux sources stacked on the time dimension.
"""
self._check_data_is_present(need=["fluxes"])
flux_dict = cast(Dict[str, FluxData], self.fluxes)
time_dim = "time"
sources = self._clean_sources_input(sources)
sources_str = ", ".join(sources)
# Return any matching cached data
if self.flux_stacked is not None and not recalculate:
if self.flux_stacked.attrs["sources"] == sources_str:
return self.flux_stacked
flux_datasets = [flux_dict[source].data for source in sources]
if len(sources) == 1:
return flux_datasets[0]
# Make sure other dimensions than time are aligned between flux datasets
# - expects values to be closely aligned so only allows for a small floating point tolerance
dims = list(flux_datasets[0].dims)
dims.remove("time")
flux_datasets = match_dataset_dims(flux_datasets, dims=dims)
try:
flux_stacked = stack_datasets(flux_datasets, dim=time_dim, method="ffill")
except ValueError:
raise ValueError(f"Unable to combine flux data for sources: {sources_str}")
if cache:
flux_stacked.attrs["sources"] = sources_str
self.flux_stacked = flux_stacked
return flux_stacked
def _check_footprint_resample(self, resample_to: str) -> Dataset:
"""
Check whether footprint needs resampling based on resample_to input.
Ignores resample_to keywords of ("coarsest", "obs", "footprint") as this is
for comparison with observation data but uses pandas frequencies to resample.
"""
footprint = cast(FootprintData, self.footprint)
if resample_to in ("coarsest", "obs", "footprint"):
return footprint.data
else:
footprint_data = footprint.data
time = footprint_data["time"].values
start_date = Timestamp(time[0])
offset = pd.Timedelta(
hours=start_date.hour + start_date.minute / 60.0 + start_date.second / 3600.0
)
offset = cast(pd.Timedelta, offset) # mypy thinks this could be NaT
footprint_data = footprint_data.resample(indexer={"time": resample_to}, offset=offset).mean()
return footprint_data
def _param_setup(
self,
param: str = "modelled_obs",
resample_to: str = "coarsest",
platform: Optional[str] = None,
recalculate: bool = False,
) -> bool:
"""
Decide if calculation is needed for input parameter and set up
underlying parameters accordingly. This will populate the
self.scenario attribute if not already present or if this needs
to be recalculated.
Args:
param : Name of the parameter being calculated.
Should be one of "modelled_obs", "modelled_baseline"
resample_to: Resample option to use for averaging:
- either one of ["coarsest", "obs", "footprint"] to match to the datasets
- or using a valid pandas resample period e.g. "2H".
Default = "coarsest".
platform: Observation platform used to decide whether to resample e.g. "site", "satellite".
cache: Cache this data after calculation. Default = True.
recalculate: Make sure to recalculate this data rather than return from cache. Default = False.
Returns:
bool: True if param should be calculated, False otherwise
Populates details of ModelScenario.scenario to use in calculation.
"""
try:
parameter = getattr(self, param)
except AttributeError:
raise ValueError(f"Did not recognise input for {param}")
# Check if cached modelled observations exist
# if self.modelled_obs is None or recalculate:
if parameter is None or recalculate:
# Check if observations are present and use these for resampling
if self.obs is not None:
self.combine_obs_footprint(
resample_to, platform=platform, recalculate=recalculate, cache=True
)
else:
self.scenario = self._check_footprint_resample(resample_to)
else:
if self.obs is not None:
# Check previous resample_to input for cached data
# prev_resample_to = self.modelled_obs.attrs.get("resample_to")
prev_resample_to = parameter.attrs.get("resample_to")
# Check if this previous resample period matches input value
# - if not (or explicit recalculation requested), recreate scenario
# - if so return cached modelled observations
if prev_resample_to != resample_to or recalculate:
self.combine_obs_footprint(resample_to, platform=platform, cache=True)
else:
# return self.modelled_obs
return False
elif recalculate:
# Recalculate based on footprint data if obs not present
self.scenario = self._check_footprint_resample(resample_to)
# TODO: Add check for matching sources and recalculate otherwise
else:
# Return cached modelled observations if explicit recalculation not requested
# return self.modelled_obs
return False
return True
[docs]
def calc_modelled_obs(
self,
sources: Optional[Union[str, List]] = None,
resample_to: str = "coarsest",
platform: Optional[str] = None,
cache: bool = True,
recalculate: bool = False,
) -> DataArray:
"""
Calculate the modelled observation points based on site footprint and fluxes.
The time points returned are dependent on the resample_to option chosen.
If obs data is also linked to the ModelScenario instance, this will be used
to derive the time points where appropriate.
Args:
sources: Sources to use for flux. All will be used and stacked if not specified.
resample_to: Resample option to use for averaging:
- either one of ["coarsest", "obs", "footprint"] to match to the datasets
- or using a valid pandas resample period e.g. "2H".
Default = "coarsest".
platform: Observation platform used to decide whether to resample e.g. "site", "satellite".
cache: Cache this data after calculation. Default = True.
recalculate: Make sure to recalculate this data rather than return from cache. Default = False.
Returns:
xarray.DataArray: Modelled observation values along the time axis
If cache is True:
This data will also be cached as the ModelScenario.modelled_obs attribute.
The associated scenario data will be cached as the ModelScenario.scenario attribute.
"""
self._check_data_is_present(need=["footprint", "fluxes"])
param_calculate = self._param_setup(
param="modelled_obs", resample_to=resample_to, platform=platform, recalculate=recalculate
)
if not param_calculate:
modelled_obs = cast(DataArray, self.modelled_obs)
return modelled_obs
# Check species and use high time resolution steps if this is carbon dioxide
if self.species == "co2":
modelled_obs = self._calc_modelled_obs_HiTRes(
sources=sources, output_TS=True, output_fpXflux=False
)
name = "mf_mod_high_res"
else:
modelled_obs = self._calc_modelled_obs_integrated(
sources=sources, output_TS=True, output_fpXflux=False
)
name = "mf_mod"
modelled_obs.attrs["resample_to"] = resample_to
modelled_obs = modelled_obs.rename(name)
# Cache output from calculations
if cache:
logger.info("Caching calculated data")
self.modelled_obs = modelled_obs
# self.scenario[name] = modelled_obs
else:
self.modelled_obs = None # Make sure this is reset and not cached
self.scenario = None # Reset this to None after calculation completed
return modelled_obs
def _calc_modelled_obs_integrated(
self,
sources: Optional[Union[str, List]] = None,
output_TS: bool = True,
output_fpXflux: bool = False,
) -> Any:
"""
Calculate modelled mole fraction timeseries using integrated footprints data.
Args:
sources : Flux sources to use for the calculation. By default this will use all available sources.
output_TS : Whether to output the modelled mole fraction timeseries DataArray.
Default = True
output_fpXflux : Whether to output the modelled flux map DataArray used to create
the timeseries. Default = False
Returns:
DataArray / DataArray :
Modelled mole fraction timeseries, dimensions = (time)
Modelled flux map, dimensions = (lat, lon, time)
If one of output_TS and output_fpXflux are True:
DataArray is returned for the respective output
If both output_TS and output_fpXflux are both True:
Both DataArrays are returned.
"""
if self.scenario is None:
raise ValueError("Combined data must have been defined before calling this function.")
scenario = self.scenario
flux = self.combine_flux_sources(sources)
scenario, flux = match_dataset_dims([scenario, flux], dims=["lat", "lon"])
flux = flux.reindex_like(scenario, "ffill")
flux_modelled: DataArray = scenario["fp"] * flux["flux"]
timeseries: DataArray = flux_modelled.sum(["lat", "lon"])
# TODO: Add details about units to output
if output_TS and output_fpXflux:
return timeseries, flux_modelled
elif output_TS:
return timeseries
elif output_fpXflux:
return flux_modelled
def _calc_modelled_obs_HiTRes(
self,
sources: Optional[Union[str, List]] = None,
averaging: Optional[str] = None,
output_TS: bool = True,
output_fpXflux: bool = False,
) -> Any:
"""
Calculate modelled mole fraction timeseries using high time resolution
footprints data and emissions data. This is appropriate for time variable
species reliant on high time resolution footprints such as carbon dioxide (co2).
Args:
sources : Flux sources to use for the calculation. By default this will use all available sources.
averaging : Time resolution to use to average the time dimension. Default = None
output_TS : Whether to output the modelled mole fraction timeseries DataArray.
Default = True
output_fpXflux : Whether to output the modelled flux map DataArray used to create
the timeseries. Default = False
Returns:
DataArray / DataArray :
Modelled mole fraction timeseries, dimensions = (time)
Modelled flux map, dimensions = (lat, lon, time)
If one of output_TS and output_fpXflux are True:
DataArray is returned for the respective output
If both output_TS and output_fpXflux are both True:
Both DataArrays are returned.
TODO: Low frequency flux values may need to be selected from the month before
(currently selecting the same month).
TODO: Indexing for low frequency flux should be checked to make sure this
allows for crossing over the end of the year.
TODO: Currently using pure dask arrays (based on Hannah's original code)
but would be good to update this to add more pre-indexing using xarray
and/or use dask as part of datasets.
TODO: May want to update this to not rely on indexing when selecting
the appropriate flux values. At the moment this solution has been chosen
because selecting on a dimension, rather than indexing, can be *very* slow
depending on the operations performed beforehand on the Dataset (e.g.
resample and reindex)
TODO: This code currently resamples the frequency to be regular. This will
have no effect if the time frequency was already regular but this may
not be what we want and may want to add extra code to remove any NaNs, if
they are introduced or to find a way to remove this requirement.
TODO: mypy having trouble with different types options and incompatible types,
included as Any for now.
"""
from math import gcd
import dask.array as da # type: ignore
from pandas import date_range
# TODO: Need to work out how this fits in with high time resolution method
# Do we need to flag low resolution to use a different method? natural / anthro for example
if self.scenario is None:
raise ValueError("Combined data must have been defined before calling this function.")
fp_HiTRes = self.scenario.fp_HiTRes
flux_ds = self.combine_flux_sources(sources)
fp_HiTRes, flux_ds = match_dataset_dims([fp_HiTRes, flux_ds], dims=["lat", "lon"])
# Make sure any NaN values are set to zero as this is a multiplicative and summing operation
fp_HiTRes = fp_HiTRes.fillna(0.0)
flux_ds["flux"] = flux_ds["flux"].fillna(0.0)
# Calculate time resolution for both the flux and footprints data
nanosecond_to_hour = 1 / (1e9 * 60.0 * 60.0)
flux_res_H = int(flux_ds.time.diff(dim="time").values.mean() * nanosecond_to_hour)
fp_res_time_H = int(fp_HiTRes.time.diff(dim="time").values.mean() * nanosecond_to_hour)
fp_res_Hback_H = int(fp_HiTRes["H_back"].diff(dim="H_back").values.mean())
# Define resolution on time dimension in number in hours
if averaging:
try:
time_res_H = int(averaging)
time_resolution = f"{time_res_H}H"
except (ValueError, TypeError):
time_res_H = int(averaging[0])
time_resolution = averaging
else:
# If not specified derive from time from combined dataset
time_res_H = fp_res_time_H
time_resolution = f"{time_res_H}H"
# Resample fp timeseries to match time resolution
if fp_res_time_H != time_res_H:
fp_HiTRes = fp_HiTRes.resample(time=time_resolution).ffill()
# Define resolution on high frequency dimension in number of hours
# At the moment this is matched to the Hback dimension
time_hf_res_H = fp_res_Hback_H
# Only allow for high frequency resolution < 24 hours
if time_hf_res_H > 24:
raise ValueError(f"High frequency resolution must be <= 24 hours. Current: {time_hf_res_H}H")
elif 24 % time_hf_res_H != 0 or 24 % time_hf_res_H != 0.0:
raise ValueError(
f"High frequency resolution must exactly divide into 24 hours. Current: {time_hf_res_H}H"
)
# Find the greatest common denominator between time and high frequency resolutions.
# This is needed to make sure suitable flux frequency is used to allow for indexing.
# e.g. time: 1H; hf (high frequency): 2H, highest_res_H would be 1H
# e.g. time: 2H; hf (high frequency): 3H, highest_res_H would be 1H
highest_res_H = gcd(time_res_H, time_hf_res_H)
highest_resolution = f"{highest_res_H}H"
# create time array to loop through, with the required resolution
# fp_HiTRes.time is the release time of particles into the model
time_array = fp_HiTRes["time"]
lat = fp_HiTRes["lat"]
lon = fp_HiTRes["lon"]
hback = fp_HiTRes["H_back"]
ntime = len(time_array)
nlat = len(lat)
nlon = len(lon)
# nh_back = len(hback)
# Define maximum hour back
max_h_back = int(hback.values[-1])
# Define full range of dates to select from the flux input
date_start = time_array[0]
date_start_back = date_start - np.timedelta64(max_h_back, "h")
date_end = time_array[-1] + np.timedelta64(1, "s")
start = {
dd: getattr(np.datetime64(time_array[0].values, "h").astype(object), dd)
for dd in ["month", "year"]
}
# Create times for matching to the flux
full_dates = date_range(
date_start_back.values, date_end.values, freq=highest_resolution, inclusive="left"
).to_numpy()
# Create low frequency flux data (monthly)
flux_ds_low_freq = flux_ds.resample({"time": "1MS"}).mean().sel(time=slice(date_start_back, date_end))
flux_ds_low_freq = flux_ds_low_freq.transpose(*("lat", "lon", "time"))
# Select and align high frequency flux data
flux_ds_high_freq = flux_ds.sel(time=slice(date_start_back, date_end))
if flux_res_H <= 24:
offset = pd.Timedelta(
hours=date_start_back.dt.hour.data
+ date_start_back.dt.minute.data / 60.0
+ date_start_back.dt.second.data / 3600.0
)
offset = cast(pd.Timedelta, offset)
if flux_res_H <= highest_res_H:
# Downsample flux to match to footprints frequency
flux_ds_high_freq = flux_ds_high_freq.resample(
{"time": highest_resolution}, offset=offset
).mean()
elif flux_res_H > highest_res_H:
# Upsample flux to match footprints frequency and forward fill
flux_ds_high_freq = flux_ds_high_freq.resample(
{"time": highest_resolution}, offset=offset
).ffill()
# Reindex to match to correct values
flux_ds_high_freq = flux_ds_high_freq.reindex({"time": full_dates}, method="ffill")
elif flux_res_H > 24:
# If flux is not high frequency use the monthly averages instead.
flux_ds_high_freq = flux_ds_low_freq
# TODO: Add check to make sure time values are exactly aligned based on date range
# Make sure the dimensions match the expected order for indexing
fp_HiTRes = fp_HiTRes.transpose(*("lat", "lon", "time", "H_back"))
flux_ds_high_freq = flux_ds_high_freq.transpose(*("lat", "lon", "time"))
# Extract footprints array to use in numba loop
fp_HiTRes = da.array(fp_HiTRes)
# Set up a numpy array to calculate the product of the footprints (H matrix) with the fluxes
if output_fpXflux:
fpXflux = da.zeros((nlat, nlon, ntime))
if output_TS:
timeseries = da.zeros(ntime)
# Iterate through the time coord to get the total mf at each time step using the H back coord
# at each release time we disaggregate the particles backwards over the previous 24hrs
# The final value then contains the 29-day integrated residual footprints
logger.info("Calculating modelled timeseries comparison:")
iters = track(time_array.values)
for tt, time in enumerate(iters):
# Get correct index for low resolution data based on start and current date
current = {dd: getattr(np.datetime64(time, "h").astype(object), dd) for dd in ["month", "year"]}
tt_low = current["month"] - start["month"] + 12 * (current["year"] - start["year"])
# get 4 dimensional chunk of high time res footprints for this timestep
# units : mol/mol/mol/m2/s
# reverse the time coordinate to be chronological
fp_time = fp_HiTRes[:, :, tt, ::-1]
fp_high_freq = fp_time[:, :, 1:]
fp_residual = fp_time[:, :, 0:1] # First element (reversed) contains residual footprints
# Extract flux data from dataset
flux_high_freq = flux_ds_high_freq.flux
flux_low_freq = flux_ds_low_freq.flux
# Define high and low frequency fluxes based on inputs
# Allow for variable frequency within 24 hours
flux_low_freq = flux_low_freq[:, :, tt_low : tt_low + 1]
if flux_res_H <= 24:
# Define indices to correctly select matching date range from flux data
# This will depend on the various frequencies of the inputs
# At present, highest_res_H matches the flux frequency
tt_start = tt * int(time_res_H / highest_res_H) + 1
tt_end = tt_start + int(max_h_back / highest_res_H)
selection = int(time_hf_res_H / highest_res_H)
# Extract matching time range from whole flux array
flux_high_freq = flux_high_freq[..., tt_start:tt_end]
if selection > 1:
# If flux frequency does not match to the high frequency (hf, H_back)
# dimension, select entries which do. Reversed to make sure
# entries matching to the correct times are selected
flux_high_freq = flux_high_freq[..., ::-selection]
flux_high_freq = flux_high_freq[..., ::-1]
else:
flux_high_freq = flux_high_freq[:, :, tt_low : tt_low + 1]
# convert to array to use in numba loop
flux_high_freq = da.array(flux_high_freq)
flux_low_freq = da.array(flux_low_freq)
# Multiply the HiTRes footprints with the HiTRes emissions to give mf
# Multiply residual footprints by low frequency emissions data to give residual mf
# flux units : mol/m2/s; fp units : mol/mol/mol/m2/s
# --> mol/mol/mol/m2/s * mol/m2/s === mol / mol
fpXflux_time = flux_high_freq * fp_high_freq
fpXflux_residual = flux_low_freq * fp_residual
# append the residual emissions
fpXflux_time = np.dstack((fpXflux_time, fpXflux_residual))
if output_fpXflux:
# Sum over time (H back) to give the total mf at this timestep
fpXflux[:, :, tt] = fpXflux_time.sum(axis=2)
if output_TS:
# work out timeseries by summing over lat, lon (24 hrs)
timeseries[tt] = fpXflux_time.sum()
# TODO: Add details about units to output
if output_fpXflux:
fpXflux = DataArray(
fpXflux,
dims=("lat", "lon", "time"),
coords={"lat": lat, "lon": lon, "time": time_array},
)
if output_TS:
timeseries = DataArray(timeseries, dims=("time"), coords={"time": time_array})
if output_fpXflux and output_TS:
timeseries.compute()
fpXflux.compute()
return timeseries, fpXflux
elif output_fpXflux:
fpXflux.compute()
return fpXflux
elif output_TS:
timeseries.compute()
return timeseries
return None
[docs]
def calc_modelled_baseline(
self,
resample_to: str = "coarsest",
platform: Optional[str] = None,
output_units: float = 1e-9,
cache: bool = True,
recalculate: bool = False,
) -> DataArray:
"""
Calculate the modelled baseline points based on site footprint and boundary conditions.
Boundary conditions are multipled by any loss (exp(-t/lifetime)) for the species.
The time points returned are dependent on the resample_to option chosen.
If obs data is also linked to the ModelScenario instance, this will be used
to derive the time points where appropriate.
Args:
resample_to: Resample option to use for averaging:
- either one of ["coarsest", "obs", "footprint"] to match to the datasets
- or using a valid pandas resample period e.g. "2H".
Default = "coarsest".
platform: Observation platform used to decide whether to resample e.g. "site", "satellite".
cache: Cache this data after calculation. Default = True.
recalculate: Make sure to recalculate this data rather than return from cache. Default = False.
Returns:
xarray.DataArray: Modelled baselined values along the time axis
If cache is True:
This data will also be cached as the ModelScenario.modelled_baseline attribute.
The associated scenario data will be cached as the ModelScenario.scenario attribute.
"""
from openghg.util import check_lifetime_monthly, species_lifetime, time_offset
self._check_data_is_present(need=["footprint", "bc"])
bc = cast(BoundaryConditionsData, self.bc)
param_calculate = self._param_setup(
param="modelled_baseline", resample_to=resample_to, platform=platform, recalculate=recalculate
)
if not param_calculate:
modelled_baseline = cast(DataArray, self.modelled_baseline)
return modelled_baseline
scenario = cast(Dataset, self.scenario)
bc_data = bc.data
bc_data = bc_data.reindex_like(scenario, "ffill")
lifetime_value = species_lifetime(self.species)
check_monthly = check_lifetime_monthly(lifetime_value)
if check_monthly:
lifetime_monthly = cast(Optional[List[str]], lifetime_value)
lifetime: Optional[str] = None
else:
lifetime_monthly = None
lifetime = cast(Optional[str], lifetime_value)
if lifetime is not None:
short_lifetime = True
lt_time_delta = time_offset(period=lifetime)
lifetime_hrs: Union[float, np.ndarray] = lt_time_delta.total_seconds() / 3600.0
elif lifetime_monthly:
short_lifetime = True
lifetime_monthly_hrs = []
for lt in lifetime_monthly:
lt_time_delta = time_offset(period=lt)
lt_hrs = lt_time_delta.total_seconds() / 3600.0
lifetime_monthly_hrs.append(lt_hrs)
# calculate the lifetime_hrs associated with each time point in scenario data
# this is because lifetime can be a list of monthly values
time_month = scenario["time"].dt.month
lifetime_hrs = np.array([lifetime_monthly_hrs[item - 1] for item in time_month.values])
else:
short_lifetime = False
# Include loss condition if lifetime of species is specified
if short_lifetime:
expected_vars = (
"mean_age_particles_n",
"mean_age_particles_e",
"mean_age_particles_s",
"mean_age_particles_w",
)
for var in expected_vars:
if var not in scenario.data_vars:
raise ValueError(
f"Unable to calculate baseline for short-lived species {self.species} without species specific footprint."
)
# Ignoring type below - - problem with xarray patching np.exp to return DataArray rather than ndarray
loss_n: Union[DataArray, float] = np.exp(-1 * scenario["mean_age_particles_n"] / lifetime_hrs).rename("loss_n") # type: ignore
loss_e: Union[DataArray, float] = np.exp(-1 * scenario["mean_age_particles_e"] / lifetime_hrs).rename("loss_e") # type: ignore
loss_s: Union[DataArray, float] = np.exp(-1 * scenario["mean_age_particles_s"] / lifetime_hrs).rename("loss_s") # type: ignore
loss_w: Union[DataArray, float] = np.exp(-1 * scenario["mean_age_particles_w"] / lifetime_hrs).rename("loss_w") # type: ignore
else:
loss_n = 1.0
loss_e = 1.0
loss_s = 1.0
loss_w = 1.0
# Check and extract units as float, if present.
units_default = 1.0
units_n = check_units(bc_data["vmr_n"], default=units_default)
units_e = check_units(bc_data["vmr_e"], default=units_default)
units_s = check_units(bc_data["vmr_s"], default=units_default)
units_w = check_units(bc_data["vmr_w"], default=units_default)
modelled_baseline = (
(scenario["particle_locations_n"] * bc_data["vmr_n"] * loss_n * units_n / output_units).sum(
["height", "lon"]
)
+ (scenario["particle_locations_e"] * bc_data["vmr_e"] * loss_e * units_e / output_units).sum(
["height", "lat"]
)
+ (scenario["particle_locations_s"] * bc_data["vmr_s"] * loss_s * units_s / output_units).sum(
["height", "lon"]
)
+ (scenario["particle_locations_w"] * bc_data["vmr_w"] * loss_w * units_w / output_units).sum(
["height", "lat"]
)
)
modelled_baseline.attrs["resample_to"] = resample_to
modelled_baseline.attrs["units"] = output_units
modelled_baseline = modelled_baseline.rename("bc_mod")
# Cache output from calculations
if cache:
logger.info("Caching calculated data")
self.modelled_baseline = modelled_baseline
# self.scenario[name] = modelled_obs
else:
self.modelled_baseline = None # Make sure this is reset and not cached
self.scenario = None # Reset this to None after calculation completed
return modelled_baseline
# def _calc_modelled_baseline_long_lived():
# pass
# def _calc_modelled_baseline_short_lived():
# pass
[docs]
def plot_timeseries(self) -> Any:
"""
Plot the observation timeseries data.
Returns:
Plotly Figure
Interactive plotly graph created with observations
"""
self._check_data_is_present(need="obs")
obs = cast(ObsData, self.obs)
fig = obs.plot_timeseries() # Calling method on ObsData class
return fig
[docs]
def plot_comparison(
self,
baseline: Optional[str] = "boundary_conditions",
sources: Optional[Union[str, List]] = None,
resample_to: str = "coarsest",
platform: Optional[str] = None,
cache: bool = True,
recalculate: bool = False,
) -> Any:
"""
Plot comparison between observation and modelled timeseries data.
Args:
baseline: Add baseline to data. One of:
- "boundary_conditions" - Uses added boundary conditions to calculate modelled baseline
- "percentile" - Calculates the 1% value across the whole time period
- None - don't add a baseline and only plot the modelled observations
sources: Sources to use for flux. All will be used and stacked if not specified.
resample_to: Resample option to use for averaging:
- either one of ["coarsest", "obs", "footprint"] to match to the datasets
- or using a valid pandas resample period e.g. "2H".
Default = "coarsest".
platform: Observation platform used to decide whether to resample e.g. "site", "satellite".
cache: Cache this data after calculation. Default = True.
recalculate: Make sure to recalculate this data rather than return from cache. Default = False.
Returns:
Plotly Figure
Interactive plotly graph created with observation and modelled observation data.
"""
# Only import plotly when we need to - not needed if not plotting.
import plotly.graph_objects as go
self._check_data_is_present(need=["obs", "footprint", "flux"])
obs = cast(ObsData, self.obs)
fig = obs.plot_timeseries()
modelled_obs = self.calc_modelled_obs(
sources=sources, resample_to=resample_to, platform=platform, cache=cache, recalculate=recalculate
)
x_data = modelled_obs["time"]
y_data = modelled_obs.data
species = self.species
if sources is None:
sources = self.flux_sources
elif isinstance(sources, str):
sources = [sources]
if sources is not None:
source_str = ", ".join(sources)
label = f"Modelled {species.upper()}: {source_str}"
else:
label = f"Modelled {species.upper()}"
# TODO: Check modelled_obs units and ensure these match to modelled_baseline
# - currently modelled_baseline outputs in 1e-9 (ppb) by default.
if baseline == "boundary_conditions":
if self.bc is not None:
modelled_baseline = self.calc_modelled_baseline(
resample_to=resample_to, platform=platform, cache=cache, recalculate=recalculate
)
y_baseline = modelled_baseline.data
y_data = y_data + y_baseline
else:
logger.warning("Unable to calculate baseline from boundary conditions")
elif baseline == "percentile":
mf = obs.data["mf"]
y_baseline = mf.quantile(1.0, dim="time").values
y_data = y_data + y_baseline
fig.add_trace(go.Scatter(x=x_data, y=y_data, mode="lines", name=label))
return fig
def _indexes_match(dataset_A: Dataset, dataset_B: Dataset) -> bool:
"""
Check if two datasets need to be reindexed_like for combine_datasets
Args:
dataset_A: First dataset to check
dataset_B: Second dataset to check
Returns:
bool: True if indexes match, else False
"""
common_indices = (key for key in dataset_A.indexes.keys() if key in dataset_B.indexes.keys())
for index in common_indices:
if not len(dataset_A.indexes[index]) == len(dataset_B.indexes[index]):
return False
# Check number of values that are not close (testing for equality with floating point)
if index == "time":
# For time override the default to have ~ second precision
rtol = 1e-10
else:
rtol = 1e-5
index_diff = np.sum(
~np.isclose(
dataset_A.indexes[index].values.astype(float),
dataset_B.indexes[index].values.astype(float),
rtol=rtol,
)
)
if not index_diff == 0:
return False
return True
def combine_datasets(
dataset_A: Dataset, dataset_B: Dataset, method: methodType = "ffill", tolerance: Optional[float] = None
) -> Dataset:
"""
Merges two datasets and re-indexes to the first dataset.
If "fp" variable is found within the combined dataset,
the "time" values where the "lat", "lon" dimensions didn't match are removed.
Args:
dataset_A: First dataset to merge
dataset_B: Second dataset to merge
method: One of None, nearest, ffill, bfill.
See xarray.DataArray.reindex_like for list of options and meaning.
Defaults to ffill (forward fill)
tolerance: Maximum allowed tolerance between matches.
Returns:
xarray.Dataset: Combined dataset indexed to dataset_A
"""
if _indexes_match(dataset_A, dataset_B):
dataset_B_temp = dataset_B
else:
# load dataset_B to avoid performance issue (see xarray issue #8945)
dataset_B_temp = dataset_B.load().reindex_like(dataset_A, method=method, tolerance=tolerance)
merged_ds = dataset_A.merge(dataset_B_temp)
if "fp" in merged_ds:
if all(k in merged_ds.fp.dims for k in ("lat", "lon")):
flag = np.where(np.isfinite(merged_ds.fp.mean(dim=["lat", "lon"]).values))
merged_ds = merged_ds[dict(time=flag[0])]
return merged_ds
def match_dataset_dims(
datasets: Sequence[Dataset],
dims: Union[str, Sequence] = [],
method: methodType = "nearest",
tolerance: Union[float, Dict[str, float]] = 1e-5,
) -> List[Dataset]:
"""
Aligns datasets to the selected dimensions within a tolerance.
All datasets will be aligned to the first dataset within the list.
Args:
datasets: List of xarray Datasets. Expect datasets to contain the same dimensions.
dims: Dimensions match between datasets. Can use keyword "all" to match every dimension.
method : Method to use for indexing. Should be one of: ("nearest", "ffill", "bfill")
tolerance: Tolerance value to use when matching coordinate values.
This can be a single value for all dimensions or a dictionary of values to use.
Returns:
List (xarray.Dataset) : Datasets aligned along the matching dimensions.
TODO: Check if this supercedes or replicates _indexes_match() function too closely?
"""
# Nothing to be done if only one (or less) datasets are passed
if len(datasets) <= 1:
return list(datasets)
ds0 = datasets[0]
if isinstance(dims, str):
if dims == "all":
dims = list(ds0.dims)
else:
dims = [dims]
# Extract coordinate values for the first dataset in the list
ds0 = datasets[0]
indexers = {dim: ds0[dim] for dim in dims}
if isinstance(tolerance, float):
tolerance = {dim: tolerance for dim in dims}
# Align datasets along selected dimensions (if not already identical)
datasets_aligned = [ds0]
for ds in datasets[1:]:
for dim, compare_coord in indexers.items():
try:
coord = ds[dim]
except KeyError:
raise ValueError(f"Dataset missing dimension: {dim}")
else:
if not coord.equals(compare_coord):
ds = ds.reindex({dim: compare_coord}, method=method, tolerance=tolerance[dim])
datasets_aligned.append(ds)
return datasets_aligned
# ResType = Union[np.timedelta64, float, np.floating, np.integer]
def calc_dim_resolution(dataset: Dataset, dim: str = "time") -> Any:
"""
Calculates the average frequency along a given dimension.
Args:
dataset : Dataset. Must contain the specified dimension
dim : Dimension name
Returns:
np.timedelta64 / np.float / np.int : Resolution with input dtype
NaT : If unsuccessful and input dtype is np.timedelta64
NaN : If unsuccessful for all other dtypes.
"""
try:
resolution = dataset[dim].diff(dim=dim).mean().item()
except ValueError:
if np.issubdtype(dataset[dim].dtype, np.datetime64):
resolution = np.timedelta64("NaT")
else:
resolution = np.NaN
else:
if np.issubdtype(dataset[dim].dtype, np.datetime64):
# Extract units from original datetime string and use to recreate timedelta64
unit = dataset[dim].dtype.name.lstrip("timedelta64").lstrip("[").rstrip("]")
resolution = np.timedelta64(resolution, unit)
return resolution
def stack_datasets(datasets: Sequence[Dataset], dim: str = "time", method: methodType = "ffill") -> Dataset:
"""
Stacks multiple datasets based on the input dimension. By default this is time
and this will be aligned to the highest resolution / frequency
(smallest difference betweeen coordinate values).
At the moment, the two datasets must have identical coordinate values for all
other dimensions and variable names for these to be stacked.
Args:
datasets : Sequence of input datasets
dim : Name of dimension to stack along. Default = "time"
method: Method to use when aligning the datasets. Default = "ffill"
Returns:
Dataset : Stacked dataset
TODO: Could update this to only allow DataArrays to be included to reduce the phase
space here.
"""
if len(datasets) == 1:
dataset = datasets[0]
return dataset
data_frequency = [calc_dim_resolution(ds, dim) for ds in datasets]
index_highest_freq = min(range(len(data_frequency)), key=data_frequency.__getitem__)
data_highest_freq = datasets[index_highest_freq]
coords_to_match = data_highest_freq[dim]
for i, data in enumerate(datasets):
data_match = data.reindex({dim: coords_to_match}, method=method)
if i == 0:
data_stacked = data_match
data_stacked.attrs = {}
else:
data_stacked += data_match
return data_stacked
def check_units(data_var: DataArray, default: float) -> float:
"""
Check "units" attribute within a DataArray. Expect this to be a float
or possible to convert to a float.
If not present, use default value.
"""
attrs = data_var.attrs
if "units" in attrs:
units_from_attrs = attrs["units"]
if not isinstance(units_from_attrs, float):
try:
units = float(units_from_attrs)
except ValueError:
raise ValueError(f"Cannot extract {units_from_attrs} value as float")
else:
units = default
return units
# def footprints_data_merge(data: Union[dict, ObsData],
# domain: str,
# met_model: Optional[str] = None,
# load_flux: bool = True,
# load_bc: bool = True,
# calc_timeseries: bool = True,
# calc_bc: bool = True,
# HiTRes: bool = False,
# site_modifier: Dict[str, str] = {},
# height: Optional[str] = None,
# emissions_name: Optional[str] = None,
# fp_directory: Optional[Union[Path, str]] = None,
# flux_directory: Optional[Union[Path, str]] = None,
# bc_directory: Optional[Union[Path, str]] = None,
# resample_to_data: bool = False,
# species_footprint: Optional[str] = None,
# chunks: bool = False,
# verbose: bool = True,
# ) -> Any:
# """
# This will be a wrapper for footprints_data_merge function from acrg_name.name file written
# """
# # Write this a wrapper for footprints_data_merge function from acrg_name.name file
# print("The footprint_data_merge() wrapper function will be deprecated.")
# print("Please use the ModelScenario class to set up your data")
# print("Then call the model.footprints_data_merge() method e.g.")
# print(" model = ModelScenario(site, species, inlet, network, domain, ...)")
# print(" combined_data = model.footprints_data_merge()")
# # resample_to_data --> resample_to
# # HiTRes --> linked to species as co2
# # directories --> could link to adding new data to object store?
# # height --> inlet
# # species_footprint --> links to overall species?
# # site_modifier --> link to footprint name - different site name for footprint?
# Blueprint from Issue #42 created for this
# class LPDM():
# def __init__(species, domain, flux, bc):
# self.species=species
# self.domain=domain
# #etc...
# self.flux=get_flux(species, domain, flux)
# self.bc=get_bc(species, domain, bc)
# # dictionary of obs datasets + footprints?
# self.obs = {}
# self.footprints = {}
# def obs_get(self, site, average=None):
# self.obs["site"]=get_observations(self.species, site)
# def obs_footprints_merge(self.obs, site, etc.):
# obs_resampled, fp_resampled = footprints_data_merge(self.obs, site, kwargs_see_above)
# self.obs["site"] = obs_resampled
# self.footprints["site"] = fp_resampled
# def model_mf(self, site):
# # this is obviously not to be taken literally:
# return self.footprints["site"] * self.flux + self.bc
# if __name__ == "__main__":
# import LPDM # Need a better class name?
# mod = LPDM(species="CH4", domain="EUROPE", flux=["ch4_source1", "ch4_source2", "ch4_source3"],
# bc="CAMS")
# for site in sites:
# mod.obs_get(site, average="1H") # Need to be careful about averaging here and/or in fp/data merge step
# mod.obs_footprints_merge(site, model="NAME-UKV", average="1D") # 1D average of combined fp/data dataset
# # Optional
# mod.get_site_met(site, met="ECMWF")
# # output some obs
# ds = mod.obs_out(site)
# mod.obs_plot(site)
# # plot some model output
# mod.footprint_plot(site)
# # predicted mole fraction
# mf_mod = mod.model_mf(site, source="ch4_source1")