# Source code for: (module path missing from this listing)

""" Handles the recombination of dataframes stored in the object store
    into the data requested by the user

from typing import Dict, List, Optional, Union

import numpy as np
import xarray as xr
from xarray.core.coordinates import DatasetCoordinates

__all__ = ["recombine_multisite", "recombine_datasets"]

def recombine_multisite(keys: Dict, sort: Optional[bool] = True) -> Dict:
    """Recombine the keys from multiple sites into a single Dataset for each site

    This function is currently a placeholder: it always raises and performs
    no recombination.

    Args:
        keys: A dictionary of lists of keys, keyed by site
        sort: Sort the resulting Dataset by the time dimension
    Returns:
        dict: Dictionary of xarray.Datasets
    Raises:
        NotImplementedError: Always, the function has no implementation yet.
    """
    # Earlier sketch of the intended implementation, kept for reference.
    # It does not pass the ``bucket`` argument that recombine_datasets
    # requires, so it cannot simply be re-enabled as written:
    #
    #     result = {}
    #     for key, key_list in keys.items():
    #         result[key] = recombine_datasets(keys=key_list, sort=sort)
    #     return result
    raise NotImplementedError("recombine_multisite is not implemented.")
def recombine_datasets(
    bucket: str,
    keys: List[str],
    sort: Optional[bool] = True,
    attrs_to_check: Optional[Dict[str, str]] = None,
    elevate_inlet: bool = False,
) -> xr.Dataset:
    """Combines datasets stored separately in the object store into a single dataset

    Args:
        bucket: Object store bucket to retrieve data from
        keys: List of object store keys
        sort: Sort the resulting Dataset by the time dimension, defaults to True
        attrs_to_check: Attributes to check for duplicates, given as a dictionary
            mapping attribute name to replacement value. If duplicates are present
            a new data variable will be created containing the values from each
            dataset. For an attribute that was elevated to a data variable, a
            non-empty replacement value is assigned to the combined Dataset's
            attribute and an empty value removes the attribute.
            The dictionary passed in is not modified.
        elevate_inlet: Force the elevation of the inlet attribute
    Returns:
        xarray.Dataset: Combined Dataset
    Raises:
        ValueError: If no keys are passed, or if more than 5 timestamps
            are duplicated in the combined data.
    """
    # Fail fast, before importing or loading anything
    if not keys:
        raise ValueError("No data keys passed.")

    # NOTE(review): the module path of this import was missing from the source
    # listing; ``openghg.store.base`` is assumed - confirm before merging.
    from openghg.store.base import Datasource
    from xarray import concat as xr_concat

    data = [Datasource.load_dataset(bucket=bucket, key=k) for k in keys]

    # Check if we've got multiple inlet heights
    inlets_to_check = check_inlets(data=data, elevate_inlet=elevate_inlet)

    # Work on a copy so the caller's dictionary is never mutated
    attrs_to_check = {} if attrs_to_check is None else dict(attrs_to_check)
    attrs_to_check.update(inlets_to_check)

    # For specified attributes (e.g. "inlet")
    # elevate duplicates to data variables within each Dataset
    # TODO - GJ - 2022-02-22 - I'm not sure we need so many different ways of
    # passing in inlets to check here? Only dictionaries are handled; the
    # str / list variants were commented out in the original code.
    if attrs_to_check:
        data = elevate_duplicate_attrs(
            ds_list=data, attributes=list(attrs_to_check), elevate_inlet=elevate_inlet
        )

    # Concatenate datasets along time dimension
    combined = xr_concat(data, dim="time") if len(data) > 1 else data[0]

    # Replace/remove incorrect attributes
    # - xr.concat will only take value from first dataset if duplicated
    for attr, value in attrs_to_check.items():
        # Only update if attr was elevated to a data variable
        if attr in combined:
            if value:
                combined.attrs[attr] = value
            else:
                combined.attrs.pop(attr, None)

    # Nothing below modifies the data, so a single sort is sufficient
    if sort:
        combined = combined.sortby("time")

    # Count the timestamps that occur more than once.
    # (The original comment credited an external snippet here, but the
    # reference is missing from the source listing.)
    unique, count = np.unique(combined.time, return_counts=True)
    n_dupes = unique[count > 1].size

    if n_dupes > 5:
        raise ValueError("Large number of duplicate timestamps, check data overlap.")

    # NOTE: duplicates at or below the threshold are left in the data. The
    # isel-based de-duplication was commented out in the original code, with
    # the remark that isel is a memory hungry operation.
    return combined
def create_array_from_value(
    value: str,
    coords: Union[DatasetCoordinates, Dict[str, DatasetCoordinates]],  # type: ignore
    name: Union[str, None] = None,
) -> xr.DataArray:
    """
    Create a new xarray.DataArray object containing a single value repeated for each
    coordinate.

    Args:
        value: Value to be repeated within the DataArray object
        coords: Co-ordinates to use for this new DataArray.
        name: Name to give the variable within the DataArray
    Returns:
        DataArray
    """
    # The shape of the new array is the length of each coordinate, in order
    if isinstance(coords, DatasetCoordinates):
        dims = tuple(len(coords[coord_name]) for coord_name in coords.keys())
    elif isinstance(coords, dict):
        dims = tuple(len(coord) for coord in coords.values())
    else:
        dims = (len(coords),)

    variable = np.tile(value, dims)

    return xr.DataArray(variable, coords=coords, name=name)


def elevate_duplicate_attrs(
    ds_list: List[xr.Dataset], attributes: Union[str, List[str]], elevate_inlet: bool
) -> List[xr.Dataset]:
    """
    For a list of Datasets, if the specified attributes are being repeated
    these will be added as new data variables to each Dataset.

    Note that ``ds_list`` is updated in place: entries are replaced by the
    Datasets carrying the new variable, and the same list is returned.

    Args:
        ds_list: List of xarray Datasets
        attributes: Attribute values to check within the Datasets. If None is
            passed the original dataset list will be returned.
        elevate_inlet: Force the elevation of inlet
    Returns:
        list: List of updated Dataset objects
    Raises:
        KeyError: If an attribute needs elevating but is missing from one of
            the Datasets.
    """
    if not isinstance(attributes, list):
        attributes = [attributes]

    for attr in attributes:
        # Pull the attributes out of the datasets - usually inlet values for ranked data
        unique_values = {ds.attrs[attr] for ds in ds_list if attr in ds.attrs}

        # If we have more than one unique value we update the Dataset by adding a new variable
        # This is useful with ranked inlets so we can easily know which inlet a measurement was taken from
        if len(unique_values) > 1 or (attr == "inlet" and elevate_inlet):
            for i, ds in enumerate(ds_list):
                new_variable = create_array_from_value(value=ds.attrs[attr], coords=ds.coords, name=attr)
                ds_list[i] = ds.assign({attr: new_variable})

    return ds_list


def check_inlets(data: List[xr.Dataset], elevate_inlet: bool) -> Dict:
    """Check the inlets of the data to be processed

    Args:
        data: List of Datasets
        elevate_inlet: If True and the Datasets share a single inlet value,
            return that value so the inlet is still elevated
    Returns:
        dict: Dictionary with single or multiple inlet replacement value.
            Empty if there is nothing to replace, including when no Dataset
            has an "inlet" attribute.
    """
    # Datasets without an "inlet" attribute are ignored
    inlets = {dataset.attrs["inlet"] for dataset in data if "inlet" in dataset.attrs}

    if len(inlets) > 1:
        return {"inlet": "multiple"}

    # Guard against an empty set: there is no inlet value to elevate
    if elevate_inlet and inlets:
        return {"inlet": inlets.pop()}

    return {}