# Source code for arlmet.ops.concat

"""
Concatenate ARL meteorology files into a single ARL file.

ARL files are flat streams of fixed-size records, so joining several files is a
byte-level append — the same operation as ``cat a.arl b.arl > out.arl``. This is
the standard way to combine short (e.g. 6-hourly) met files into longer (e.g.
daily) files: HYSPLIT limits a simulation to at most 12 meteorological input
files when a single grid is specified, so longer per-file coverage is the only
way to span a long run. See the HYSPLIT user guide, "Compilation Limits":
https://www.ready.noaa.gov/hysplitusersguide/S441.htm

The original idea and the HRRR use case come from Derek Mallia's
``concat_hrrr_daily.py`` script.
"""

from __future__ import annotations

import os
import shutil
from collections import defaultdict
from collections.abc import Iterable
from pathlib import Path

import pandas as pd

from arlmet.file import File
from arlmet.index import IndexRecord

__all__ = ["concat", "concat_by_time"]


def concat(
    sources: Iterable[str | os.PathLike[str]],
    destination: str | os.PathLike[str],
    *,
    sort: bool = True,
) -> File:
    """
    Concatenate multiple ARL files into a single ARL file.

    Each input is appended to the output byte-for-byte, preserving every record
    (including diff records and checksums) exactly. The inputs are first scanned
    to ensure they share one grid and vertical axis and do not repeat valid
    times, since a concatenated ARL file must be a single coherent record
    stream.

    If copying fails part-way (disk full, unreadable source, interrupt), the
    partially written ``destination`` is removed before the error propagates,
    so a truncated record stream is never left behind looking like a finished
    file.

    Parameters
    ----------
    sources : iterable of path-like
        Input ARL files to join. Must contain at least one path. A bare string
        or path is rejected with ``TypeError`` — wrap a single file in a list.
    destination : path-like
        Output ARL file path. Overwrites any existing file. Must not be one of
        ``sources``.
    sort : bool, default True
        Order the inputs by their earliest valid time before joining, so the
        output is chronological regardless of input order. When False, inputs
        are joined in the order given (like ``cat``).

    Returns
    -------
    File
        The newly written file, opened in read mode. Close it when done (or use
        it as a context manager). Callers that only need the file on disk may
        ignore the return value.

    Raises
    ------
    TypeError
        If ``sources`` is a single ``str``, ``bytes`` or path-like object
        rather than an iterable of paths.
    ValueError
        If ``sources`` is empty, if ``destination`` is also a source, if any
        source is empty, if the inputs disagree on grid or vertical axis, or if
        the same valid time appears in more than one input.

    Examples
    --------
    Join three 6-hourly HRRR files into one daily file:

    >>> import arlmet
    >>> arlmet.concat(
    ...     ["20240101_00_hrrr", "20240101_06_hrrr", "20240101_12_hrrr"],
    ...     "20240101_hrrr",
    ... )

    Combine every 6-hourly file for one day discovered by glob (``sort=True``
    orders them by valid time, so glob order does not matter):

    >>> import glob
    >>> arlmet.concat(glob.glob("20240101_*_hrrr"), "20240101_hrrr")
    """
    # A bare str/PathLike is iterable (over characters / not at all), which would
    # silently do the wrong thing — reject it explicitly.
    if isinstance(sources, (str, bytes, os.PathLike)):
        raise TypeError(
            "sources must be an iterable of paths, not a single path. "
            "Wrap a single file in a list: concat([path], destination)."
        )

    source_paths = [Path(p) for p in sources]
    if not source_paths:
        raise ValueError("concat requires at least one source file.")

    destination = Path(destination)
    destination_resolved = destination.resolve()
    if any(p.resolve() == destination_resolved for p in source_paths):
        raise ValueError(
            f"destination {destination} is also one of the sources; "
            "concatenating a file onto itself is not allowed."
        )

    # Validate (and order) every input before the destination is touched, so a
    # rejected input never truncates an existing output file.
    ordered_paths = _scan_sources(source_paths, sort=sort)

    # Open outside the try: if the open itself fails, nothing was written or
    # truncated, so a pre-existing destination must be left alone.
    out = open(destination, "wb")
    try:
        with out:
            for path in ordered_paths:
                with open(path, "rb") as src:
                    shutil.copyfileobj(src, out)
    except BaseException:
        # The output is now a truncated record stream; remove it rather than
        # leave something that looks like a finished file. Cleanup is
        # best-effort: the original error is what the caller needs to see.
        try:
            destination.unlink()
        except OSError:
            pass
        raise

    return File(destination)
