"""Meteorology source configuration, file lookup, and staging."""
from __future__ import annotations
import logging
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
import pandas as pd
from stilt.errors import ConfigValidationError, MeteorologyError
if TYPE_CHECKING:
from arlmet.sources import MeteorologySource as ArlmetSource
logger = logging.getLogger(__name__)
[docs]
class MetID(str):
    """
    String subclass naming a single meteorology stream.

    The value is used to key receptor footprints. Underscores are rejected at
    construction time because they are reserved as the delimiter between
    sim_id components.
    """

    def __new__(cls, name: str):
        """
        Create the identifier from ``name``.

        Raises
        ------
        ValueError
            If ``name`` contains an underscore.
        """
        has_reserved_delimiter = "_" in name
        if not has_reserved_delimiter:
            return super().__new__(cls, name)
        raise ValueError(
            "MetID cannot contain underscores, which are reserved for delimiting sim_id components."
        )
def _build_arlmet_source(name: str, kwargs: dict[str, Any]) -> ArlmetSource:
    """
    Instantiate the arlmet ``MeteorologySource`` subclass registered as ``name``.

    ``arlmet.sources`` is imported lazily inside this function, so importing
    this module does not require arlmet. The lookup table is rebuilt on every
    call from the names listed in ``arlmet.sources.__all__``: each concrete
    subclass of ``MeteorologySource`` is keyed by its ``name`` class attribute.

    Parameters
    ----------
    name
        Registry key of the source class to build.
    kwargs
        Keyword arguments forwarded unchanged to the source class constructor.

    Raises
    ------
    ImportError
        If ``arlmet.sources`` cannot be imported.
    ConfigValidationError
        If ``name`` does not match any discovered source class.
    """
    try:
        import arlmet.sources as _src
    except ImportError as exc:  # pragma: no cover
        raise ImportError(
            "arlmet is required for source-mode meteorology. "
            "Install with: pip install pystilt[cloud]"
        ) from exc

    # Discover sources from arlmet's public surface so that sources added
    # upstream become available here without any change to this function.
    registry: dict[str, type] = {}
    for attr in _src.__all__:
        candidate = getattr(_src, attr, None)
        if candidate is None or not isinstance(candidate, type):
            continue
        base = _src.MeteorologySource
        # Keep strict subclasses only; the base class itself is not a source.
        if not issubclass(candidate, base) or candidate is base:
            continue
        registry[candidate.name] = candidate  # type: ignore[attr-defined]

    source_cls = registry.get(name)
    if source_cls is None:
        raise ConfigValidationError(
            f"Unknown arlmet source {name!r}. Available: {sorted(registry)}."
        )
    return source_cls(**kwargs)
