Source code for openghg.dataobjects._datahandler
import copy
import logging
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Set, Union

import tinydb

from openghg.store import load_metastore
from openghg.store.base import Datasource
from openghg.store.spec import define_data_type_classes

logger = logging.getLogger("openghg.dataobjects")
logger.setLevel(logging.DEBUG) # Have to set level for logger as well as handler
class DataHandler:
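    """Helper for viewing and modifying the metadata of Datasources held in the
    object store. The handler wraps a dictionary of metadata keyed by Datasource
    UUID and keeps a backup of each change so it can be rolled back with restore().

    Example (a sketch; in practice the metadata dictionary normally comes from an
    OpenGHG metadata search rather than being built by hand, and the UUID and
    values below are placeholders):

        >>> handler = DataHandler(metadata={"datasource-uuid": {"data_type": "surface"}})
    """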
    def __init__(self, metadata: Optional[Dict[str, Dict]] = None):
self.metadata = metadata if metadata is not None else {}
self._backup: DefaultDict[str, Dict[str, Dict]] = defaultdict(dict)
self._latest = "latest"
def __bool__(self) -> bool:
return bool(self.metadata)
def _check_datatypes(self, uuid: Union[str, List]) -> str:
"""Check the UUIDs are correct and ensure they all
belong to a single data type
Args:
uuid: UUID(s) to check
Returns:
None
"""
if not isinstance(uuid, list):
uuid = [uuid]
invalid_keys = [k for k in uuid if k not in self.metadata]
if invalid_keys:
raise ValueError(f"Invalid UUIDs: {invalid_keys}")
# We should only have one data type
data_types: Set[str] = {self.metadata[i]["data_type"] for i in uuid}
if not data_types:
raise ValueError("Unable to read data_type from metadata.")
if len(data_types) > 1:
raise ValueError(
f"We can only modify Datasources of a single data type at once. We currently have {data_types}"
)
return data_types.pop()
    def refresh(self) -> None:
"""Force refresh the internal metadata store with data from the object store.
Returns:
None
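
        Example (a sketch; assumes the Datasources this handler knows about
        still exist in the object store):

            >>> handler.refresh()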
"""
from openghg.retrieve import search
uuids = list(self.metadata.keys())
res = search(uuid=uuids)
self.metadata = res.metadata
    def restore(self, uuid: str, version: Union[str, int] = "latest") -> None:
"""Restore a backed-up version of a Datasource's metadata.
Args:
uuid: UUID of Datasource to retrieve
version: Version of metadata to restore
Returns:
None
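
        Example (a sketch; the UUID is a placeholder and assumes a backup was
        created by an earlier update_metadata call):

            >>> handler.restore(uuid="datasource-uuid", version=1)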
"""
if version == "latest":
version = self._latest
version = str(version)
dtype = self._check_datatypes(uuid=uuid)
data_objs = define_data_type_classes()
metakey = data_objs[dtype]._metakey
backup = self._backup[uuid][version]
self.metadata[uuid] = backup
with load_metastore(key=metakey) as store:
store.remove(tinydb.where("uuid") == uuid)
store.insert(backup)
            d = Datasource.load(uuid=uuid)
            d._metadata = backup
            # Persist the restored metadata on the Datasource itself
            d.save()
    def view_backup(self, uuid: Optional[str] = None, version: Optional[str] = None) -> Dict:
"""View backed-up metadata for all Datasources
or a single Datasource if a UUID is passed in.
        Args:
            uuid: UUID of Datasource
            version: Version of backed-up metadata to return
        Returns:
            dict: Dictionary of versioned metadata
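
        Example (a sketch; the UUID is a placeholder):

            >>> handler.view_backup()  # backups for every Datasource
            >>> handler.view_backup(uuid="datasource-uuid", version=1)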
"""
if uuid is not None:
if version is not None:
version = str(version)
return self._backup[uuid][version]
return self._backup[uuid]
else:
return self._backup
    def update_metadata(
self,
uuid: Union[List, str],
to_update: Optional[Dict] = None,
to_delete: Union[str, List, None] = None,
) -> None:
"""Update the metadata associated with data. This takes UUIDs of Datasources and updates
the associated metadata. If you want to delete some metadata
Args:
uuid: UUID(s) of Datasources to be updated.
to_update: Dictionary of metadata to add/update. New key/value pairs will be added.
If the key already exists in the metadata the value will be updated.
to_delete: Key(s) to delete from the metadata
Returns:
None
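
        Example (a sketch; the UUID and metadata keys are placeholders for
        values already present in this handler):

            >>> handler.update_metadata(
            ...     uuid="datasource-uuid",
            ...     to_update={"data_owner": "OpenGHG Cloud"},
            ...     to_delete="data_owner_email",
            ... )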
"""
from tinydb.operations import delete as tinydb_delete
if to_update is None and to_delete is None:
return None
        # Ensure we have a list of UUIDs to work with
        if not isinstance(uuid, list):
            uuid = [uuid]
dtype = self._check_datatypes(uuid=uuid)
data_objs = define_data_type_classes()
metakey = data_objs[dtype]._metakey
with load_metastore(key=metakey) as store:
for u in uuid:
d = Datasource.load(uuid=u, shallow=True)
# Save a backup of the metadata for now
found_record = store.search(tinydb.where("uuid") == u)
current_metadata = found_record[0]
version = str(len(self._backup[u].keys()) + 1)
self._latest = version
self._backup[u][version] = copy.deepcopy(dict(current_metadata))
# To update this object's records
internal_copy = copy.deepcopy(dict(current_metadata))
n_records = len(self._backup[u][version])
# Do a quick check to make sure we're not being asked to delete all the metadata
                if to_delete is not None:
                    # Ensure we iterate over metadata keys, not the characters of a single string
                    if not isinstance(to_delete, list):
                        to_delete = [to_delete]
                    if "uuid" in to_delete:
                        raise ValueError("Cannot delete the UUID key.")
                    if len(to_delete) == n_records:
                        raise ValueError("We can't remove all the metadata associated with this Datasource.")
for k in to_delete:
d._metadata.pop(k)
internal_copy.pop(k)
try:
store.update_multiple(
[(tinydb_delete(k), tinydb.where("uuid") == u) for k in to_delete]
)
except KeyError:
raise ValueError(
"Unable to remove keys from metadaa store, please ensure they exist."
)
if to_update is not None:
if "uuid" in to_update:
raise ValueError("Cannot update the UUID.")
d._metadata.update(to_update)
internal_copy.update(to_update)
response = store.update(to_update, tinydb.where("uuid") == u)
if not response:
raise ValueError("Unable to update metadata, possible metadata sync error.")
d.save()
# Update the metadata stored internally so we're up to date
self.metadata[u] = internal_copy
logger.info(f"Modified metadata for {u}.")
    def delete_datasource(self, uuid: Union[List, str]) -> None:
"""Delete a Datasource in the object store.
At the moment we only support deleting the complete Datasource.
NOTE: Make sure you really want to delete the Datasource(s)
Args:
uuid: UUID(s) of objects to delete
Returns:
None
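
        Example (a sketch; the UUID is a placeholder, and deletion is permanent):

            >>> handler.delete_datasource(uuid="datasource-uuid")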
"""
from openghg.objectstore import delete_object, get_bucket
        # Ensure we have a list of UUIDs to work with
        if not isinstance(uuid, list):
            uuid = [uuid]
bucket = get_bucket()
dtype = self._check_datatypes(uuid=uuid)
data_objs = define_data_type_classes()
data_obj = data_objs[dtype].load()
metakey = data_obj._metakey
with load_metastore(key=metakey) as store:
for u in uuid:
# First remove the data from the metadata store
store.remove(tinydb.where("uuid") == u)
# Delete all the data associated with a Datasource
d = Datasource.load(uuid=u)
d.delete_all_data()
# Then delete the Datasource itself
key = d.key()
delete_object(bucket=bucket, key=key)
# Remove from the list of Datasources the object knows about
data_obj.remove_datasource(uuid=u)
                logger.info(f"Deleted Datasource with UUID {u}.")
data_obj.save()