def _scan_sources(source_paths: list[Path], *, sort: bool) -> list[Path]:
    """
    Validate the sources against each other and decide their write order.

    Every source is opened once to collect its valid times, grid and vertical
    axis. The first source acts as the reference that all others must match.

    Returns the paths in the order they should be written: by earliest valid
    time when ``sort`` is true, otherwise in the order given.

    Raises ``ValueError`` if a source has no records, if a grid or vertical axis
    differs from the first source's, or if a valid time occurs in more than one
    source.
    """
    entries = []
    for source in source_paths:
        with File(source) as arl:
            valid_times = arl.times
            if not valid_times:
                # Bail out before touching grid/axis: an empty file never set them.
                raise ValueError(f"Source file {source} contains no records.")
            entries.append((source, valid_times, arl.grid, arl.vertical_axis))

    # The caller guarantees at least one source, so entries[0] is always there.
    first_path, _, first_grid, first_axis = entries[0]
    for source, _, grid, axis in entries[1:]:
        if grid != first_grid:
            raise ValueError(
                f"Grid mismatch: {source} has grid {grid.nx}x{grid.ny}, "
                f"incompatible with {first_path} "
                f"({first_grid.nx}x{first_grid.ny}). "
                "Concatenated ARL files must share a single grid."
            )
        if axis != first_axis:
            raise ValueError(
                f"Vertical axis mismatch: {source} (flag {axis.flag}, "
                f"{len(axis.levels)} levels) is incompatible with "
                f"{first_path} (flag {first_axis.flag}, "
                f"{len(first_axis.levels)} levels). Concatenated ARL "
                "files must share a single vertical axis."
            )

    if sort:
        # Relies on File.times being sorted, so element 0 is the earliest time.
        entries = sorted(entries, key=lambda entry: entry[1][0])

    _reject_duplicate_times([(entry[0], entry[1]) for entry in entries])
    return [entry[0] for entry in entries]


def _reject_duplicate_times(scanned: list[tuple[Path, list[pd.Timestamp]]]) -> None:
    """Raise ``ValueError`` on the first valid time that was already seen in an earlier entry."""
    first_seen_in: dict[pd.Timestamp, Path] = {}
    for source, valid_times in scanned:
        for stamp in valid_times:
            if stamp not in first_seen_in:
                first_seen_in[stamp] = source
                continue
            raise ValueError(
                f"Valid time {stamp} appears in both {first_seen_in[stamp]} and "
                f"{source}. Concatenated ARL files must not repeat valid times: "
                "arlmet cannot read a file with duplicate times and HYSPLIT "
                "behavior on repeated times is undefined."
            )
