"""Binary codecs for ARL index records: VarInfo, LvlInfo, and IndexRecord."""
from collections import OrderedDict
from collections.abc import Iterable, Sequence
from dataclasses import dataclass, field
from typing import Any, BinaryIO, ClassVar
import pandas as pd
from arlmet._time import ensure_timestamp
from arlmet.grid import Grid, Projection
from arlmet.header import Header, format_fixed_width_float
from arlmet.vertical import VerticalAxis
def _derive_index_forecast(
record_forecasts: Iterable[int], explicit_forecast: int | None = None
) -> int:
"""
Derive an index-record forecast from per-record forecasts.
If an explicit forecast is provided, use it directly. Otherwise, use the
lowest non-negative forecast from the records, or -1 if all are missing.
"""
if explicit_forecast is not None:
return explicit_forecast
nonnegative_forecasts = sorted(
int(value) for value in record_forecasts if int(value) >= 0
)
return nonnegative_forecasts[0] if nonnegative_forecasts else -1
@dataclass
class VarInfo:
"""Checksum and reserved-byte metadata for one variable within an index record."""
checksum: int
reserved: str
@dataclass
class LvlInfo:
"""
Information about a single vertical level.
Parameters
----------
level : int
Level index.
height : float
Height in units of the vertical coordinate system.
variables : dict[str, VarInfo]
Dictionary mapping variable names to VarInfo (checksum and reserved).
"""
level: int
height: float
variables: OrderedDict[str, VarInfo]
[docs]
@dataclass
class IndexRecord:
"""
Index record describing one ARL time step and its contained variables.
Parameters
----------
header : Header
The header information for the index record.
source : str
Source identifier (4 characters).
forecast : int
Forecast hour.
minutes : int
Minutes after the hour.
pole_lat : float
Pole latitude position of the grid projection.
For lat-lon grids: max latitude of the grid.
pole_lon : float
Pole longitude position of the grid projection.
For lat-lon grids: max longitude of the grid.
tangent_lat : float
Reference latitude at which the grid spacing is defined.
For conical and mercator projections, this is the latitude
at which the grid touches the surface.
For lat-lon grids: grid spacing in degrees latitude.
tangent_lon : float
Reference longitude at which the grid spacing is defined.
For conical and mercator projections, this is the longitude
at which the grid touches the surface.
For lat-lon grids: grid spacing in degrees longitude.
grid_size : float
Grid spacing in km at the reference position.
For lat-lon grids: value of zero signals that the grid is a lat-lon grid.
orientation : float
Angle at the reference point made by the y-axis and the local direction of north.
For lat-lon grids: 0
cone_angle : float
Angle between the axis and the surface of the cone.
For regular projections it equals the latitude at which the grid is tangent to the earth's surface.
Stereographic: ±90, Mercator: 0, Lambert Conformal: 0 ~ 90
For lat-lon grids: 0
sync_x : float
Grid x-coordinate used to equate a position on the grid with a position on earth.
This is a unitless grid index (FORTRAN 1-based).
sync_y : float
Grid y-coordinate used to equate a position on the grid with a position on earth.
This is a unitless grid index (FORTRAN 1-based).
sync_lat : float
Earth latitude corresponding to the grid position (sync_x, sync_y).
For lat-lon grids: latitude of the (0,0) grid point position.
sync_lon : float
Earth longitude corresponding to the grid position (sync_x, sync_y).
For lat-lon grids: longitude of the (0,0) grid point position.
nx : int
Number of grid points in the x-direction (columns).
ny : int
Number of grid points in the y-direction (rows).
nz : int
Number of vertical levels.
vertical_flag : int
Vertical coordinate system type (1=sigma, 2=pressure, 3=terrain, 4=hybrid).
index_length : int
Total length of the index record in bytes, including fixed and variable portions.
levels : sequence of LvlInfo
Variable manifests for each stored vertical level.
Attributes
----------
N_BYTES_FIXED : int
Number of bytes in the fixed portion of the index record (108 bytes).
time : pd.Timestamp
The valid time of the record, calculated from the header time and minutes.
Methods
-------
from_position(file, position)
Read and parse an index record from a file handle.
tobytes()
Serialize the exact used bytes of the index record.
to_record_bytes(record_size)
Serialize the index record padded to one ARL record.
Examples
--------
>>> from arlmet.header import Header
>>> from arlmet.index import IndexRecord
>>> header = Header(
... year=2024,
... month=7,
... day=18,
... hour=0,
... forecast=0,
... level=0,
... grid=(0, 0),
... variable="INDX",
... exponent=0,
... precision=0.0,
... initial_value=0.0,
... )
>>> isinstance(header.time, pd.Timestamp)
True
"""
header: Header = field(repr=False)
source: str
forecast: int
minutes: int
pole_lat: float
pole_lon: float
tangent_lat: float
tangent_lon: float
grid_size: float
orientation: float
cone_angle: float
sync_x: float
sync_y: float
sync_lat: float
sync_lon: float
reserved: float
nx: int
ny: int
nz: int
vertical_flag: int
index_length: int
levels: Sequence[LvlInfo]
N_BYTES_FIXED: ClassVar[int] = 108
[docs]
@classmethod
def from_position(cls, file: BinaryIO, position: int) -> "IndexRecord":
"""
Create an IndexRecord by reading from a file at a specific position.
Parameters
----------
file : file-like object
The file to read from.
position : int
The byte position in the file where the index record starts.
Returns
-------
IndexRecord
The parsed IndexRecord object.
"""
file.seek(position)
header_bytes = file.read(Header.N_BYTES)
if not header_bytes:
raise EOFError(
f"Reached end of file while reading header at position {position}"
)
header = Header.from_bytes(header_bytes)
if header.variable != "INDX":
raise ValueError(
f"Expected 'INDX' record at position {position}, found '{header.variable}'"
)
fixed = IndexRecord.parse_fixed(data=file.read(IndexRecord.N_BYTES_FIXED))
extended = file.read(fixed["index_length"] - IndexRecord.N_BYTES_FIXED)
levels = IndexRecord.parse_extended(data=extended, nz=fixed["nz"])
return IndexRecord(header=header, **fixed, levels=levels)
def serialize_fixed(self, index_length: int | None = None) -> bytes:
"""Serialize the fixed 108-byte portion of the index record."""
if index_length is None:
index_length = self.index_length
values = [
self.pole_lat,
self.pole_lon,
self.tangent_lat,
self.tangent_lon,
self.grid_size,
self.orientation,
self.cone_angle,
self.sync_x,
self.sync_y,
self.sync_lat,
self.sync_lon,
self.reserved,
]
proj = "".join(format_fixed_width_float(value, 7) for value in values)
fixed = (
f"{self.source:<4}"
f"{self.forecast:3d}"
f"{self.minutes:2d}"
f"{proj}"
f"{self.nx:3d}"
f"{self.ny:3d}"
f"{self.nz:3d}"
f"{self.vertical_flag:2d}"
f"{index_length:4d}"
)
if len(fixed) != self.N_BYTES_FIXED:
raise ValueError(
f"Fixed index serialization produced {len(fixed)} bytes, expected {self.N_BYTES_FIXED}."
)
return fixed.encode("ascii")
def serialize_extended(self) -> bytes:
"""Serialize the variable-length level/variable portion of the index record."""
chunks: list[str] = []
for level in self.levels:
chunks.append(format_fixed_width_float(level.height, 6))
chunks.append(f"{len(level.variables):2d}")
for name, info in level.variables.items():
reserved = (info.reserved or " ")[:1]
chunks.append(f"{name:<4}{info.checksum:3d}{reserved}")
return "".join(chunks).encode("ascii")
[docs]
def tobytes(self) -> bytes:
"""Serialize the exact used bytes of the index record, including its header."""
extended = self.serialize_extended()
index_length = self.N_BYTES_FIXED + len(extended)
self.index_length = index_length
fixed = self.serialize_fixed(index_length=index_length)
return self.header.tobytes() + fixed + extended
[docs]
def to_record_bytes(self, record_length: int) -> bytes:
"""Serialize the index record padded to one full ARL record."""
raw = self.tobytes()
if len(raw) > record_length:
raise ValueError(
f"Index record uses {len(raw)} bytes, which exceeds record length {record_length}."
)
return raw.ljust(record_length, b" ")
@staticmethod
def parse_fixed(data: bytes) -> dict[str, Any]:
"""
Parse the fixed 108-byte portion of an index record from raw bytes.
Parameters
----------
data : bytes
Raw bytes containing the fixed portion of the index record.
108 bytes expected.
Returns
-------
dict
Parsed fields as a dictionary.
"""
if len(data) < IndexRecord.N_BYTES_FIXED:
raise ValueError(
f"IndexRecord fixed portion must be at least {IndexRecord.N_BYTES_FIXED} bytes, "
f"got {len(data)}"
)
fixed = data[: IndexRecord.N_BYTES_FIXED].decode("ascii", errors="ignore")
fields: dict[str, Any] = {}
fields["source"] = fixed[:4].strip()
fields["forecast"] = int(fixed[4:7].strip())
fields["minutes"] = int(fixed[7:9].strip())
proj_section = fixed[9 : 9 + 12 * 7]
proj_names = [
"pole_lat",
"pole_lon",
"tangent_lat",
"tangent_lon",
"grid_size",
"orientation",
"cone_angle",
"sync_x",
"sync_y",
"sync_lat",
"sync_lon",
"reserved",
]
for i in range(12):
start = i * 7
end = start + 7
val = float(proj_section[start:end].strip())
if val > 180:
val = -(360 - val)
fields[proj_names[i]] = val
grid_section = fixed[93:102]
fields["nx"] = int(grid_section[0:3].strip())
fields["ny"] = int(grid_section[3:6].strip())
fields["nz"] = int(grid_section[6:9].strip())
fields["vertical_flag"] = int(fixed[102:104].strip())
fields["index_length"] = int(fixed[104:108].strip())
return fields
@staticmethod
def parse_extended(data: bytes, nz: int) -> list[LvlInfo]:
"""
Parse the variable-length portion of an index record.
Parameters
----------
data : bytes
Raw bytes containing the extended portion of the index record.
nz : int
Number of vertical levels (from the fixed portion).
Returns
-------
list[LvlInfo]
List of LvlInfo objects for each vertical level.
"""
extended = data.decode("ascii", errors="ignore")
lvls = []
cursor = 0
for i in range(nz):
height = float(extended[cursor : cursor + 6].strip())
vars: OrderedDict[str, VarInfo] = OrderedDict()
num_vars = int(extended[cursor + 6 : cursor + 8].strip())
for j in range(num_vars):
start = cursor + 8 + j * 8
end = start + 8
name = extended[start : start + 4].strip()
vars[name] = VarInfo(
checksum=int(extended[start + 4 : start + 7].strip()),
reserved=extended[start + 7 : end].strip(),
)
lvls.append(LvlInfo(level=i, height=height, variables=vars))
cursor += 8 + num_vars * 8
return lvls
@property
def time(self) -> pd.Timestamp:
"""Valid time calculated from header time plus minutes offset."""
return ensure_timestamp(self.header.time + pd.Timedelta(minutes=self.minutes))
@property
def total_nx(self) -> int:
"""Total x grid points including thousands from header grid letters."""
return self.nx + self.header.grid[0]
@property
def total_ny(self) -> int:
"""Total y grid points including thousands from header grid letters."""
return self.ny + self.header.grid[1]
@property
def grid(self) -> Grid:
"""Construct a Grid from the index record's projection parameters."""
proj = Projection(
pole_lat=self.pole_lat,
pole_lon=self.pole_lon,
tangent_lat=self.tangent_lat,
tangent_lon=self.tangent_lon,
grid_size=self.grid_size,
orientation=self.orientation,
cone_angle=self.cone_angle,
sync_x=self.sync_x,
sync_y=self.sync_y,
sync_lat=self.sync_lat,
sync_lon=self.sync_lon,
)
return Grid(projection=proj, nx=self.total_nx, ny=self.total_ny)
@property
def vertical_axis(self) -> VerticalAxis:
"""Construct the vertical axis definition from the index record."""
return VerticalAxis(
flag=self.vertical_flag,
levels=[lvl.height for lvl in self.levels],
offset=self.reserved,
)