[docs]
class MetStream:
    """
    Runtime handler that resolves and stages met files for one named met stream.

    In *archive mode* (``source_type=None``) it globs an existing local
    directory using ``file_format`` / ``file_tres`` to discover required files.

    In *source mode* (``source_type`` set to an arlmet source name) it builds
    the named arlmet source and delegates file resolution to its ``fetch``
    method, passing the buffered bounding box when subgridding is enabled,
    then stages the result.

    Subgridding of local archive files is handled inside ``_stage_files`` via
    ``arlmet.extract_subset``, caching cropped copies in ``subgrid_dir``
    (``directory / "subgrid"`` when not given). Cached copies are keyed by
    file basename only, so the cache does not notice a change of bounds or
    levels; clear the cache directory after changing either.
    """

    def __init__(
        self,
        met_id: MetID | str,
        directory: Path | str,
        file_format: str | None = None,
        file_tres: pd.Timedelta | str | None = None,
        n_min: int = 1,
        source_type: str | None = None,
        source_kwargs: dict[str, Any] | None = None,
        backend: str = "s3",
        subgrid_enable: bool = False,
        subgrid_bounds=None,
        subgrid_buffer: float = 0.2,
        subgrid_levels: int | None = None,
        subgrid_dir: Path | None = None,
    ):
        """
        Store the stream configuration; no files are touched here.

        Parameters
        ----------
        met_id
            Stream identifier; validated by ``MetID`` (no underscores).
        directory
            Local met directory; expanded and resolved to an absolute path.
        file_format
            ``strftime`` pattern used to derive file-name prefixes
            (archive mode).
        file_tres
            Time spacing between met files; converted with
            ``pd.to_timedelta`` (archive mode).
        n_min
            Minimum number of files that must be found for a simulation.
        source_type
            Name of an arlmet source; ``None`` selects archive mode.
        source_kwargs
            Keyword arguments for the arlmet source constructor.
        backend
            Passed through to the arlmet source's ``fetch`` call.
        subgrid_enable
            Whether met files are cropped before staging.
        subgrid_bounds
            Object exposing ``xmin``, ``ymin``, ``xmax`` and ``ymax``.
        subgrid_buffer
            Margin added on every side of ``subgrid_bounds``.
        subgrid_levels
            Number of levels to keep (indices ``0 .. n-1``), or ``None``
            for all.
        subgrid_dir
            Cache directory for cropped archive files.
        """
        self.id = MetID(met_id)
        self.directory = Path(directory).expanduser().resolve()
        self.file_format = file_format
        self.file_tres = pd.to_timedelta(file_tres) if file_tres is not None else None
        self.n_min = int(n_min)
        self.source_type = source_type
        self.source_kwargs = source_kwargs or {}
        self.backend = backend
        self.subgrid_enable = subgrid_enable
        self.subgrid_bounds = subgrid_bounds
        self.subgrid_buffer = subgrid_buffer
        self.subgrid_levels = subgrid_levels
        self.subgrid_dir = (
            Path(subgrid_dir).expanduser().resolve() if subgrid_dir else None
        )
        # Lazily constructed arlmet source instance (source mode only)
        self._arlmet_source: ArlmetSource | None = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _dedupe_matched_files(paths: list[Path]) -> list[Path]:
        """Resolve symlinks, drop duplicate paths, and sort by file name."""
        return sorted(
            dict.fromkeys(path.resolve() for path in paths),
            key=lambda path: path.name,
        )

    def _get_arlmet_source(self) -> ArlmetSource:
        """Return the arlmet source instance, building it on first use."""
        if self._arlmet_source is None:
            # Internal invariant: only reached in source mode.
            assert self.source_type is not None
            self._arlmet_source = _build_arlmet_source(
                self.source_type, self.source_kwargs
            )
        return self._arlmet_source

    def _effective_bbox(self) -> tuple[float, float, float, float]:
        """
        Return ``(xmin, ymin, xmax, ymax)`` grown by ``subgrid_buffer``.

        Raises
        ------
        ValueError
            If ``subgrid_bounds`` is unset, or the buffer is ``None`` or
            negative.
        """
        b = self.subgrid_bounds
        buf = self.subgrid_buffer
        if b is None:
            raise ValueError("subgrid_bounds is required to compute effective bbox.")
        if buf is None or buf < 0:
            raise ValueError("subgrid_buffer must be a non-negative number.")
        return (b.xmin - buf, b.ymin - buf, b.xmax + buf, b.ymax + buf)

    def _resolved_subgrid_dir(self) -> Path:
        """Return the subgrid cache directory, defaulting to ``directory/subgrid``."""
        return (
            self.subgrid_dir
            if self.subgrid_dir is not None
            else self.directory / "subgrid"
        )

    def _level_indices(self) -> list[int] | None:
        """Return level indices to keep, or None to keep all."""
        if self.subgrid_levels is None:
            return None
        return list(range(self.subgrid_levels))

    # ------------------------------------------------------------------
    # File resolution
    # ------------------------------------------------------------------
    def _fetch_from_source(self, r_time: pd.Timestamp, n_hours: int) -> list[Path]:
        """
        Resolve required files through the arlmet source's ``fetch`` method.

        The requested window runs from the earlier to the later of ``r_time``
        and ``r_time + n_hours``, so negative ``n_hours`` is handled. The
        buffered bounding box is passed only when subgridding is enabled.

        Raises
        ------
        ImportError
            Re-raised from ``fetch`` with an installation hint appended.
        MeteorologyError
            If no files, or fewer than ``n_min`` files, are returned.
        """
        sim_end = r_time + pd.Timedelta(hours=n_hours)
        t_start: pd.Timestamp = min(r_time, sim_end)  # type: ignore[assignment]
        t_end: pd.Timestamp = max(r_time, sim_end)  # type: ignore[assignment]
        bbox = self._effective_bbox() if self.subgrid_enable else None
        source = self._get_arlmet_source()
        try:
            files = source.fetch(
                t_start,
                t_end,
                local_dir=self.directory,
                backend=self.backend,
                bbox=bbox,
            )
        except ImportError as exc:
            raise ImportError(
                f"{exc}\n\n"
                "Downloading meteorology requires the cloud extra. "
                "Install with: pip install pystilt[cloud]"
            ) from exc
        n_files = len(files)
        if n_files == 0 or n_files < self.n_min:
            raise MeteorologyError(
                f"Insufficient meteorological files found. "
                f"Found: {n_files}, Required: {self.n_min}."
            )
        return files

    def required_files(self, r_time, n_hours: int) -> list[Path]:
        """
        Return source met files required to cover one simulation time window.

        In source mode this delegates to ``_fetch_from_source``. In archive
        mode the window is expanded to ``file_tres`` boundaries, each file
        time is rendered with ``file_format``, and ``directory`` is searched
        recursively for files whose names start with that rendered prefix.
        Names containing ``.lock`` are skipped, as are files under the subgrid
        cache when it lives inside ``directory``; those are cropped copies,
        not source data.

        Raises
        ------
        ConfigValidationError
            In archive mode, if ``file_format`` or ``file_tres`` is unset.
        MeteorologyError
            If no files, or fewer than ``n_min`` files, are found.
        """
        _r_time = cast(pd.Timestamp, pd.Timestamp(r_time))
        if self.source_type is not None:
            return self._fetch_from_source(_r_time, n_hours)

        # Archive-glob mode
        if self.file_format is None or self.file_tres is None:
            # Fail with a clear message rather than an opaque TypeError from
            # pandas / strftime further down.
            raise ConfigValidationError(
                "Archive-mode meteorology requires both file_format and file_tres."
            )
        sim_end = _r_time + pd.Timedelta(hours=n_hours)
        earlier = min(_r_time, sim_end)
        later = max(_r_time, sim_end)
        met_start = earlier.floor(self.file_tres)  # type: ignore[arg-type]
        met_end = later
        if n_hours < 0:
            # Backward runs: extend the end up to the next file boundary.
            met_end_ceil = later.ceil(self.file_tres)  # type: ignore[arg-type]
            if later < met_end_ceil:
                met_end = met_end_ceil
        met_times = pd.date_range(met_start, met_end, freq=self.file_tres)
        patterns = list(dict.fromkeys(t.strftime(self.file_format) for t in met_times))  # type: ignore[arg-type]

        # Cropped copies cached under the archive share basenames with their
        # sources; matching them would inflate the file count and could stage
        # a cropped file in place of the full one.
        cache_dir = self._resolved_subgrid_dir()
        cache_inside_archive = self.directory in cache_dir.parents

        files: list[Path] = []
        missing: list[str] = []
        for pattern in patterns:
            matches = [
                p
                for p in self.directory.rglob(f"{pattern}*")
                if p.is_file()
                and ".lock" not in p.name
                and not (cache_inside_archive and cache_dir in p.parents)
            ]
            if matches:
                files.extend(matches)
            else:
                missing.append(pattern)
        files = self._dedupe_matched_files(files)
        n_files = len(files)
        if n_files == 0 or n_files < self.n_min:
            detail = ""
            if missing:
                examples = ", ".join(missing[:3])
                detail = f" Patterns not found in {self.directory}: {examples}."
            raise MeteorologyError(
                f"Insufficient meteorological files found. "
                f"Found: {n_files}, Required: {self.n_min}.{detail}"
            )
        if missing:
            examples = ", ".join(missing[:3])
            logger.warning(
                "Met patterns not found (simulation may lack temporal coverage): %s",
                examples,
            )
        return files

    def stage_files_for_simulation(
        self,
        *,
        r_time,
        n_hours: int,
        target_dir: Path | str,
    ) -> list[Path]:
        """Resolve and stage met files into a simulation-local compute directory."""
        return self._stage_files(
            self.required_files(r_time=r_time, n_hours=n_hours),
            target_dir=target_dir,
        )

    def _stage_files(self, files: list[Path], target_dir: Path | str) -> list[Path]:
        """
        Materialize met files into a compute-local directory via link-or-copy.

        When subgrid_enable is True and we are in archive mode (source_type is
        None), each source file is first subsetted via arlmet.extract_subset into
        subgrid_dir (shared cache), then the cached copy is linked/copied into the
        per-simulation target. In source mode the files are staged as returned
        by the source.

        Files already located in ``target_dir`` are returned as-is. When two
        different sources share a basename, the first one wins and a warning
        is logged. A dangling symlink at the destination is replaced; an
        existing, readable destination is reused.

        Returns
        -------
        list[Path]
            The staged paths inside ``target_dir``, without duplicates.
        """
        # Resolve subgridded paths for archive-mode subsetting
        if self.subgrid_enable and self.source_type is None:
            files = self._subset_archive_files(files)

        target = Path(target_dir)
        target.mkdir(parents=True, exist_ok=True)
        staged: list[Path] = []
        # Maps each staged destination to the resolved source backing it.
        staged_sources: dict[Path, Path] = {}
        for src in files:
            src = Path(src)
            resolved_src = src.resolve()
            if src.parent == target:
                # Already in place; nothing to link.
                if src not in staged_sources:
                    staged_sources[src] = resolved_src
                    staged.append(src)
                continue
            dst = target / src.name
            existing = staged_sources.get(dst)
            if existing is not None:
                if existing != resolved_src:
                    logger.warning(
                        "met source has duplicate basename %s at %s and %s; staging %s",
                        src.name,
                        existing,
                        resolved_src,
                        existing,
                    )
                continue
            staged_sources[dst] = resolved_src
            if dst.is_symlink() and not dst.exists():
                # Dangling link (its target has gone): reporting it as staged
                # would hand the simulation an unreadable file, so re-link.
                dst.unlink()
            elif dst.exists():
                staged.append(dst)
                continue
            try:
                dst.symlink_to(resolved_src)
            except FileExistsError:
                # Something else created dst between the check and the link;
                # copying onto it could overwrite the source through the link.
                pass
            except OSError:
                # Symlinks unavailable on this filesystem/platform: copy instead.
                shutil.copy2(src, dst)
            staged.append(dst)
        return staged

    def _subset_archive_files(self, files: list[Path]) -> list[Path]:
        """
        Subset archive files via arlmet.extract_subset, caching in subgrid_dir.

        A cached copy (same basename in the cache directory) is reused when it
        exists. New subsets are written to a uniquely named temporary file and
        renamed into place, so an interrupted or concurrent run never leaves a
        partial file under the final cache name.

        NOTE(review): assumes ``extract_subset`` writes only to the output
        path it is given — confirm against arlmet.

        Raises
        ------
        ImportError
            If arlmet cannot be imported.
        ValueError
            From ``_effective_bbox`` when bounds or buffer are invalid.
        """
        try:
            from arlmet import extract_subset
        except ImportError as exc:  # pragma: no cover
            raise ImportError(
                "arlmet is required for met subsetting. "
                "Install with: pip install pystilt"
            ) from exc
        import uuid

        subgrid_dir = self._resolved_subgrid_dir()
        subgrid_dir.mkdir(parents=True, exist_ok=True)
        bbox = self._effective_bbox()
        levels = self._level_indices()
        subsetted: list[Path] = []
        for src in files:
            cache_path = subgrid_dir / src.name
            if not cache_path.exists():
                logger.info("Subsetting %s → %s", src.name, cache_path)
                # Leading dot + unique suffix: never matched by the archive
                # glob and never shared between concurrent writers.
                tmp_path = cache_path.with_name(
                    f".{cache_path.name}.{uuid.uuid4().hex}.tmp"
                )
                try:
                    extract_subset(src, tmp_path, bbox=bbox, levels=levels)
                    tmp_path.replace(cache_path)
                finally:
                    # No-op after a successful rename; cleans up on failure.
                    tmp_path.unlink(missing_ok=True)
            subsetted.append(cache_path)
        return subsetted