def concat_by_time(
    directory: str | os.PathLike[str],
    output_directory: str | os.PathLike[str],
    freq: str = "1D",
    *,
    pattern: str = "*",
    time_range: tuple[str | pd.Timestamp, str | pd.Timestamp] | None = None,
    template: str = "{time:%Y%m%d}_arl",
    sort: bool = True,
) -> list[Path]:
    """
    Group every ARL file in a directory by valid time and concatenate each group.

    Each input is assigned to a time bin from its first valid time — read from
    the file's index record, not parsed from its name — floored to ``freq``. All
    files in a bin are concatenated into one output file. This is the batch form
    of :func:`concat`: e.g. turning a directory of 6-hourly HRRR files into one
    file per day.

    All output filenames are computed and checked for collisions before
    anything is written, so a ``template`` that is too coarse for ``freq``
    fails up front instead of letting one bin overwrite another's output.

    Parameters
    ----------
    directory : path-like
        Directory to scan for input ARL files (non-recursive).
    output_directory : path-like
        Directory to write the concatenated files into. Created if missing.
        Should differ from ``directory``.
    freq : str, default "1D"
        Fixed-frequency pandas offset alias giving the size of each output
        chunk: ``"1D"`` = one file per day, ``"6h"`` = one per six hours, etc.
        Each input is binned by its first valid time floored to this frequency,
        so ``freq`` should be at least as long as any single input file's span.
    pattern : str, default "*"
        Glob (relative to ``directory``) selecting input files. Scope it to ARL
        files; every match must be a readable ARL file.
    time_range : tuple of (start, end), optional
        Inclusive ``(start, end)`` filter on each file's first valid time.
        Files whose first time falls outside the range are skipped.
    template : str, default "{time:%Y%m%d}_arl"
        ``str.format`` template for output filenames, given the bin start time
        as ``time`` (a ``pandas.Timestamp``), e.g. ``"{time:%Y%m%d}_hrrr"``. It
        must encode enough resolution to keep bins distinct at ``freq``; a
        template that maps two bins to the same filename is rejected.
    sort : bool, default True
        Passed through to :func:`concat` for each group.

    Returns
    -------
    list[pathlib.Path]
        The written output paths, one per non-empty time bin, in time order.

    Raises
    ------
    ValueError
        If ``pattern`` matches no files, if a matched file cannot be read as
        ARL, or if ``template`` formats two different time bins to the same
        output filename. :func:`concat`'s grid/axis and duplicate-time checks
        also apply within each group.

    Examples
    --------
    Turn a directory of 6-hourly HRRR files into one file per day:

    >>> import arlmet
    >>> arlmet.concat_by_time(
    ...     "hrrr/",
    ...     "daily/",
    ...     freq="1D",
    ...     pattern="*_hrrr",
    ...     template="{time:%Y%m%d}_hrrr",
    ... )
    """
    directory = Path(directory)
    output_directory = Path(output_directory)

    candidates = sorted(p for p in directory.glob(pattern) if p.is_file())
    if not candidates:
        raise ValueError(f"No files matched pattern {pattern!r} in {directory}.")

    time_filter: tuple[pd.Timestamp, pd.Timestamp] | None = None
    if time_range is not None:
        time_filter = (pd.Timestamp(time_range[0]), pd.Timestamp(time_range[1]))

    groups: dict[pd.Timestamp, list[Path]] = defaultdict(list)
    for path in candidates:
        first_time = _peek_first_time(path)
        if time_filter is not None and not (
            time_filter[0] <= first_time <= time_filter[1]
        ):
            continue
        groups[first_time.floor(freq)].append(path)

    # Resolve every output name before writing anything: a template that is too
    # coarse for `freq` would otherwise overwrite earlier bins silently.
    planned = _plan_output_paths(groups, output_directory, template)

    output_directory.mkdir(parents=True, exist_ok=True)

    outputs: list[Path] = []
    for bin_start, out_path in planned:
        # concat returns an open File; we only need it written, so close it.
        with concat(groups[bin_start], out_path, sort=sort):
            pass
        outputs.append(out_path)
    return outputs


def _plan_output_paths(
    bin_starts: Iterable[pd.Timestamp],
    output_directory: Path,
    template: str,
) -> list[tuple[pd.Timestamp, Path]]:
    """Pair each bin start (in time order) with its output path, rejecting collisions."""
    planned: list[tuple[pd.Timestamp, Path]] = []
    claimed_by: dict[Path, pd.Timestamp] = {}
    for bin_start in sorted(bin_starts):
        out_path = output_directory / template.format(time=bin_start)
        if out_path in claimed_by:
            raise ValueError(
                f"template {template!r} maps time bins {claimed_by[out_path]} "
                f"and {bin_start} to the same output file {out_path}. "
                "The template must encode enough time resolution to keep "
                "bins distinct at the chosen freq."
            )
        claimed_by[out_path] = bin_start
        planned.append((bin_start, out_path))
    return planned
def _peek_first_time(path: Path) -> pd.Timestamp:
    """
    Return the time of the index record at position 0 of ``path``.

    Only that one index record is read, which is much cheaper than
    ``File(path).times`` on large multi-time files, where every index record in
    the file has to be visited.

    Any failure while reading the record is re-raised as ``ValueError`` naming
    the file; failing to open the file is not wrapped.
    """
    with open(path, "rb") as stream:
        try:
            record = IndexRecord.from_position(stream, 0)
            first_time = record.time
        except Exception as err:
            message = (
                f"Could not read an ARL index record from {path}: {err}. "
                "Scope `pattern` so it only matches ARL files."
            )
            raise ValueError(message) from err
    return first_time