from __future__ import annotations
import glob
import json
import os
import threading
from pathlib import Path
import shutil
from typing import Dict, List, Optional, Union
import logging
from openghg.types import ObjectStoreError
from openghg.util import read_local_config
# Re-entrant lock serialising filesystem mutations on the object store;
# shared by the get/set/move/delete helpers below.
rlock = threading.RLock()
# Explicit public API of this module.
__all__ = [
"delete_object",
"delete_objects",
"get_user_objectstore_path",
"get_tutorial_store_path",
"get_all_object_names",
"get_object_names",
"get_object",
"get_object_data_path",
"set_object",
"move_object",
"set_object_from_json",
"set_object_from_file",
"get_object_from_json",
"exists",
]
logger = logging.getLogger("openghg.objectstore")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler
def get_readable_buckets() -> Dict[str, str]:
    """Get a dictionary of readable buckets - {store_name: store_path, ...}

    Returns:
        dict: List of readable buckets
    """
    # Tutorial mode exposes only the tutorial store
    if os.getenv("OPENGHG_TUT_STORE") is not None:
        return {"tutorial_store": str(get_tutorial_store_path())}

    object_stores = read_local_config()["object_store"]

    readable = {}
    for store_name, store_data in object_stores.items():
        if "r" in store_data["permissions"]:
            readable[store_name] = store_data["path"]
    return readable
def get_writable_buckets() -> Dict[str, str]:
    """Get a dictionary of writable buckets - {store_name: store_path, ...}

    Returns:
        dict: Dictionary of buckets this user can write to
    """
    object_stores = read_local_config()["object_store"]

    writable = {}
    for store_name, store_data in object_stores.items():
        if "w" in store_data["permissions"]:
            writable[store_name] = store_data["path"]
    return writable
def get_writable_bucket(name: Optional[str] = None) -> str:
    """Get the path to a writable bucket, passing in the name of a bucket if
    more than one writable bucket available.

    Args:
        name: Name of writable bucket
    Returns:
        str: Path to writable bucket
    """
    # Tutorial mode short-circuits straight to the tutorial store
    if os.getenv("OPENGHG_TUT_STORE") is not None:
        return str(get_tutorial_store_path())

    writable_buckets = get_writable_buckets()
    if not writable_buckets:
        raise ObjectStoreError("No writable object stores found. Check configuration file.")

    if name is not None:
        if name not in writable_buckets:
            raise ObjectStoreError(
                f"Invalid object store name, stores you can write to are: {', '.join(writable_buckets)}"
            )
        return writable_buckets[name]

    # No name given: unambiguous only when exactly one store is writable
    if len(writable_buckets) == 1:
        return next(iter(writable_buckets.values()))

    raise ObjectStoreError(
        f"More than one writable store, stores you can write to are: {', '.join(writable_buckets)}."
    )
def get_tutorial_store_path() -> Path:
    """Get the path to the local tutorial store

    Returns:
        pathlib.Path: Path of tutorial store
    """
    # The tutorial store lives inside the user's local object store
    return get_user_objectstore_path() / "tutorial_store"
# @lru_cache
def get_user_objectstore_path() -> Path:
    """Returns the path of the user's local object store

    Returns:
        pathlib.Path: Path of object store
    """
    user_store_path = read_local_config()["object_store"]["user"]["path"]
    return Path(user_store_path)
def get_objectstore_info() -> Dict:
    """Read the local config file and return the data of each of the object stores the
    user has access to.

    Returns:
        dict: Dictionary of object store data
    """
    return read_local_config()["object_store"]
