Source code for stilt.storage.store

"""Output store backends."""

from __future__ import annotations

import os
import posixpath
import shutil
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Protocol, runtime_checkable

import fsspec

from .files import ProjectFiles, SimulationFiles
from .layout import uri_join

if TYPE_CHECKING:
    from stilt.simulation import Simulation


def _normalize_output_dir(output_dir: str | Path) -> str:
    """Return a stable output-root string for local paths and URIs."""
    raw = str(output_dir)
    if "://" in raw:
        return raw.rstrip("/")
    return str(Path(raw).resolve())


@runtime_checkable
class Store(Protocol):
    """Output file access independent of the local compute workspace."""

    def read_bytes(self, key: str) -> bytes:
        """Return the raw bytes stored under one canonical output key."""
        ...

    def write_bytes(self, key: str, data: bytes) -> None:
        """Write raw bytes under one canonical output key."""
        ...

    def exists(self, key: str) -> bool:
        """Return whether one canonical output key currently exists."""
        ...

    def list_prefix(self, prefix: str) -> list[str]:
        """Return canonical keys that exist below one store-relative prefix."""
        ...

    def local_path(self, key: str) -> Path:
        """Return a local filesystem path for one output key."""
        ...

    def publish_simulation(self, sim: Simulation) -> None:
        """Publish the standard output files produced by one simulation."""
        ...


class LocalStore:
    """
    Output store backed by the local filesystem.

    Uses atomic tmp-then-replace for file writes and relative symlinks for
    local-only flat alias views under ``simulations/particles`` and
    ``simulations/footprints``.

    Parameters
    ----------
    output_dir
        Root directory that stores the project's published outputs.
    """

    def __init__(self, output_dir: str | Path) -> None:
        self.output_dir = Path(output_dir).resolve()

    def _path(self, key: str) -> Path:
        """Return the absolute local path for one canonical output key."""
        return self.output_dir / key.strip("/")

    def read_bytes(self, key: str) -> bytes:
        """Return the raw bytes stored under one canonical output key."""
        return self._path(key).read_bytes()

    def write_bytes(self, key: str, data: bytes) -> None:
        """Write raw bytes under one canonical output key."""
        path = self._path(key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)

    def exists(self, key: str) -> bool:
        """Return whether one canonical output key exists locally."""
        return self._path(key).exists()

    def list_prefix(self, prefix: str) -> list[str]:
        """Return canonical keys that exist below one local prefix."""
        base = self._path(prefix)
        if not base.exists():
            return []
        if base.is_file():
            return [prefix.strip("/")]
        return sorted(
            str(p.relative_to(self.output_dir)) for p in base.rglob("*") if p.is_file()
        )

    def local_path(self, key: str) -> Path:
        """Return the local filesystem path for one canonical output key."""
        return self._path(key)

    def publish_file(self, local_path: str | Path, key: str) -> None:
        """
        Atomically copy one local file into the store under *key*.

        Writes to a sibling ``.tmp`` first then ``Path.replace`` onto the
        final key — same-directory rename is atomic on POSIX, so concurrent
        readers never observe a partial file.
        """
        src = Path(local_path)
        if not src.exists():
            return
        target = self._path(key)
        target.parent.mkdir(parents=True, exist_ok=True)
        if src.resolve() == target.resolve():
            return
        tmp = target.with_suffix(target.suffix + ".tmp")
        try:
            shutil.copy2(src, tmp)
            tmp.replace(target)
        finally:
            if tmp.exists():
                tmp.unlink(missing_ok=True)

    def _publish_symlink(self, src: Path, key: str) -> None:
        """
        Atomically symlink *src* into the store under *key*.

        Writes a relative symlink to a sibling ``.tmp`` first, then
        ``Path.replace`` onto the final key, so readers never see an
        unlinked-then-recreated window during republish.
        """
        target = self._path(key)
        target.parent.mkdir(parents=True, exist_ok=True)
        if src.resolve() == target.resolve():
            return
        tmp = target.with_suffix(target.suffix + ".tmp")
        if tmp.exists() or tmp.is_symlink():
            tmp.unlink()
        try:
            tmp.symlink_to(os.path.relpath(src, start=target.parent))
            tmp.replace(target)
        finally:
            if tmp.exists() or tmp.is_symlink():
                tmp.unlink(missing_ok=True)

    def publish_simulation(self, sim: Simulation) -> None:
        """Publish the standard outputs produced by one simulation."""
        files = SimulationFiles(sim.directory, str(sim.id))
        log_path = files.log_path
        traj_path = files.trajectory_path
        error_path = files.error_trajectory_path

        def _publish(src: Path, canonical_key: str, *index_keys: str) -> None:
            """Publish one file and optional flat alias keys if it exists."""
            if not src.exists():
                return
            self.publish_file(src, canonical_key)
            canonical_local = self.local_path(canonical_key)
            for key in index_keys:
                self._publish_symlink(canonical_local, key)

        _publish(log_path, files.key(log_path))
        _publish(
            traj_path,
            files.key(traj_path),
            ProjectFiles.particle_index_key(traj_path.name),
        )
        _publish(error_path, files.key(error_path))
        for footprint in sorted(sim.directory.glob(f"{files.sim_id}*_foot.nc")):
            _publish(
                footprint,
                files.key(footprint),
                ProjectFiles.footprint_index_key(footprint.name),
            )
        for marker in sorted(sim.directory.glob(f"{files.sim_id}*_foot.empty")):
            _publish(marker, files.key(marker))


