import copy
import logging
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional, Set, Union

from openghg.objectstore import get_writable_bucket, get_writable_buckets
from openghg.objectstore.metastore import open_metastore
from openghg.store.base import Datasource
from openghg.types import ObjectStoreError
# Module-level logger shared by DataManager (used e.g. in delete_datasource).
logger = logging.getLogger("openghg.dataobjects")
logger.setLevel(logging.DEBUG) # Have to set level for logger as well as handler
class DataManager:
    """View, back up, restore and delete Datasource metadata in a writable object store.

    Holds an in-memory copy of the metadata it was created with (with the
    ``object_store`` key stripped so it reflects exactly what is stored),
    keeps per-UUID versioned backups in ``self._backup`` and can push
    restored metadata back into the metastore and Datasource.
    """

    def __init__(self, metadata: Dict[str, Dict], store: str):
        """
        Args:
            metadata: Dictionary of metadata keyed by Datasource UUID
            store: Name of the object store to operate on; must be writable
        """
        # We don't want the object store in this metadata as we want it to be the
        # unadulterated metadata to properly reflect what's stored.
        self.metadata = self._clean_metadata(metadata=metadata)
        self._store = store
        # Raises if this user has no write access to the named store
        self._bucket = get_writable_bucket(name=store)
        # Versioned metadata backups: {uuid: {version_string: metadata_dict}}
        self._backup: DefaultDict[str, Dict[str, Dict]] = defaultdict(dict)
        self._latest = "latest"

    def __str__(self) -> str:
        return str(self.metadata)

    def __bool__(self) -> bool:
        return bool(self.metadata)

    def _clean_metadata(self, metadata: Dict) -> Dict:
        """Ensures the metadata we give to the user is the metadata
        stored in the metastore and the Datasource and hasn't been modified by the
        search function. Currently this just removes the object_store key.

        Args:
            metadata: Dictionary of metadata keyed by Datasource UUID
        Returns:
            dict: Deep copy of the metadata without the object_store key
        """
        # Deep copy so the caller's dictionaries are never mutated
        metadata = copy.deepcopy(metadata)
        for m in metadata.values():
            # pop with a default replaces the try/del/except KeyError dance
            m.pop("object_store", None)
        return metadata

    def _check_datatypes(self, uuid: Union[str, List]) -> str:
        """Check the UUIDs are known to this DataManager and ensure they all
        belong to a single data type.

        Args:
            uuid: UUID(s) to check
        Returns:
            str: The single data type shared by all given UUIDs
        Raises:
            ValueError: If any UUID is unknown, no data type can be read,
                or more than one data type is present.
        """
        if not isinstance(uuid, list):
            uuid = [uuid]

        invalid_keys = [k for k in uuid if k not in self.metadata]
        if invalid_keys:
            raise ValueError(f"Invalid UUIDs: {invalid_keys}")

        # We should only have one data type
        data_types: Set[str] = {self.metadata[i]["data_type"] for i in uuid}
        if not data_types:
            raise ValueError("Unable to read data_type from metadata.")
        if len(data_types) > 1:
            raise ValueError(
                f"We can only modify Datasources of a single data type at once. We currently have {data_types}"
            )
        return data_types.pop()

    def refresh(self) -> None:
        """Force refresh the internal metadata store with data from the object store.

        Returns:
            None
        """
        from openghg.retrieve import search

        uuids = list(self.metadata.keys())
        res = search(uuid=uuids)
        # _clean_metadata already deep-copies and strips the object_store key,
        # so the unadulterated stored metadata is kept. (A duplicate stripping
        # loop previously ran here as well; it was redundant.)
        self.metadata = self._clean_metadata(metadata=res.metadata)

    def restore(self, uuid: str, version: Union[str, int] = "latest") -> None:
        """Restore a backed-up version of a Datasource's metadata.

        Args:
            uuid: UUID of Datasource to retrieve
            version: Version of metadata to restore, "latest" or a version number
        Returns:
            None
        """
        # Backups are keyed by string; normalise integer versions.
        # (The old `if version == "latest": version = self._latest` step was a
        # no-op since self._latest is the string "latest".)
        version = str(version)

        dtype = self._check_datatypes(uuid=uuid)
        with open_metastore(data_type=dtype, bucket=self._bucket) as metastore:
            backup = self._backup[uuid][version]
            self.metadata[uuid] = backup
            # Replace the metastore record with the backed-up metadata
            metastore.delete({"uuid": uuid})
            metastore.insert(backup)
            # Keep the Datasource's own copy of the metadata in sync
            d = Datasource(bucket=self._bucket, uuid=uuid)
            d._metadata = backup
            d.save()

    def view_backup(self, uuid: Optional[str] = None, version: Optional[str] = None) -> Dict:
        """View backed-up metadata for all Datasources
        or a single Datasource if a UUID is passed in.

        Args:
            uuid: UUID of Datasource
            version: Specific backup version to return; if not given all
                versions for the UUID are returned. Ignored when uuid is None.
        Returns:
            dict: Dictionary of versioned metadata
        """
        if uuid is None:
            return self._backup
        if version is not None:
            return self._backup[uuid][str(version)]
        return self._backup[uuid]

    def delete_datasource(self, uuid: Union[List, str]) -> None:
        """Delete Datasource(s) in the object store.
        At the moment we only support deleting the complete Datasource.

        NOTE: Make sure you really want to delete the Datasource(s)

        Args:
            uuid: UUID(s) of objects to delete
        Returns:
            None
        """
        from openghg.objectstore import delete_object

        # TODO: Add in ability to delete metadata keys
        if not isinstance(uuid, list):
            uuid = [uuid]

        dtype = self._check_datatypes(uuid=uuid)
        with open_metastore(bucket=self._bucket, data_type=dtype) as metastore:
            for uid in uuid:
                # First remove the record from the metadata store
                metastore.delete({"uuid": uid})
                # Delete all the data associated with a Datasource and the
                # data in its zarr store.
                d = Datasource(bucket=self._bucket, uuid=uid)
                d.delete_all_data()
                # Then delete the Datasource itself
                delete_object(bucket=self._bucket, key=d.key())
                logger.info(f"Deleted Datasource with UUID {uid}.")
def data_manager(data_type: str, store: str, **kwargs: Any) -> DataManager:
    """Lookup the data / metadata you'd like to modify.

    Args:
        data_type: Type of data, for example surface, flux, footprint
        store: Name of store
        kwargs: Any pair of keyword arguments for searching
    Returns:
        DataManager: A handler object to help modify the metadata
    Raises:
        ObjectStoreError: If the named store is not writable by this user
    """
    from openghg.dataobjects import DataManager
    from openghg.retrieve import search

    # Fail early: modifying metadata requires write access to the store.
    writable_stores = get_writable_buckets()
    if store not in writable_stores:
        raise ObjectStoreError(f"You do not have permission to write to the {store} store.")

    # Search keyword values are arbitrary search terms (strings etc.), so
    # **kwargs is annotated Any rather than the previous (incorrect) Dict.
    res = search(data_type=data_type, store=store, **kwargs)
    return DataManager(metadata=res.metadata, store=store)