def get_all_object_names(bucket: str, prefix: Optional[str] = None, without_prefix: bool = False) -> List:
    """Returns the names of all objects in the passed bucket

    Args:
        bucket: Bucket path
        prefix: Prefix for keys
        without_prefix: If True, strip the prefix from the returned names
    Returns:
        list: List of object names
    """
    root = bucket if prefix is None else f"{bucket}/{prefix}"
    # +1 so the path separator after the bucket path is stripped too
    root_len = len(bucket) + 1
    # NOTE: the original guarded this with `without_prefix is not None`,
    # which is always True for a bool parameter; only compute it when needed.
    prefix_len = len(str(prefix)) if without_prefix else 0

    object_names = []
    # Breadth-first walk of the store: expand directories level by level,
    # collecting any file carrying the ._data object suffix.
    candidates = glob.glob(f"{root}*")
    while candidates:
        current = candidates
        candidates = []
        for name in current:
            if name.endswith("._data"):
                # Strip the bucket path and the trailing "._data"
                key = name[root_len:-6].rstrip("/")
                if without_prefix:
                    key = key[prefix_len:].lstrip("/")
                if key:
                    object_names.append(key)
            elif os.path.isdir(name):
                candidates += glob.glob(f"{name}/*")
    return object_names
def delete_object(bucket: str, key: str) -> None:
    """Remove object at key in bucket

    Args:
        bucket: Bucket path
        key: Key to data in bucket
    Returns:
        None
    """
    filepath = f"{bucket}/{key}._data"
    try:
        os.remove(filepath)
    except FileNotFoundError:
        # Deleting a missing object is deliberately a no-op
        pass
def delete_objects(bucket: str, prefix: str) -> None:
    """Remove objects with key prefix

    Args:
        bucket: Bucket path
        prefix: Key prefix
    Returns:
        None
    """
    target = Path(bucket, prefix)
    # Serialise filesystem mutation with the other store operations
    with rlock:
        shutil.rmtree(path=target, ignore_errors=True)
def get_object_names(bucket: str, prefix: Optional[str] = None) -> List[str]:
    """List all the keys in the object store

    Args:
        bucket: Bucket containing data
        prefix: Optional key prefix to filter on
    Returns:
        list: List of keys in object store
    """
    # Thin wrapper kept for API compatibility
    return get_all_object_names(bucket=bucket, prefix=prefix)
def get_object(bucket: str, key: str) -> bytes:
    """Gets the object at key in the passed bucket

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        bytes: Binary data from the store
    Raises:
        ObjectStoreError: If no object exists at the key
    """
    with rlock:
        filepath = Path(f"{bucket}/{key}._data")
        if filepath.exists():
            return filepath.read_bytes()

        raise ObjectStoreError(f"No object at key '{key}'")
def move_object(bucket: str, src_key: str, dst_key: str) -> None:
    """Move data from one place to another

    Args:
        bucket: Bucket path
        src_key: Source key
        dst_key: Destination key
    Returns:
        None
    """
    src = Path(f"{bucket}/{src_key}._data")
    dst = Path(f"{bucket}/{dst_key}._data")
    with rlock:
        shutil.move(src=src, dst=dst)
def move_objects(bucket: str, src_prefix: str, dst_prefix: str) -> None:
    """Move all keys with a certain prefix. Any data in the destination folder
    will be deleted.

    Args:
        bucket: Bucket path
        src_prefix: Source prefix
        dst_prefix: Destination prefix
    Returns:
        None
    """
    source = Path(bucket, src_prefix)
    destination = Path(bucket, dst_prefix)
    with rlock:
        # Clear any existing data at the destination before moving
        if destination.exists():
            shutil.rmtree(destination)
        shutil.move(src=source, dst=destination)
def get_object_data_path(bucket: str, key: str) -> Path:
    """Get path to object data at key in passed bucket.

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        Path to data
    """
    data_file = Path(f"{bucket}/{key}._data")
    # Guard clause: missing objects are an error, not a None return
    if not data_file.exists():
        raise ObjectStoreError(f"No object at key '{key}'")
    return data_file
def get_object_lock_path(bucket: str, key: str) -> Path:
    """Get path to object lock file at key in passed bucket.

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        Path to object lock file
    """
    lock_path = Path(f"{bucket}/{key}._data.lock")
    # Make sure the directory that will hold the lock file exists
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    return lock_path
def set_object(bucket: str, key: str, data: bytes) -> None:
    """Store data in bucket at key

    Args:
        bucket: Bucket path
        key: Key to store data in bucket
        data: Binary data to store
    Returns:
        None
    """
    filepath = Path(f"{bucket}/{key}._data")
    with rlock:
        try:
            filepath.write_bytes(data)
        except FileNotFoundError:
            # Parent directories don't exist yet: create them and retry.
            # (EAFP keeps the common already-exists path to a single write.)
            filepath.parent.mkdir(parents=True, exist_ok=True)
            filepath.write_bytes(data)
