from datetime import date
import logging
import numpy as np
import pandas as pd
from pandas import DataFrame, DateOffset, DatetimeIndex, Timedelta, Timestamp
from xarray import Dataset
import re
from openghg.types import TimePeriod
__all__ = [
"timestamp_tzaware",
"timestamp_now",
"timestamp_epoch",
"daterange_from_str",
"daterange_to_str",
"create_daterange_str",
"create_daterange",
"daterange_overlap",
"combine_dateranges",
"split_daterange_str",
"closest_daterange",
"valid_daterange",
"find_daterange_gaps",
"trim_daterange",
"split_encompassed_daterange",
"daterange_contains",
"sanitise_daterange",
"check_nan",
"check_date",
"first_last_dates",
"time_offset_definition",
"parse_period",
"create_frequency_str",
"time_offset",
"relative_time_offset",
"find_duplicate_timestamps",
"in_daterange",
"evaluate_sampling_period",
]
# TupleTimeType = Tuple[Union[int, float], str]
def find_duplicate_timestamps(data: Dataset | DataFrame) -> list:
"""Check for duplicates
Args:
data: Data object to check. Should have a time attribute or index
Returns:
list: A list of duplicates
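Examples:
Illustrative sketch (made-up values; the repr of the returned timestamps depends on the input type and numpy version):
>>> df = pd.DataFrame({"mf": [1.0, 2.0, 3.0]},
... index=pd.to_datetime(["2020-01-01", "2020-01-01", "2020-01-02"]))
>>> find_duplicate_timestamps(df)
[numpy.datetime64('2020-01-01T00:00:00.000000000')]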
"""
from numpy import unique
try:
time_data = data.time
except AttributeError:
try:
time_data = data.index
except AttributeError:
raise ValueError("Unable to read time data")
uniq, count = unique(time_data, return_counts=True)
dupes = uniq[count > 1]
return list(dupes)
def timestamp_tzaware(timestamp: str | Timestamp) -> Timestamp:
"""Returns the pandas Timestamp passed as a timezone (UTC) aware
Timestamp.
Args:
timestamp: Timezone naive or aware Timestamp, or a string that can be parsed by pandas
Returns:
pandas.Timestamp: Timezone aware
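Examples:
Illustrative usage (values are arbitrary):
>>> timestamp_tzaware("2021-01-01")
Timestamp('2021-01-01 00:00:00+0000', tz='UTC')
>>> timestamp_tzaware(Timestamp("2021-01-01 12:00:00"))
Timestamp('2021-01-01 12:00:00+0000', tz='UTC')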
"""
if not isinstance(timestamp, Timestamp):
timestamp = Timestamp(timestamp)
if timestamp.tzinfo is None:
return timestamp.tz_localize(tz="UTC")
else:
return timestamp.tz_convert(tz="UTC")
def timestamp_now() -> Timestamp:
"""Returns a pandas timezone (UTC) aware Timestamp for the current time.
Returns:
pandas.Timestamp: Timestamp at current time
"""
from pandas import Timestamp
return timestamp_tzaware(Timestamp.now())
def timestamp_epoch() -> Timestamp:
"""Returns the UNIX epoch time
1st of January 1970
Returns:
pandas.Timestamp: Timestamp object at epoch
"""
from pandas import Timestamp
return timestamp_tzaware(Timestamp("1970-1-1 00:00:00"))
def daterange_overlap(daterange_a: str, daterange_b: str) -> bool:
"""Check if daterange_a overlaps daterange_b.
Args:
daterange_a (str): Timezone aware daterange string. Example:
2014-01-30-10:52:30+00:00_2014-01-30-13:22:30+00:00
daterange_b (str): As daterange_a
Returns:
bool: True if the dateranges overlap
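Examples:
Illustrative usage (dates are arbitrary):
>>> daterange_overlap("2019-01-01_2019-06-01", "2019-03-01_2019-12-31")
True
>>> daterange_overlap("2019-01-01_2019-02-01", "2019-06-01_2019-12-31")
False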
"""
from pandas import Timestamp
split_a = daterange_a.split("_")
split_b = daterange_b.split("_")
start_a = Timestamp(ts_input=split_a[0], tz="UTC")
end_a = Timestamp(ts_input=split_a[1], tz="UTC")
start_b = Timestamp(ts_input=split_b[0], tz="UTC")
end_b = Timestamp(ts_input=split_b[1], tz="UTC")
# For this logic see
# https://stackoverflow.com/a/325964
return bool(start_a <= end_b and end_a >= start_b)
def create_daterange(start: Timestamp, end: Timestamp, freq: str | None = "D") -> DatetimeIndex:
"""Create a timezone aware (UTC) daterange at the given frequency
Args:
start: Start date
end: End date
freq: Frequency alias accepted by pandas.date_range, defaults to "D" (daily)
Returns:
pandas.DatetimeIndex
"""
from pandas import date_range
if start > end:
raise ValueError("Start date is after end date")
start = timestamp_tzaware(start)
end = timestamp_tzaware(end)
return date_range(start=start, end=end, freq=freq)
def create_daterange_str(start: str | Timestamp, end: str | Timestamp) -> str:
"""Convert the passed datetimes into a daterange string
for use in searches and Datasource interactions
Args:
start: Start date
end: End date
Returns:
str: Daterange string
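Examples:
Illustrative usage (dates are arbitrary):
>>> create_daterange_str("2019-01-01", "2019-12-31")
'2019-01-01-00:00:00+00:00_2019-12-31-00:00:00+00:00'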
"""
start = timestamp_tzaware(start)
end = timestamp_tzaware(end)
if start > end:
raise ValueError(f"Invalid daterange, start ({start}) > end ({end})")
start = str(start).replace(" ", "-")
end = str(end).replace(" ", "-")
return "_".join((start, end))
def daterange_from_str(daterange_str: str, freq: str | None = "D") -> DatetimeIndex:
"""Get a Pandas DatetimeIndex from a string. The created
DatetimeIndex covers the daterange at the given frequency (daily by default).
Args:
daterange_str (str): Daterange string
of the form 2019-01-01T00:00:00_2019-12-31T00:00:00
freq: Frequency alias accepted by pandas.date_range, defaults to "D" (daily)
Returns:
pandas.DatetimeIndex: DatetimeIndex covering daterange
"""
from pandas import date_range
split = daterange_str.split("_")
# Make the timestamps timezone aware (UTC)
start = timestamp_tzaware(split[0])
end = timestamp_tzaware(split[1])
return date_range(start=start, end=end, freq=freq)
def daterange_to_str(daterange: DatetimeIndex) -> str:
"""Takes a pandas DatetimeIndex created by pandas date_range and converts it to a
string of the form 2019-01-01-00:00:00_2019-03-16-00:00:00
Args:
daterange (pandas.DatetimeIndex)
Returns:
str: Daterange in string format
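Examples:
Illustrative usage (dates are arbitrary):
>>> dr = create_daterange(start=Timestamp("2019-01-01"), end=Timestamp("2019-01-03"))
>>> daterange_to_str(dr)
'2019-01-01-00:00:00+00:00_2019-01-03-00:00:00+00:00'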
"""
start = str(daterange[0]).replace(" ", "-")
end = str(daterange[-1]).replace(" ", "-")
return "_".join([start, end])
def combine_dateranges(dateranges: list[str]) -> list[str]:
"""Combine dateranges
Args:
dateranges: Daterange strings
Returns:
list: List of combined dateranges
Modified from
https://codereview.stackexchange.com/a/69249
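Examples:
Illustrative usage (the two input ranges overlap, so they are merged; dates are arbitrary):
>>> combine_dateranges(["2019-01-01_2019-06-01", "2019-03-01_2019-12-31"])
['2019-01-01-00:00:00+00:00_2019-12-31-00:00:00+00:00']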
"""
if len(dateranges) == 1:
return dateranges
def sort_key(tup: tuple) -> Timestamp:
return tup[0]
intervals = [split_daterange_str(x) for x in dateranges]
sorted_by_lower_bound = sorted(intervals, key=sort_key)
combined: list[tuple[Timestamp, Timestamp]] = []
for higher in sorted_by_lower_bound:
if not combined:
combined.append(higher)
else:
lower = combined[-1]
# Test for intersection between lower and higher:
# We know via sorting that lower[0] <= higher[0]
if higher[0] <= lower[1]:
upper_bound = max(lower[1], higher[1])
# Replace by combined interval
combined[-1] = (lower[0], upper_bound)
else:
combined.append(higher)
combined_strings = [create_daterange_str(start=a, end=b) for a, b in combined]
return combined_strings
def split_daterange_str(
daterange_str: str, date_only: bool = False
) -> tuple[Timestamp | date, Timestamp | date]:
"""Split a daterange string to the component start and end
Timestamps
Args:
daterange_str: Daterange string of the form
2019-01-01T00:00:00_2019-12-31T00:00:00
date_only: Return only the date portion of the Timestamp, removing
the hours / seconds component
Returns:
tuple (Timestamp / datetime.date, Timestamp / datetime.date): Tuple of start, end timestamps / dates
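Examples:
Illustrative usage (dates are arbitrary):
>>> split_daterange_str("2019-01-01_2019-12-31")
(Timestamp('2019-01-01 00:00:00+0000', tz='UTC'), Timestamp('2019-12-31 00:00:00+0000', tz='UTC'))
>>> split_daterange_str("2019-01-01_2019-12-31", date_only=True)
(datetime.date(2019, 1, 1), datetime.date(2019, 12, 31))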
"""
split = daterange_str.split("_")
start = timestamp_tzaware(split[0])
end = timestamp_tzaware(split[1])
if date_only:
start = start.date()
end = end.date()
return start, end
def valid_daterange(daterange: str) -> bool:
"""Check if the passed daterange is valid
Args:
daterange: Daterange string
Returns:
bool: True if valid
"""
from openghg.util import split_daterange_str
start, end = split_daterange_str(daterange)
if start >= end:
return False
return True
def closest_daterange(to_compare: str, dateranges: str | list[str]) -> str:
"""Finds the closest daterange in a list of dateranges
Args:
to_compare: Daterange (as a string) to compare
dateranges: List of dateranges
Returns:
str: Daterange from dateranges that's the closest in time to to_compare
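Examples:
Illustrative sketch (dates are arbitrary; the 2019 candidate ends closest to the start of to_compare):
>>> closest_daterange(
... to_compare="2019-06-01_2019-07-01",
... dateranges=["2019-01-01_2019-02-01", "2016-01-01_2016-02-01"],
... )
'2019-01-01_2019-02-01'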
"""
from openghg.util import split_daterange_str
from pandas import Timedelta
min_start = Timedelta("3650days")
min_end = Timedelta("3650days")
if not isinstance(dateranges, list):
dateranges = [dateranges]
dateranges = sorted(dateranges)
start_comp, end_comp = split_daterange_str(daterange_str=to_compare)
# Iterate over the dateranges: if any of them overlaps with to_compare
# raise an error, otherwise track how far apart each daterange's
# start and end are from to_compare
for daterange in dateranges:
# If they're close to overlap the start and end will be close
start, end = split_daterange_str(daterange_str=daterange)
# Check for an overlap
if start <= end_comp and end >= start_comp:
raise ValueError("Overlapping daterange.")
# Find the min between all the starts and all the ends
diff_start_end = abs(start_comp - end)
if diff_start_end < min_start:
min_start = diff_start_end
closest_daterange_start = daterange
diff_end_start = abs(end_comp - start)
if diff_end_start < min_end:
min_end = diff_end_start
closest_daterange_end = daterange
if min_start < min_end:
return closest_daterange_start
else:
return closest_daterange_end
def find_daterange_gaps(start_search: Timestamp, end_search: Timestamp, dateranges: list[str]) -> list[str]:
"""Given a start and end date and a list of dateranges find the gaps.
For example given a list of dateranges
example = ['2014-09-02_2014-11-01', '2016-09-02_2018-11-01']
start = timestamp_tzaware("2012-01-01")
end = timestamp_tzaware("2019-09-01")
gaps = find_daterange_gaps(start, end, example)
gaps == ['2012-01-01-00:00:00+00:00_2014-09-01-00:00:00+00:00',
'2014-11-02-00:00:00+00:00_2016-09-01-00:00:00+00:00',
'2018-11-02-00:00:00+00:00_2019-09-01-00:00:00+00:00']
Args:
start_search: Start timestamp
end_search: End timestamp
dateranges: List of daterange strings
Returns:
list: List of dateranges
"""
from openghg.util import pairwise
from pandas import Timedelta
if not dateranges:
return []
sorted_dateranges = sorted(dateranges)
# Buffer to leave around existing dateranges when constructing the gaps
range_gap = Timedelta("1h")
# Ignore any gaps smaller than this
min_gap = Timedelta("30m")
# First check for a gap between the search start and the first daterange
start_first, _ = split_daterange_str(sorted_dateranges[0])
gaps = []
if start_search < start_first:
gap_start = start_search
gap_end = start_first - range_gap
if gap_end - gap_start > min_gap:
gap = create_daterange_str(start=gap_start, end=gap_end)
gaps.append(gap)
# Then check for a gap between the last daterange and the search end
_, end_last = split_daterange_str(sorted_dateranges[-1])
if end_search > end_last:
gap_end = end_search
gap_start = end_last + range_gap
if gap_end - gap_start > min_gap:
gap = create_daterange_str(start=gap_start, end=gap_end)
gaps.append(gap)
for a, b in pairwise(sorted_dateranges):
start_a, end_a = split_daterange_str(a)
start_b, end_b = split_daterange_str(b)
# Ignore any that are outside our search window
if end_a < start_search or start_a > end_search:
continue
diff = start_b - end_a
if diff > min_gap:
gap_start = end_a + range_gap
gap_end = start_b - range_gap
gap = create_daterange_str(start=gap_start, end=gap_end)
gaps.append(gap)
gaps.sort()
return gaps
def daterange_contains(container: str, contained: str) -> bool:
"""Check if the daterange container contains the daterange contained
Args:
container: Daterange
contained: Daterange
Returns:
bool
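Examples:
Illustrative usage (dates are arbitrary):
>>> daterange_contains("2019-01-01_2019-12-31", "2019-03-01_2019-06-01")
True
>>> daterange_contains("2019-03-01_2019-06-01", "2019-01-01_2019-12-31")
False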
"""
start_a, end_a = split_daterange_str(container)
start_b, end_b = split_daterange_str(contained)
return bool(start_a <= start_b and end_b <= end_a)
def trim_daterange(to_trim: str, overlapping: str) -> str:
"""Removes overlapping dates from to_trim
Args:
to_trim: Daterange to trim down. Dates that overlap
with overlapping will be removed from to_trim
overlapping: Daterange containing dates we want to trim
from to_trim
Returns:
str: Trimmed daterange
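Examples:
Illustrative sketch (dates are arbitrary): the overlapping dates, plus a one second gap, are removed from to_trim:
>>> trim_daterange(
... to_trim="2019-01-01_2019-06-01",
... overlapping="2019-04-01_2019-12-31",
... )
'2019-01-01-00:00:00+00:00_2019-03-31-23:59:59+00:00'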
"""
from pandas import Timedelta
if not daterange_overlap(daterange_a=to_trim, daterange_b=overlapping):
raise ValueError(f"Dateranges {to_trim} and {overlapping} do not overlap")
# We need to work out which way round they overlap
start_trim, end_trim = split_daterange_str(to_trim)
start_overlap, end_overlap = split_daterange_str(overlapping)
delta_gap = Timedelta("1s")
# Work out if to_trim starts before or after the overlapping daterange
if start_trim < start_overlap and end_overlap > end_trim:
new_end_trim = start_overlap - delta_gap
return create_daterange_str(start=start_trim, end=new_end_trim)
else:
new_start_trim = end_overlap + delta_gap
return create_daterange_str(start=new_start_trim, end=end_trim)
def split_encompassed_daterange(container: str, contained: str) -> dict:
"""Checks if one of the passed dateranges contains the other, if so, then
split the larger daterange into three sections.
<---a--->
<---------b----------->
Here b is split into three and we end up with:
<-dr1-><---a---><-dr2->
Args:
container: The larger daterange (b in the figure above)
contained: The daterange contained within container (a in the figure above)
Returns:
dict: Dictionary of results
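Examples:
Illustrative sketch (dates are arbitrary; the returned daterange strings also carry the HH:MM:SS+00:00 component):
>>> result = split_encompassed_daterange(
... container="2019-01-01_2019-12-31",
... contained="2019-04-01_2019-06-01",
... )
>>> sorted(result.keys())
['contained', 'container_end', 'container_start']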
"""
container_start, container_end = split_daterange_str(daterange_str=container)
contained_start, contained_end = split_daterange_str(daterange_str=contained)
# First check one contains the other
if not (container_start <= contained_start and contained_end <= container_end):
raise ValueError(f"Range {container} does not contain {contained}")
# Gap to add between dateranges so they don't overlap
delta_gap = Timedelta("1s")
# If the difference is less than this we'll assume they're the same timestamp
tolerance = Timedelta("2h")
results = {}
# If one of them starts at the same point we just want to split the range in two
if abs(contained_start - container_start) < tolerance:
new_contained = create_daterange_str(start=contained_start, end=contained_end)
dr1_start = contained_end + delta_gap
dr1 = create_daterange_str(start=dr1_start, end=container_end)
results["container_start"] = dr1
results["contained"] = new_contained
return results
if abs(contained_end - container_end) < tolerance:
new_contained = create_daterange_str(start=contained_start, end=contained_end)
dr1_end = contained_start - delta_gap
dr1 = create_daterange_str(start=container_start, end=dr1_end)
results["container_start"] = dr1
results["contained"] = new_contained
return results
dr1_start = container_start
dr1_end = contained_start - delta_gap
dr1 = create_daterange_str(start=dr1_start, end=dr1_end)
dr3_start = contained_end + delta_gap
dr3_end = container_end
dr3 = create_daterange_str(start=dr3_start, end=dr3_end)
# Trim a gap off the end of contained
new_contained_end = contained_end - delta_gap
new_contained = create_daterange_str(start=contained_start, end=new_contained_end)
results["container_start"] = dr1
results["contained"] = new_contained
results["container_end"] = dr3
return results
def sanitise_daterange(daterange: str) -> str:
"""Make sure the daterange is correct and return
tzaware daterange.
Args:
daterange: Daterange str
Returns:
str: Timezone aware daterange str
"""
start, end = split_daterange_str(daterange)
if start >= end:
raise ValueError("Invalid daterange, start after end date")
return create_daterange_str(start=start, end=end)
def check_date(date: str) -> str:
"""Check if a date string can be converted to a pd.Timestamp
and returns NA if not.
Returns a string that can be JSON serialised.
Args:
date: String to test
Returns:
str: Returns NA if not a date, otherwise date string
"""
from pandas import Timestamp, isnull
try:
d = Timestamp(date)
if isnull(d):
return "NA"
return date
except ValueError:
return "NA"
def check_nan(data: int | float) -> str | float | int:
"""Check if a number is NaN.
Returns a string that can be JSON serialised.
Args:
data: Number
Returns:
str, float, int: "NA" if the value is NaN, otherwise the number rounded to 3 decimal places
"""
from math import isnan
if isnan(data):
return "NA"
else:
return round(data, 3)
def first_last_dates(keys: list) -> tuple[Timestamp, Timestamp]:
"""Find the first and last timestamp from a list of keys
Args:
keys: List of keys
Returns:
tuple: First and last timestamp
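Examples:
Illustrative sketch assuming keys whose final path component is a daterange string; the "data/uuid/v1/..." layout shown here is made up:
>>> keys = [
... "data/uuid/v1/2014-01-01-00:00:00+00:00_2014-12-31-00:00:00+00:00",
... "data/uuid/v1/2016-01-01-00:00:00+00:00_2016-12-31-00:00:00+00:00",
... ]
>>> first_last_dates(keys)
(Timestamp('2014-01-01 00:00:00+0000', tz='UTC'), Timestamp('2016-12-31 00:00:00+0000', tz='UTC'))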
"""
def sorting_key(s: str) -> str:
return s.split("/")[-1]
sorted_keys = sorted(keys, key=sorting_key)
first_daterange = sorted_keys[0].split("/")[-1]
first_date = first_daterange.split("_")[0]
last_daterange = sorted_keys[-1].split("/")[-1]
last_date = last_daterange.split("_")[-1]
first = timestamp_tzaware(first_date)
last = timestamp_tzaware(last_date)
return first, last
def time_offset_definition() -> dict[str, list]:
"""
Returns synonym definition for time offset inputs.
Accepted inputs are as follows:
- "months": "monthly", "months", "month", "MS"
- "years": "yearly", "years", "annual", "year", "AS", "YS"
- "weeks": "weekly", "weeks", "week", "W"
- "days": "daily", "days", "day", "D"
- "hours": "hourly", "hours", "hour", "hr", "h", "H"
- "minutes": "minutely", "minutes", "minute", "min", "m", "T"
- "seconds": "secondly", "seconds", "second", "sec", "s", "S"
This is to ensure the correct keyword for the pandas DateOffset and Timedelta
functions can be supplied (we want this to be "years", "months" etc.)
Returns:
dict: Mapping of canonical unit names to lists of accepted synonyms
"""
offset_naming = {
"months": ["monthly", "months", "month", "MS"],
"years": ["yearly", "years", "annual", "year", "AS", "YS", "YS-JAN"],
"weeks": ["weekly", "weeks", "week", "W"],
"days": ["daily", "days", "day", "D"],
"hours": ["hourly", "hours", "hour", "hr", "h", "H"],
"minutes": ["minutely", "minutes", "minute", "min", "m", "T"],
"seconds": ["secondly", "seconds", "second", "sec", "s", "S"],
}
return offset_naming
def parse_period(period: str | tuple) -> TimePeriod:
"""
Parses period input and converts to a value, unit pair.
Check time_offset_definition() for accepted input units.
Args:
period: Period of measurements.
Should be one of:
- "yearly", "monthly"
- suitable pandas Offset Alias
- tuple of (value, unit) as would be passed to pandas.Timedelta function
Returns:
TimePeriod: class containing value and associated time period (subclass of NamedTuple)
Examples:
>>> parse_period("12H")
TimePeriod(12, "hours")
>>> parse_period("yearly")
TimePeriod(1, "years")
>>> parse_period("monthly")
TimePeriod(1, "months")
>>> parse_period((1, "minute"))
TimePeriod(1, "minutes")
"""
import re
if isinstance(period, tuple):
if len(period) != 2:
raise ValueError(
f"Input for period not recognised: {period}. Tuple input requires (value, unit)."
)
else:
value_in = period[0]
if isinstance(value_in, str):
try:
value: int | float = int(value_in)
except ValueError:
value = float(value_in)
else:
value = int(value_in)
unit = str(period[1])
else:
match = re.match(r"\d+[.]?\d*", period)
if match is not None:
try:
value_str = match.group()
value = int(value_str)
except ValueError:
value = float(value_str)
unit = period.lstrip(value_str).strip() # Strip found value and any whitespace.
else:
value = 1
unit = period
if period == "varies":
unit = "s"
logging.warning("For time period 'varies' the value is set to 1 and the unit to 'seconds'")
offset_naming = time_offset_definition()
for key, values in offset_naming.items():
if unit in values:
unit = key
break
return TimePeriod(value, unit)
def create_frequency_str(
value: int | float | None = None,
unit: str | None = None,
period: str | tuple | None = None,
include_units: bool = True,
) -> str:
"""
Create a suitable frequency string based on either a value and unit pair
or a period value. The unit will be made singular if the value is 1.
Check time_offset_definition() for accepted input units.
Args:
value, unit: Value and unit pair to use
period: Suitable input for period (see parse_period() for more details)
Returns:
str : Formatted string
Examples:
>>> create_frequency_str(value=1, unit="hour")
"1 hour"
>>> create_frequency_str(period="3MS")
"3 months"
>>> create_frequency_str(period="yearly")
"1 year"
"""
if period is not None:
value, unit = parse_period(period)
if value is None or unit is None:
raise ValueError(f"Unable to derive time value and unit from period: {period}")
elif value is None or unit is None:
raise ValueError("If period is not included, both value and unit must be specified.")
if value == 1:
frequency_str = f"{value} {unit.rstrip('s')}"
else:
frequency_str = f"{value} {unit}"
return frequency_str
def time_offset(
value: int | float | None = None,
unit: str | None = None,
period: str | tuple | None = None,
) -> Timedelta:
"""
Create time offset based on inputs. This will return a Timedelta object
and cannot create relative offsets (this includes "weeks", "months", "years").
Args:
value, unit: Value and unit pair to use
period: Suitable input for period (see parse_period() for more details)
Returns:
Timedelta : Time offset object
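Examples:
Illustrative usage:
>>> time_offset(value=3, unit="hours")
Timedelta('0 days 03:00:00')
>>> time_offset(period="30m")
Timedelta('0 days 00:30:00')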
"""
if period is not None:
value, unit = parse_period(period)
elif value is None or unit is None:
raise ValueError("If period is not included, both value and unit must be specified.")
if unit in ("weeks", "months", "years"):
raise ValueError(
f"Unable to calculate time offset with unit of {unit}. Try the relative_time_offset() function instead"
)
time_delta = Timedelta(value, unit)
return time_delta
def relative_time_offset(
value: int | float | None = None,
unit: str | None = None,
period: str | tuple | None = None,
) -> DateOffset | Timedelta:
"""
Create relative time offset based on inputs. This is based on the pandas
DateOffset and Timedelta functions.
Check time_offset_definition() for accepted input units.
If the input is "years" or "months" a relative offset (DateOffset) will
be created since these are variable units. For example:
- "2013-01-01" + 1 year relative offset = "2014-01-01"
- "2012-05-01" + 2 months relative offset = "2012-07-01"
Otherwise the Timedelta function will be used.
Args:
value, unit: Value and unit pair to use
period: Suitable input for period (see parse_period() for more details)
Returns:
DateOffset/Timedelta : Time offset object, appropriate for the period type
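Examples:
Illustrative usage (a DateOffset is returned for the variable-length units):
>>> relative_time_offset(period="yearly")
<DateOffset: years=1>
>>> Timestamp("2013-01-01") + relative_time_offset(period="yearly")
Timestamp('2014-01-01 00:00:00')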
"""
if period is not None:
value, unit = parse_period(period)
elif value is None or unit is None:
raise ValueError("If period is not included, both value and unit must be specified.")
relative_units = ("weeks", "months", "years")
if unit in relative_units:
time_delta = DateOffset(**{unit: value})
else:
time_delta = time_offset(value, unit)
return time_delta
def infer_frequency(timestamps: DatetimeIndex) -> str | None:
"""
For a series of timestamps, see if we can infer a consistent frequency. Must contain >= 2 time points.
For timestamps containing more than 2 values, this uses the pandas.infer_freq function.
For timestamps containing 2 values, the pandas.infer_freq function cannot be used and so this
is inferred manually as follows:
- If less than 1 hour - returned as "{INT}s"
- If at least 1 hour but less than 1 day - returned as "{INT}h"
- If at least 1 day but less than ~1 month - returned as "{1DP}D"
- If ~= 1 month in days (i.e. 28, 30 or 31 days) - returned as "MS"
- If ~= 1 year in days (i.e. 365 or 366 days) - returned as "YS"
- Otherwise returned as "{1DP}D"
Args:
timestamps: DatetimeIndex of timestamp values
Returns:
str: A pandas frequency string for the inferred frequency
None: If no consistent frequency can be inferred, None will be returned
Raises:
ValueError: Less than 2 time points are present
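Examples:
Illustrative usage for the two-point case (made-up timestamps); with more than two points the alias comes from pandas.infer_freq and depends on the pandas version:
>>> infer_frequency(pd.date_range("2020-01-01", periods=2, freq="1h"))
'1h'
>>> infer_frequency(pd.date_range("2020-01-01", periods=2, freq="30min"))
'1800s'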
"""
timestamps = timestamps.sort_values()
if len(timestamps) < 2:
raise ValueError("Unable to infer frequency from <2 data points")
elif len(timestamps) == 2:
time_point_difference = timestamps[1] - timestamps[0]
total_seconds = time_point_difference.total_seconds()
hour = 3600.0
day = hour * 24.0
month_options = [28, 30, 31]
min_month = min(month_options) * day
year_options = [365, 366]
min_year = min(year_options) * day
if total_seconds < hour:
inferred_period: str | None = f"{int(total_seconds)}s"
elif total_seconds >= hour and total_seconds < day:
inferred_period = f"{int(total_seconds / hour)}h"
elif total_seconds >= day and total_seconds < min_month:
inferred_period = f"{(total_seconds / day):.1f}D"
elif total_seconds >= min_month and total_seconds <= min_year:
for month in month_options:
month_seconds = month * day
rounded_seconds = total_seconds / month_seconds
if np.isclose(rounded_seconds, 1):
inferred_period = "MS"
break
else:
inferred_period = f"{(total_seconds / day):.1f}D"
elif total_seconds >= min_year:
for year in year_options:
year_seconds = year * day
rounded_seconds = total_seconds / year_seconds
if np.isclose(rounded_seconds, 1):
inferred_period = "YS"
break
else:
inferred_period = f"{(total_seconds / day):.1f}D"
else:
inferred_period = pd.infer_freq(timestamps)
return inferred_period
def in_daterange(
start_a: str | Timestamp,
end_a: str | Timestamp,
start_b: str | Timestamp,
end_b: str | Timestamp,
) -> bool:
"""Check if two dateranges overlap.
Args:
start_a, end_a: Start and end of the first daterange
start_b, end_b: Start and end of the second daterange
Returns:
bool: True if overlap
"""
from openghg.util import timestamp_tzaware
start_a = timestamp_tzaware(start_a)
end_a = timestamp_tzaware(end_a)
start_b = timestamp_tzaware(start_b)
end_b = timestamp_tzaware(end_b)
return bool((start_a <= end_b) and (end_a >= start_b))
def dates_overlap(
start_a: str | Timestamp,
end_a: str | Timestamp,
start_b: str | Timestamp,
end_b: str | Timestamp,
) -> bool:
"""Check if two dateranges overlap.
Args:
start_a, end_a: Start and end of the first daterange
start_b, end_b: Start and end of the second daterange
Returns:
bool: True if overlap
"""
from openghg.util import timestamp_tzaware
start_a = timestamp_tzaware(start_a)
end_a = timestamp_tzaware(end_a)
start_b = timestamp_tzaware(start_b)
end_b = timestamp_tzaware(end_b)
return bool((start_a <= end_b) and (end_a >= start_b))
def dates_in_range(keys: list[str], start_date: Timestamp | str, end_date: Timestamp | str) -> list[str]:
"""Returns the keys in the key list that are between the given dates
Args:
keys: List of daterange keys
start_date: Start date
end_date: End date
Returns:
list: List of keys
"""
start_date = timestamp_tzaware(start_date)
end_date = timestamp_tzaware(end_date)
in_date = []
for key in keys:
start_key, end_key = split_daterange_str(daterange_str=key)
if (start_key <= end_date) and (end_key >= start_date):
in_date.append(key)
return in_date
def evaluate_sampling_period(sampling_period: Timedelta | str | None) -> str | None:
"""
Check the sampling period input and convert this into a string containing the
sampling period in seconds.
Args:
sampling_period: str or Timedelta value for the time to sample.
Returns:
str : Sampling period as a string containing the number of seconds.
TODO: Integrate sampling_period handling into logic for time_period (if practical)
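Examples:
Illustrative usage (both input forms resolve to the number of seconds as a string):
>>> evaluate_sampling_period("1m")
'60.0'
>>> evaluate_sampling_period(Timedelta("2h"))
'7200.0'
>>> evaluate_sampling_period(None) is None
True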
"""
# If we have a sampling period passed we want the number of seconds
if sampling_period is not None:
# Check format of input string matches expected
sampling_period = str(sampling_period)
re_sampling_period = re.compile(r"\d+[.]?\d*\s*[a-zA-Z]+")
check_format = re_sampling_period.search(sampling_period)
# If pattern is not matched this returns a None - indicating string is in incorrect form
if check_format is None:
raise ValueError(
f"Invalid sampling period: '{sampling_period}'. Must be specified as a string with unit (e.g. 1m for 1 minute)."
)
# Check string passed can be evaluated as a Timedelta object and extract this in seconds.
try:
sampling_period_td = Timedelta(sampling_period)
except ValueError as e:
raise ValueError(
f"Could not evaluate sampling period: '{sampling_period}'. Must be specified as a string with valid unit (e.g. 1m for 1 minute)."
) from e
sampling_period = str(float(sampling_period_td.total_seconds()))
# Check if the sampling period has resolved to zero (or negative) seconds
if float(sampling_period) <= 0.0:
raise ValueError(
f"Sampling period resolves to {sampling_period} seconds. Please check the input value."
)
# TODO: May want to add check for NaT or NaN
return sampling_period