Source code for arlmet.file

"""File class for reading and writing ARL meteorology binary files."""

from __future__ import annotations

import os
from collections import OrderedDict
from collections.abc import Iterable, Iterator, Mapping, Sequence
from pathlib import Path
from types import TracebackType
from typing import TYPE_CHECKING, Any, BinaryIO, Literal, cast

if TYPE_CHECKING:
    from typing_extensions import override
else:

    def override(f: object) -> object:
        return f


import numpy as np
import numpy.typing as npt
import pandas as pd
from xarray.backends import CachingFileManager

from arlmet._time import ensure_timestamp
from arlmet.grid import Grid, Projection
from arlmet.header import record_length_from_grid
from arlmet.index import IndexRecord
from arlmet.record import DataRecord, _require_mode
from arlmet.recordset import RecordSet, VariableAccessor
from arlmet.sampling import sample_points_from_file
from arlmet.vertical import VerticalAxis

if TYPE_CHECKING:
    import xarray as xr


[docs] class File: """ Read or write an ARL meteorology file. Parameters ---------- path : path-like Location of the ARL file on disk. mode : {"r", "w"}, default "r" File mode. Read mode scans the file immediately; write mode expects the caller to provide ``source``, ``grid``, and ``vertical_axis`` before creating records. source : str, optional Four-character ARL source identifier used when writing. grid : Grid, optional Horizontal grid metadata used when writing. vertical_axis : VerticalAxis, optional Vertical axis metadata used when writing. Attributes ---------- path : pathlib.Path Filesystem path for the ARL file. mode : {"r", "w"} Active file mode. times : list[pandas.Timestamp] Sorted valid times discovered in the file. source : str ARL source identifier. grid : Grid Horizontal grid metadata. vertical_axis : VerticalAxis Vertical coordinate metadata. variables : VariableAccessor Lazy accessor for variable-wise views inherited from RecordCollection. Methods ------- create_grid(...) Build and attach a Grid when writing a new file. create_recordset(time, forecast=None) Create a writable RecordSet for one valid time. sample_points(points, variables, ...) Interpolate fields at arbitrary lon/lat/z sample points. close() Flush pending writes and release the file handle. Examples -------- >>> import arlmet >>> with arlmet.File("met.arl") as met: ... met.times[0] Timestamp('2024-07-18 00:00:00') """ def __init__( self, path: str | os.PathLike[str], mode: Literal["r", "w"] = "r", source: str | None = None, grid: Grid | None = None, vertical_axis: VerticalAxis | None = None, ): # File attrs self.path = Path(path) self.mode: Literal["r", "w"] = mode if self.mode not in ("r", "w"): raise ValueError("Mode must be 'r' (read) or 'w' (write).") # Open the binary file handle bmode = self.mode + "b" self._manager = CachingFileManager(open, self.path, mode=bmode) self._handle: BinaryIO | None = None # Must be consistent throughout the file self._source: str | None = source self._grid: Grid | None = grid self._vaxis: VerticalAxis | None = vertical_axis # Initialize recordsets as an ordered dict to preserve time order # Mapping: time -> RecordSet self._recordsets: OrderedDict[pd.Timestamp, RecordSet] = OrderedDict() self._diff_parents: dict[str, str] = {} self.variables = VariableAccessor(self) # Scan the file to populate recordsets in read mode if self.mode != "w": self._scan() @property def handle(self) -> BinaryIO: if self._handle is None: # Hot record read/write paths hit this repeatedly, so keep one # acquired handle per File instead of reentering the manager. # xarray's CachingFileManager.acquire() returns IO[Any]; opening in # binary mode guarantees BinaryIO at runtime. self._handle = cast(BinaryIO, self._manager.acquire()) return self._handle @property def size(self) -> int: return self.path.stat().st_size @property def source(self) -> str: if self._source is None: raise ValueError("Source has not been set for this File.") return self._source @source.setter def source(self, value: str): _require_mode(self, "w") self._source = value @property def grid(self) -> Grid: if self._grid is None: raise ValueError("Grid has not been set for this File.") return self._grid @grid.setter def grid(self, value: Grid): _require_mode(self, "w") if not isinstance(value, Grid): raise TypeError("grid must be a Grid instance.") self._grid = value @property def vertical_axis(self) -> VerticalAxis: if self._vaxis is None: raise ValueError("Vertical axis has not been set for this File.") return self._vaxis @vertical_axis.setter def vertical_axis(self, value: VerticalAxis): _require_mode(self, "w") if not isinstance(value, VerticalAxis): raise TypeError("vertical_axis must be a VerticalAxis instance.") self._vaxis = value @property def times(self) -> list[pd.Timestamp]: """Return a sorted list of timestamps in the file.""" return sorted(self._recordsets.keys()) @property def records(self) -> list[DataRecord]: """List of all DataRecords in the file across all RecordSets.""" return [ record for recordset in self._recordsets.values() for record in recordset.records ] @property def record_length(self) -> int: return record_length_from_grid(self.grid)
[docs] def create_grid( self, nx: int, ny: int, pole_lat: float, pole_lon: float, tangent_lat: float, tangent_lon: float, grid_size: float, orientation: float, cone_angle: float, sync_x: float, sync_y: float, sync_lat: float, sync_lon: float, ) -> Grid: """ Create and attach the horizontal grid metadata for a writable file. Parameters ---------- nx : int Number of grid points in the x direction. ny : int Number of grid points in the y direction. pole_lat, pole_lon : float Projection pole definition from the ARL index record. tangent_lat, tangent_lon : float Reference latitude and longitude that define the projection. grid_size : float Grid spacing in kilometres at the projection reference point. orientation : float Rotation of the grid y-axis relative to true north. cone_angle : float Projection cone angle used for stereographic, Lambert, or Mercator grids. sync_x, sync_y : float One-based grid coordinates of the synchronization point. sync_lat, sync_lon : float Geographic coordinates of the synchronization point. Returns ------- Grid The created grid instance, also stored on the file. """ _require_mode(self, "w") if self._grid is not None: raise ValueError("Grid has already been set for this File.") # Build projection proj = Projection( pole_lat=pole_lat, pole_lon=pole_lon, tangent_lat=tangent_lat, tangent_lon=tangent_lon, grid_size=grid_size, orientation=orientation, cone_angle=cone_angle, sync_x=sync_x, sync_y=sync_y, sync_lat=sync_lat, sync_lon=sync_lon, ) # Create grid grid = Grid(projection=proj, nx=nx, ny=ny) self._grid = grid return grid
def _create_recordset( self, position: int, source: str | None, grid: Grid | None, time: pd.Timestamp, *, forecast: int | None = None, ) -> RecordSet: """Internal factory method to create a new RecordSet.""" if time in self._recordsets: raise ValueError(f"A RecordSet for time {time} already exists.") if source is not None and self._source != source: raise ValueError("Source mismatch when creating RecordSet.") if grid is not None and self._grid != grid: raise ValueError("Grid mismatch when creating RecordSet.") rs = RecordSet(file=self, position=position, time=time, forecast=forecast) self._recordsets[time] = rs return rs
[docs] def create_recordset( self, time: pd.Timestamp | str, *, forecast: int | None = None ) -> RecordSet: """ Create a writable RecordSet for one valid time. Parameters ---------- time : pandas.Timestamp or compatible datetime-like Valid time for the new record set. forecast : int, optional Forecast hour for the index record header. HYSPLIT docs are unclear on this, but conversion code appears to use the forecast hour from the first variable specified in the config file. This is brittle in `arlmet`s case, so we chose to either allow specifying it here or an index's forecast hour will be set to the minimum forecast hour among its variables (defaulting to -1 when all variables are missing data). Returns ------- RecordSet Writable record set associated with ``time``. """ _require_mode(self, "w") if self.source is None or self.grid is None: raise ValueError("Source and Grid must be set to create RecordSets.") position = -1 # New recordsets have no on-disk position yet source = grid = None # skip checks in _create_recordset ts = ensure_timestamp(time) return self._create_recordset( position=position, source=source, grid=grid, time=ts, forecast=forecast )
def register_diff_binding(self, diff_name: str, parent_name: str) -> None: """Record and validate the explicit parent binding for a generated DIF name.""" _require_mode(self, "w") if not diff_name.startswith("DIF"): raise ValueError( f"Generated diff record names must start with 'DIF', got '{diff_name}'." ) bound_parent = self._diff_parents.get(diff_name) if bound_parent is not None and bound_parent != parent_name: raise ValueError( f"Difference record '{diff_name}' is already bound to parent " f"'{bound_parent}', not '{parent_name}'." ) self._diff_parents[diff_name] = parent_name def add_record( self, time: pd.Timestamp | str, variable: str, *, level: int, forecast: int | None = None, data: npt.ArrayLike | None = None, ) -> DataRecord: """Add one writable DataRecord, creating its RecordSet if needed.""" _require_mode(self, "w") time = ensure_timestamp(time) if time in self._recordsets: recordset = self._recordsets[time] else: recordset = self.create_recordset(time) # Check if data is missing or effectively empty is_empty = data is None if not is_empty: # Convert to numpy to handle xarray, pandas, or lists uniformly arr = np.asanyarray(data) # Check if array is empty or all elements are NaN if arr.size == 0 or np.all(pd.isna(arr)): is_empty = True if is_empty: if forecast is None: forecast = -1 elif forecast != -1: # Warn if a forecast hour is provided for missing data, since it will be ignored raise ValueError("Forecast must be -1 for missing data.") elif forecast is None: # Raise if data is valid but no forecast was supplied raise ValueError("forecast must be supplied when data is present") return recordset.create_datarecord( variable=variable, level=level, forecast=forecast, data=data, ) def _scan(self) -> None: """Populate RecordSet objects by walking the on-disk index records.""" # Scan the file to populate recordsets in read mode fh = self.handle while fh.tell() < self.size: # Get starting position of each recordset position = fh.tell() # Parse index record try: index = IndexRecord.from_position(fh, position=position) except EOFError: break # End of file # Set source when reading the first index record if self._source is None: self._source = index.source # Set grid when reading the first index record if self._grid is None: self._grid = index.grid # Set vertical axis when reading the first index record if self._vaxis is None: self._vaxis = index.vertical_axis elif self._vaxis != index.vertical_axis: raise ValueError("Vertical axis mismatch between index records.") # Create a RecordSet for this index record (time) rs = self._create_recordset( position=position, source=index.source, grid=index.grid, time=index.time, forecast=index.forecast, ) # Skip to the end of the index record record_length = self.record_length fh.seek(position + record_length) # Read data records for this index record position = fh.tell() # start of data records prev_dr = None for lvl in index.levels: for var in lvl.variables: checksum = lvl.variables[var].checksum reserved = lvl.variables[var].reserved if var.startswith("DIF"): # Assign as diff record to previous data record if prev_dr is None: raise ValueError( f"Difference record found for variable '{var}' " f"at position {position} without a preceding data record." ) prev_dr._create_diff( position=position, variable=var, checksum=checksum, reserved=reserved, ) else: # Create data record dr = rs._create_datarecord( position=position, variable=var, level=lvl.level, checksum=checksum, reserved=reserved, ) # Keep track of previous data record for diff assignment prev_dr = dr position += record_length # go to next record # Move file pointer to the start of the next index record fh.seek(position)
[docs] def close(self) -> None: """Flush pending writes and close the managed binary file handle.""" try: if self.mode == "w": for rs in self._recordsets.values(): if rs.position == -1: if len(rs) == 0: continue rs._flush() self.handle.flush() finally: # Close the file manager — this releases the underlying file handle. # Any mmap objects created from it become invalid and are GC'd automatically. self._manager.close() self._handle = None
[docs] def sample_points( self, points: pd.DataFrame | Mapping[str, Any], variables: str | Iterable[str], *, time: pd.Timestamp | str | None = None, z_kind: str = "pressure", method: str = "linear", ) -> pd.DataFrame: """ Sample fields from this file at arbitrary lon/lat/z points. Parameters ---------- points : Any Table-like object with ``lon``, ``lat``, ``z``, and optionally ``time`` columns. variables : str or iterable of str One or more ARL variables to interpolate. time : pandas.Timestamp or str, optional Default or override time when ``points`` does not include a ``time`` column. z_kind : {"pressure", "native", "agl", "msl"}, default "pressure" Interpretation of the ``z`` coordinate. method : {"linear", "nearest"}, default "linear" Horizontal interpolation method. Returns ------- pandas.DataFrame Copy of ``points`` with one result column per requested variable. Examples -------- >>> import pandas as pd >>> import arlmet >>> pts = pd.DataFrame({"lon": [-111.9], "lat": [40.7], "z": [850.0]}) >>> with arlmet.File("met.arl") as met: ... met.sample_points(pts, ["UWND", "VWND"]) """ return sample_points_from_file( self, points, variables, time=time, z_kind=z_kind, method=method, )
def to_dataset( self, *, drop_variables: Sequence[str] | None = None, bbox: tuple[float, float, float, float] | None = None, levels: list[int] | tuple[int, ...] | None = None, ) -> xr.Dataset: """Project this file into the simplified analysis Dataset representation.""" from arlmet.xarray.dataset import _build_dataset_from_file return _build_dataset_from_file( self, drop_variables=drop_variables, bbox=bbox, levels=levels, ) def __getitem__(self, key: str | int | pd.Timestamp) -> RecordSet: if isinstance(key, str): # Allow lookup by string/int time representation key = ensure_timestamp(key) elif isinstance(key, int): # Allow lookup by positional index key = list(self._recordsets.keys())[key] return self._recordsets[key] def __iter__(self) -> Iterator[pd.Timestamp]: return iter(self._recordsets) def __len__(self) -> int: return len(self._recordsets) def __contains__(self, key: object) -> bool: try: ts = ensure_timestamp(key) except Exception: return False return ts in self._recordsets @override def __repr__(self) -> str: grid_str = ( f"{self._grid.nx}\u00d7{self._grid.ny}" if self._grid is not None else "None" ) levels_str = ( str(len(self._vaxis._levels)) if self._vaxis is not None else "None" ) return ( f"File({self.path.name!r}, mode={self.mode!r}, " f"times={len(self)}, grid={grid_str}, levels={levels_str})" ) def __enter__(self): return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: self.close()