def set_object_from_json(bucket: str, key: str, data: Union[str, Dict]) -> None:
    """Set JSON data in the object store

    Args:
        bucket: Bucket for data storage
        key: Key for data in bucket
        data: JSON serialised data string
    Returns:
        None
    """
    # Serialise then store as UTF-8 bytes
    data_bytes = json.dumps(data).encode("utf-8")
    set_object(bucket=bucket, key=key, data=data_bytes)
def set_object_from_file(bucket: str, key: str, filename: Union[str, Path]) -> None:
    """Set the contents of file at filename to key in bucket

    Args:
        bucket: Bucket path
        key: Key to for data
        filename (str, pathlib.Path): Filename/path
    Returns:
        None
    """
    # Path.read_bytes closes the file handle; the previous
    # open(filename, "rb").read() leaked it.
    set_object(bucket=bucket, key=key, data=Path(filename).read_bytes())
def get_object_from_json(bucket: str, key: str) -> Dict[str, Union[str, Dict]]:
    """Return an object constructed from JSON stored at key.

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        dict: Dictionary
    """
    # Stored objects are UTF-8 encoded JSON
    data = get_object(bucket, key).decode("utf-8")
    data_dict: Dict = json.loads(data)
    return data_dict
def exists(bucket: str, key: str) -> bool:
    """Checks if there is an object in the object store with the given key

    Args:
        bucket: Bucket containing data
        key: Prefix for key in object store
    Returns:
        bool: True if key exists in store
    """
    names = get_all_object_names(bucket=bucket, prefix=key)
    return len(names) > 0
def get_bucket(name: Optional[str] = None) -> str:
    """Find and return the local object store path. This will return
    the path to the user's local object store if no name is given.

    Args:
        name: Object store name in config file
    Returns:
        str: Path to object store
    Raises:
        ObjectStoreError: If the named store is not in the config file
    """
    # Read config first so a missing/broken config file always surfaces
    config = read_local_config()

    if name is not None:
        try:
            return config["object_store"][name]["path"]
        except KeyError:
            raise ObjectStoreError("Invalid object store name.")

    # Tutorial mode takes precedence over the user's own store
    if os.getenv("OPENGHG_TUT_STORE") is not None:
        return str(get_tutorial_store_path())

    return str(get_user_objectstore_path())
def clear_object_store() -> None:
    """Delete the object store. This will only delete a local object store and not
    a group level or other store. You will be asked for input to confirm the path.

    Returns:
        None
    """
    store_path = str(get_user_objectstore_path())
    logger.warning(f"You have requested to delete {store_path}.")
    # Require the user to type the exact path as a safety check
    if input("Please enter the full path of the store: ") == store_path:
        shutil.rmtree(store_path, ignore_errors=True)
    else:
        logger.warning("Cannot delete object store.")
def get_folder_size(folder_path: Union[str, Path]) -> int:
    """Get the total size of a folder

    See https://stackoverflow.com/a/75101666/1303032

    Args:
        folder_path: Path to folder
    Returns:
        int: Total size of folder in bytes
    """
    # Sum the on-disk size of every file beneath folder_path
    return sum(
        (Path(dirpath) / filename).stat().st_size
        for dirpath, _, filenames in os.walk(folder_path)
        for filename in filenames
    )
def query_store() -> Dict:
    """Create a dictionary that can be used to visualise the object store

    Not yet implemented for the current Datasource layout.

    Returns:
        dict: Dictionary for data to be shown in force graph
    Raises:
        NotImplementedError: always - this function is not implemented yet
    """
    # NOTE(review): the previous draft implementation that followed this
    # raise was unreachable dead code and has been removed.
    raise NotImplementedError