Source code for openghg.objectstore._local_store

from __future__ import annotations
import glob
import json
import os
import threading
from pathlib import Path
import shutil
import logging
from openghg.types import ObjectStoreError
from openghg.util import read_local_config


rlock = threading.RLock()

__all__ = [
    "delete_object",
    "delete_objects",
    "get_user_objectstore_path",
    "get_tutorial_store_path",
    "get_all_object_names",
    "get_object_names",
    "get_object",
    "get_object_data_path",
    "set_object",
    "move_object",
    "set_object_from_json",
    "set_object_from_file",
    "get_object_from_json",
    "exists",
]

logger = logging.getLogger("openghg.objectstore")
logger.setLevel(logging.DEBUG)  # Have to set level for logger as well as handler


def get_readable_buckets() -> dict[str, str]:
    """Get a dictionary of readable buckets - {store_name: store_path, ...}

    Returns:
        dict: Dictionary of readable buckets
    """
    if os.getenv("OPENGHG_TUT_STORE") is not None:
        return {"tutorial_store": str(get_tutorial_store_path())}

    config = read_local_config()
    object_stores = config["object_store"]

    return {
        store_name: data["path"] for store_name, data in object_stores.items() if "r" in data["permissions"]
    }


def get_writable_buckets() -> dict[str, str]:
    """Get a dictionary of writable buckets - {store_name: store_path, ...}

    Returns:
        dict: Dictionary of buckets this user can write to
    """
    config = read_local_config()
    object_stores = config["object_store"]

    return {
        store_name: data["path"] for store_name, data in object_stores.items() if "w" in data["permissions"]
    }
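

# Sketch of the return values, assuming an illustrative config with a read/write "user"
# store and a read-only "shared" store (store names and paths are hypothetical):
#
#     get_readable_buckets()  # {"user": "/home/user/openghg_store", "shared": "/shared/store"}
#     get_writable_buckets()  # {"user": "/home/user/openghg_store"}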


def get_writable_bucket(name: str | None = None) -> str:
    """Get the path to a writable bucket, passing in the name of a bucket if
    more than one writable bucket available.

    Args:
        name: Name of writable bucket
    Returns:
        str: Path to writable bucket
    """
    if os.getenv("OPENGHG_TUT_STORE") is not None:
        return str(get_tutorial_store_path())

    writable_buckets = get_writable_buckets()

    if not writable_buckets:
        raise ObjectStoreError("No writable object stores found. Check configuration file.")

    if name is None and len(writable_buckets) == 1:
        return next(iter(writable_buckets.values()))
    elif name is not None:
        try:
            bucket_path = writable_buckets[name]
        except KeyError:
            raise ObjectStoreError(
                f"Invalid object store name, stores you can write to are: {', '.join(writable_buckets)}"
            )
        return bucket_path
    else:
        raise ObjectStoreError(
            f"More than one writable store, stores you can write to are: {', '.join(writable_buckets)}."
        )
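

# Example usage (a minimal sketch; the store name "group_store" is hypothetical and
# assumes a store with that name and "w" permissions in the user's config file):
#
#     bucket = get_writable_bucket()                    # fine when only one store is writable
#     bucket = get_writable_bucket(name="group_store")  # needed when several are writable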


def get_tutorial_store_path() -> Path:
    """Get the path to the local tutorial store

    Returns:
        pathlib.Path: Path of tutorial store
    """
    return get_user_objectstore_path() / "tutorial_store"


# @lru_cache
def get_user_objectstore_path() -> Path:
    """Returns the path of the user's local object store

    Returns:
        pathlib.Path: Path of object store
    """
    config = read_local_config()
    return Path(config["object_store"]["user"]["path"])


def get_objectstore_info() -> dict:
    """Read the local config file and return the data of each of the
    object stores the user has access to.

    Returns:
        dict: Dictionary of object store data
    """
    config = read_local_config()
    return config["object_store"]
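

# Sketch of the dictionary returned by get_objectstore_info(), based on the keys read
# elsewhere in this module ("path" and "permissions"); the store names, paths and any
# additional keys are illustrative assumptions only:
#
#     {
#         "user": {"path": "/home/user/openghg_store", "permissions": "rw"},
#         "shared": {"path": "/shared/openghg_store", "permissions": "r"},
#     }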


def get_all_object_names(bucket: str, prefix: str | None = None, without_prefix: bool = False) -> list:
    """Returns the names of all objects in the passed bucket

    Args:
        bucket: Bucket path
        prefix: Prefix for keys
        without_prefix: If True remove the prefix from the returned names
    Returns:
        list: List of object names
    """
    root = bucket

    if prefix is not None:
        root = f"{bucket}/{prefix}"

    root_len = len(bucket) + 1

    if without_prefix:
        prefix_len = len(str(prefix))

    subdir_names = glob.glob(f"{root}*")

    object_names = []

    while True:
        names = subdir_names
        subdir_names = []

        for name in names:
            if name.endswith("._data"):
                # remove the bucket path and the ._data suffix
                name = name[root_len:-6]

                while name.endswith("/"):
                    name = name[0:-1]

                if without_prefix:
                    name = name[prefix_len:]

                while name.startswith("/"):
                    name = name[1:]

                if len(name) > 0:
                    object_names.append(name)
            elif os.path.isdir(name):
                subdir_names += glob.glob(f"{name}/*")

        if len(subdir_names) == 0:
            break

    return object_names
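

# Illustrative key layout assumed by the listing above (bucket contents are hypothetical):
# an object stored at key "a/b/c" lives in the file "<bucket>/a/b/c._data", so a store
# containing
#
#     <bucket>/data/uuid/v1._data
#     <bucket>/metadata/site._data
#
# would give sorted(get_all_object_names(bucket)) == ["data/uuid/v1", "metadata/site"]
# and get_all_object_names(bucket, prefix="data", without_prefix=True) == ["uuid/v1"].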


def delete_object(bucket: str, key: str) -> None:
    """Remove object at key in bucket

    Args:
        bucket: Bucket path
        key: Key to data in bucket
    Returns:
        None
    """
    key = f"{bucket}/{key}._data"

    try:
        os.remove(key)
    except FileNotFoundError:
        pass


def delete_objects(bucket: str, prefix: str) -> None:
    """Remove objects with key prefix

    Args:
        bucket: Bucket path
        prefix: Key prefix
    Returns:
        None
    """
    key = Path(bucket, prefix)

    with rlock:
        shutil.rmtree(path=key, ignore_errors=True)


def get_object_names(bucket: str, prefix: str | None = None) -> list[str]:
    """List all the keys in the object store

    Args:
        bucket: Bucket containing data
        prefix: Prefix for keys
    Returns:
        list: List of keys in object store
    """
    return get_all_object_names(bucket=bucket, prefix=prefix)


def get_object(bucket: str, key: str) -> bytes:
    """Gets the object at key in the passed bucket

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        bytes: Binary data from the store
    """
    with rlock:
        filepath = Path(f"{bucket}/{key}._data")

        if filepath.exists():
            return filepath.read_bytes()
        else:
            raise ObjectStoreError(f"No object at key '{key}'")


def move_object(bucket: str, src_key: str, dst_key: str) -> None:
    """Move data from one place to another

    Args:
        bucket: Bucket path
        src_key: Source key
        dst_key: Destination key
    Returns:
        None
    """
    src = Path(f"{bucket}/{src_key}._data")
    dst = Path(f"{bucket}/{dst_key}._data")

    with rlock:
        shutil.move(src=src, dst=dst)


def move_objects(bucket: str, src_prefix: str, dst_prefix: str) -> None:
    """Move all keys with a certain prefix. Any data in the destination
    folder will be deleted.

    Args:
        bucket: Bucket path
        src_prefix: Source prefix
        dst_prefix: Destination prefix
    Returns:
        None
    """
    src = Path(bucket, src_prefix)
    dst = Path(bucket, dst_prefix)

    with rlock:
        if dst.exists():
            shutil.rmtree(dst)
        shutil.move(src=src, dst=dst)


def get_object_data_path(bucket: str, key: str) -> Path:
    """Get path to object data at key in passed bucket.

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        Path: Path to data
    """
    filepath = Path(f"{bucket}/{key}._data")

    if filepath.exists():
        return filepath
    else:
        raise ObjectStoreError(f"No object at key '{key}'")


def get_object_lock_path(bucket: str, key: str) -> Path:
    """Get path to object lock file at key in passed bucket.

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        Path: Path to object lock file
    """
    lock_path = Path(f"{bucket}/{key}._data.lock")

    if not lock_path.parent.exists():
        lock_path.parent.mkdir(parents=True)

    return lock_path
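

# Example usage (a minimal sketch; the keys are hypothetical). Note that
# get_object_lock_path() only builds and returns the ".lock" file path, creating parent
# directories if needed; acquiring the lock itself is left to the caller:
#
#     move_object(bucket, src_key="data/uuid/v1", dst_key="data/uuid/v2")
#     lock_file = get_object_lock_path(bucket, key="data/uuid/v2")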


def set_object(bucket: str, key: str, data: bytes) -> None:
    """Store data in bucket at key

    Args:
        bucket: Bucket path
        key: Key to store data in bucket
        data: Data as bytes
    Returns:
        None
    """
    filename = f"{bucket}/{key}._data"

    with rlock:
        try:
            with open(filename, "wb") as f:
                f.write(data)
        except FileNotFoundError:
            # Create the parent directories and retry the write
            dir_path = "/".join(filename.split("/")[0:-1])
            os.makedirs(dir_path, exist_ok=True)
            with open(filename, "wb") as f:
                f.write(data)
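

# Example round trip (a minimal sketch; the key is hypothetical and bucket is the path
# returned by get_writable_bucket()):
#
#     set_object(bucket, key="raw/example", data=b"some binary payload")
#     assert get_object(bucket, key="raw/example") == b"some binary payload"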


def set_object_from_json(bucket: str, key: str, data: str | dict) -> None:
    """Set JSON data in the object store

    Args:
        bucket: Bucket for data storage
        key: Key for data in bucket
        data: Data to be JSON serialised and stored
    Returns:
        None
    """
    data_bytes = json.dumps(data).encode("utf-8")
    set_object(bucket=bucket, key=key, data=data_bytes)


def set_object_from_file(bucket: str, key: str, filename: str | Path) -> None:
    """Set the contents of the file at filename as the data at key in bucket

    Args:
        bucket: Bucket path
        key: Key for data in bucket
        filename: Filename/path
    Returns:
        None
    """
    set_object(bucket=bucket, key=key, data=Path(filename).read_bytes())


def get_object_from_json(bucket: str, key: str) -> dict[str, str | dict]:
    """Return an object constructed from JSON stored at key.

    Args:
        bucket: Bucket containing data
        key: Key for data in bucket
    Returns:
        dict: Deserialised JSON data
    """
    data: str | bytes = get_object(bucket, key).decode("utf-8")
    data_dict: dict = json.loads(data)
    return data_dict
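

# Example JSON round trip (a minimal sketch; the key and metadata values are hypothetical):
#
#     record = {"site": "TAC", "species": "ch4"}
#     set_object_from_json(bucket, key="metadata/example", data=record)
#     assert get_object_from_json(bucket, key="metadata/example") == record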


def exists(bucket: str, key: str) -> bool:
    """Checks if there is an object in the object store with the given key

    Args:
        bucket: Bucket containing data
        key: Prefix for key in object store
    Returns:
        bool: True if key exists in store
    """
    names = get_all_object_names(bucket=bucket, prefix=key)
    return len(names) > 0


def get_bucket(name: str | None = None) -> str:
    """Find and return the local object store path. This will return the path
    to the user's local object store if no name is given.

    Args:
        name: Object store name in config file
    Returns:
        str: Path to object store
    """
    config = read_local_config()

    if name is not None:
        try:
            return config["object_store"][name]["path"]
        except KeyError:
            raise ObjectStoreError("Invalid object store name.")

    tutorial_store = os.getenv("OPENGHG_TUT_STORE")
    if tutorial_store is not None:
        return str(get_tutorial_store_path())

    local_store = get_user_objectstore_path()
    return str(local_store)


def clear_object_store() -> None:
    """Delete the object store. This will only delete a local object store and
    not a group level or other store. You will be asked for input to confirm
    the path.

    Returns:
        None
    """
    local_store = str(get_user_objectstore_path())

    logger.warning(f"You have requested to delete {local_store}.")
    confirmed_path = input("Please enter the full path of the store: ")

    if confirmed_path == local_store:
        shutil.rmtree(local_store, ignore_errors=True)
    else:
        logger.warning("Cannot delete object store.")


def get_folder_size(folder_path: str | Path) -> int:
    """Get the total size of a folder

    See https://stackoverflow.com/a/75101666/1303032

    Args:
        folder_path: Path to folder
    Returns:
        int: Total size of folder in bytes
    """
    total = 0
    for root, _, files in os.walk(folder_path):
        for file in files:
            path = Path(root) / file
            total += path.stat().st_size

    return total
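

# Example usage (a minimal sketch): report the size of the user's local store in MiB.
#
#     size_bytes = get_folder_size(get_user_objectstore_path())
#     print(f"Object store size: {size_bytes / 1024**2:.1f} MiB")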


def query_store() -> dict:
    """Create a dictionary that can be used to visualise the object store

    Returns:
        dict: Dictionary for data to be shown in force graph
    """
    raise NotImplementedError

    # NOTE: the code below is unreachable until the NotImplementedError above is removed
    from openghg.store import ObsSurface
    from openghg.store.base import Datasource

    obs = ObsSurface.load()
    datasource_uuids = obs.datasources()

    datasources = (Datasource.load(uuid=uuid, shallow=True) for uuid in datasource_uuids)

    data = {}

    for d in datasources:
        metadata = d.metadata()
        result = {
            "site": metadata["site"],
            "species": metadata["species"],
            "instrument": metadata.get("instrument", "Unknown"),
            "network": metadata.get("network", "Unknown"),
            "inlet": metadata.get("inlet", "Unknown"),
        }
        data[d.uuid()] = result

    return data