[docs] class FsspecStore: """ Output store backed by an ``fsspec`` remote filesystem. Suitable for object stores (``s3://``, ``gs://``, ``abfs://``) and pseudo-remote backends (``memory://``, ``http://``). Remote stores publish only canonical ``simulations/by-id`` outputs; flat alias views are local filesystem conveniences handled by ``LocalStore``. Parameters ---------- output_dir Remote or local URI root used for published outputs. cache_dir Optional local cache directory for ``local_path()`` downloads. """ def __init__( self, output_dir: str | Path, cache_dir: str | Path | None = None, ) -> None: self.output_dir = _normalize_output_dir(output_dir) self.fs, self.root = fsspec.core.url_to_fs(self.output_dir) self._cache_dir = Path(cache_dir) if cache_dir is not None else None def _cache_storage(self) -> Path: """Return the local cache directory used for remote ``local_path()`` calls.""" if self._cache_dir is None: self._cache_dir = Path(tempfile.mkdtemp(prefix="pystilt_cache_")) self._cache_dir.mkdir(parents=True, exist_ok=True) return self._cache_dir def _full_key(self, key: str) -> str: """Join a store-relative key onto the filesystem-specific root prefix.""" clean = key.strip("/") root = str(self.root).rstrip("/") if not root: return clean return uri_join(root, clean) def _key_uri(self, key: str) -> str: """Return a fully-qualified URI/path for use with ``fsspec.open_local()``.""" clean = key.strip("/") if not clean: return self.output_dir joined = uri_join(self.output_dir, clean) if "://" in joined: return joined return str(Path(joined).resolve()) def _relative_key(self, full_key: str) -> str: """Strip the store root from a filesystem-reported path.""" root = str(self.root).rstrip("/") if root and full_key.startswith(root + "/"): return full_key[len(root) + 1 :] return full_key
[docs] def read_bytes(self, key: str) -> bytes: """Return the raw bytes stored under one canonical output key.""" return self.fs.cat(self._full_key(key))
[docs] def write_bytes(self, key: str, data: bytes) -> None: """Write raw bytes under one canonical output key.""" full_key = self._full_key(key) parent = posixpath.dirname(full_key) if parent: self.fs.makedirs(parent, exist_ok=True) with self.fs.open(full_key, "wb") as handle: handle.write(data)
[docs] def exists(self, key: str) -> bool: """Return whether one canonical output key exists in the backend.""" return self.fs.exists(self._full_key(key))
[docs] def list_prefix(self, prefix: str) -> list[str]: """Return canonical keys that exist below one backend prefix.""" full_prefix = self._full_key(prefix) if not self.fs.exists(full_prefix): return [] return sorted(self._relative_key(path) for path in self.fs.find(full_prefix))
[docs] def local_path(self, key: str) -> Path: """Return a local path for a output key, downloading and caching as needed.""" local = fsspec.open_local( f"simplecache::{self._key_uri(key)}", simplecache={"cache_storage": str(self._cache_storage())}, ) if not isinstance(local, str): raise TypeError(f"Expected one local path for {key!r}, got {type(local)!r}") return Path(local)
[docs] def publish_file(self, local_path: str | Path, key: str) -> None: """Upload one local file into the remote store under *key*.""" src = Path(local_path) if not src.exists(): return full_key = self._full_key(key) parent = posixpath.dirname(full_key) if parent: self.fs.makedirs(parent, exist_ok=True) self.fs.put_file(str(src), full_key)
[docs] def publish_simulation(self, sim: Simulation) -> None: """Publish the standard outputs produced by one simulation.""" files = SimulationFiles(sim.directory, str(sim.id)) log_path = files.log_path traj_path = files.trajectory_path error_path = files.error_trajectory_path def _publish(src: Path, canonical_key: str) -> None: """Upload one canonical simulation output file if it exists.""" if not src.exists(): return self.publish_file(src, canonical_key) _publish(log_path, files.key(log_path)) _publish(traj_path, files.key(traj_path)) _publish(error_path, files.key(error_path)) for footprint in sorted(sim.directory.glob(f"{files.sim_id}*_foot.nc")): _publish(footprint, files.key(footprint)) for marker in sorted(sim.directory.glob(f"{files.sim_id}*_foot.empty")): _publish(marker, files.key(marker))
def make_store( output_dir: str | Path, *, cache_dir: str | Path | None = None, ) -> Store: """ Return the appropriate ``Store`` backend for *output_dir*. Local filesystem paths → ``LocalStore`` (atomic relative symlinks, no fsspec overhead). Remote URIs (``s3://``, ``gs://``, ``memory://``, …) → ``FsspecStore``. """ raw = str(output_dir) if "://" in raw: return FsspecStore(raw, cache_dir=cache_dir) return LocalStore(raw)