"""
ARL record parsing and data unpacking.
This module provides classes and functions for parsing ARL file records,
including index records, data records, headers, and the differential unpacking
algorithm. It handles the binary format used in ARL meteorological files.
"""
import string
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import Any, ClassVar
import numpy as np
import pandas as pd
# ARL meteorological variable definitions
ARL_SURFACE_VARIABLES = {
"U10M": ("U-component of wind at 10 m", "m/s"),
"V10M": ("V-component of wind at 10 m", "m/s"),
"T02M": ("Temperature at 2 m", "K"),
"PBLH": ("Boundary Layer Height", "m"),
"PRSS": ("Pressure at surface", "hPa"),
"MSLP": ("Pressure at mean sea level", "hPa"),
"TMPS": ("Temperature at surface", "K"),
"USTR": ("Friction Velocity", "m/s"),
"TSTR": ("Friction Temperature", "K"),
"RGHS": ("Surface Roughness", "m"),
"UMOF": ("U-Momentum flux", "N/m2"),
"VMOF": ("V-Momentum flux", "N/m2"),
"SHTF": ("Sfc sensible heat flux", "W/m2"),
"LTHF": ("Latent heat flux", "W/m2"),
"DSWF": ("Downward short wave flux", "W/m2"),
"RH2M": ("Relative humidity at 2 m", "%"),
"SPH2": ("Specific humidity at 2 m", "kg/kg"),
"CAPE": ("Convective Available Potential Energy", "J/kg"),
"TCLD": ("Total cloud cover", "%"),
"TPPA": ("Total precipitation for whole dataset", "m"),
"TPPD": ("Total precipitation (24-h)", "m"),
"TPPT": ("Total precipitation (12-h)", "m"),
"TPP6": ("Total precipitation (6-h)", "m"),
"TPP3": ("Total precipitation (3-h)", "m"),
"TPP1": ("Total precipitation (1-h)", "m"),
"PRT6": ("Precipitation Rate (6-h)", "m/minute"),
"PRT3": ("Precipitation Rate (3-h)", "m/minute"),
}
ARL_UPPER_VARIABLES = {
"UWND": ("U wind component (respect to grid)", "m/s"),
"VWND": ("V wind component (respect to grid)", "m/s"),
"HGTS": ("Geopotential height", "gpm"),
"TEMP": ("Temperature", "K"),
"WWND": ("Pressure vertical velocity", "hPa/s"),
"RELH": ("Relative Humidity", "%"),
"SPHU": ("Specific Humidity", "kg/kg"),
"DZDT": ("vertical velocity", "m/s"),
"TKEN": ("turbulent kinetic energy", "m2/s2"),
}
[docs]
def letter_to_thousands(char: str) -> int:
"""
Convert letter to thousands digit for large grids.
A=1000, B=2000, C=3000, etc.
"""
if char in string.ascii_uppercase:
return (string.ascii_uppercase.index(char) + 1) * 1000
return 0
[docs]
def restore_year(yr: str | int):
"""
Convert 2-digit year to 4-digit year.
Parameters
----------
yr : str or int
Year value (2-digit or 4-digit).
Returns
-------
int
4-digit year. Years < 40 are mapped to 2000+yr, otherwise 1900+yr.
Already 4-digit years (>= 1900) are returned unchanged.
"""
yr = int(yr)
if yr >= 1900:
return yr
# This was in hysplit python code
return 2000 + yr if (yr < 40) else 1900 + yr
[docs]
def unpack(
data: bytes | bytearray,
nx: int,
ny: int,
precision: float,
exponent: int,
initial_value: float,
checksum: int | None = None,
) -> np.ndarray:
"""
Unpacks a differentially packed byte array into a 2D numpy array.
This function is a vectorized Python translation of the HYSPLIT
FORTRAN PAKINP subroutine. It uses a differential unpacking scheme
where each value is derived from the previous one. The implementation
is optimized with numpy for high performance.
Parameters
----------
data : bytes or bytearray
Packed input data as a 1D array of bytes.
nx : int
The number of columns in the full data grid.
ny : int
The number of rows in the full data grid.
precision : float
Precision of the packed data. Values with an absolute value smaller
than this will be set to zero.
exponent : int
The packing scaling exponent.
initial_value : float
The initial real value at the grid position (0,0).
checksum : int, optional
If provided, a checksum is calculated over `data` and compared
against this value. A `ValueError` is raised on mismatch.
If None (default), the check is skipped.
Returns
-------
np.ndarray
The unpacked 2D numpy array of shape (ny, nx) and dtype float32.
Raises
------
ValueError
If a `checksum` is provided and the calculated checksum does not match.
"""
# --- Vectorized Unpacking ---
# Calculate the scaling exponent
scexp = 1.0 / (2.0 ** (7 - exponent))
# Convert byte array to a 2D numpy grid and calculate the differential values
grid = np.frombuffer(data, dtype=np.uint8).reshape((ny, nx)).astype(np.float32)
diffs = (grid - 127) * scexp
# The first column is a cumulative sum of its own diffs, starting with initial_value.
# We create an array with initial_value followed by the first column's diffs.
first_col_vals = np.concatenate(([initial_value], diffs[:, 0]))
# The cumulative sum gives the unpacked values for the entire first column.
unpacked_col0 = np.cumsum(first_col_vals)[1:]
# Replace the first column of diffs with these now-unpacked starting values.
diffs[:, 0] = unpacked_col0
# The rest of the grid can now be unpacked by a cumulative sum across the rows (axis=1).
# Each row starts with its correct, fully unpacked value in the first column.
unpacked_grid = np.cumsum(diffs, axis=1)
# Apply the precision check to the final grid.
unpacked_grid[np.abs(unpacked_grid) < precision] = 0.0
# --- Optional: Calculate checksum and verify if checksum is provided ---
if checksum is not None:
# Calculate the rotating checksum over the entire input array
calculated_checksum = 0
for k in range(nx * ny):
calculated_checksum += data[k]
# This logic mimics the FORTRAN: "sum carries over the eighth bit add one"
# It's a form of 1's complement checksum addition.
if calculated_checksum >= 256:
calculated_checksum -= 255
if calculated_checksum != checksum:
raise ValueError(
f"Checksum mismatch: calculated {calculated_checksum}, expected {checksum}"
)
return unpacked_grid
[docs]
@dataclass
class VarInfo:
checksum: int
reserved: str
[docs]
@dataclass
class LvlInfo:
"""
Information about a single vertical level.
Parameters
----------
index : int
Level index.
height : float
Height in units of the vertical coordinate system.
variables : dict[str, VarInfo]
Dictionary mapping variable names to VarInfo (checksum and reserved).
"""
index: int
height: float
variables: dict[str, VarInfo]
[docs]
@dataclass
class IndexRecord:
"""
Represents a complete ARL index record that precedes data records for each time period.
Combines the fixed portion with the variable levels portion.
Parameters
----------
header : Header
The header information for the index record.
source : str
Source identifier (4 characters).
forecast_hour : int
Forecast hour.
minutes : int
Minutes after the hour.
pole_lat : float
Pole latitude position of the grid projection.
For lat-lon grids: max latitude of the grid.
pole_lon : float
Pole longitude position of the grid projection.
For lat-lon grids: max longitude of the grid.
tangent_lat : float
Reference latitude at which the grid spacing is defined.
For conical and mercator projections, this is the latitude
at which the grid touches the surface.
For lat-lon grids: grid spacing in degrees latitude.
tangent_lon : float
Reference longitude at which the grid spacing is defined.
For conical and mercator projections, this is the longitude
at which the grid touches the surface.
For lat-lon grids: grid spacing in degrees longitude.
grid_size : float
Grid spacing in km at the reference position.
For lat-lon grids: value of zero signals that the grid is a lat-lon grid.
orientation : float
Angle at the reference point made by the y-axis and the local direction of north.
For lat-lon grids: 0
cone_angle : float
Angle between the axis and the surface of the cone.
For regular projections it equals the latitude at which the grid is tangent to the earth's surface.
Stereographic: ±90, Mercator: 0, Lambert Conformal: 0 ~ 90
For lat-lon grids: 0
sync_x : float
Grid x-coordinate used to equate a position on the grid with a position on earth.
This is a unitless grid index (FORTRAN 1-based).
sync_y : float
Grid y-coordinate used to equate a position on the grid with a position on earth.
This is a unitless grid index (FORTRAN 1-based).
sync_lat : float
Earth latitude corresponding to the grid position (sync_x, sync_y).
For lat-lon grids: latitude of the (0,0) grid point position.
sync_lon : float
Earth longitude corresponding to the grid position (sync_x, sync_y).
For lat-lon grids: longitude of the (0,0) grid point position.
nx : int
Number of grid points in the x-direction (columns).
ny : int
Number of grid points in the y-direction (rows).
nz : int
Number of vertical levels.
vertical_flag : int
Vertical coordinate system type (1=sigma, 2=pressure, 3=terrain, 4=hybrid).
index_length : int
Total length of the index record in bytes, including fixed and variable portions.
levels : list[LvlInfo]
List of levels, each containing:
- level: Level index (0 to nz-1).
- height: Height of the level in units of the vertical coordinate.
- vars: Dictionary mapping variable names to VarInfo (checksum and reserved).
Attributes
----------
N_BYTES_FIXED : int
Number of bytes in the fixed portion of the index record (108 bytes).
time : pd.Timestamp
The valid time of the record, calculated from the header time and minutes.
"""
header: Header = field(repr=False)
source: str
forecast_hour: int
minutes: int
pole_lat: float
pole_lon: float
tangent_lat: float
tangent_lon: float
grid_size: float
orientation: float
cone_angle: float
sync_x: float
sync_y: float
sync_lat: float
sync_lon: float
reserved: float
nx: int
ny: int
nz: int
vertical_flag: int
index_length: int
levels: Sequence[LvlInfo]
N_BYTES_FIXED: ClassVar[int] = 108
[docs]
@staticmethod
def parse_fixed(data: bytes) -> dict[str, Any]:
"""
Parse the fixed 108-byte portion of an index record from raw bytes.
Parameters
----------
data : bytes
Raw bytes containing the fixed portion of the index record.
108 bytes expected.
Returns
-------
dict
Parsed fields as a dictionary.
"""
if len(data) < IndexRecord.N_BYTES_FIXED:
raise ValueError(
f"IndexRecord fixed portion must be at least {IndexRecord.N_BYTES_FIXED} bytes, got {len(data)}"
)
fields = {}
# Parse the fixed portion of the index record
# Format: (A4)(I3)(I2)(12F7)(3I3)(I2)(I4)
fixed = data[: IndexRecord.N_BYTES_FIXED].decode("ascii", errors="ignore")
fields["source"] = fixed[:4].strip()
fields["forecast_hour"] = int(fixed[4:7].strip())
fields["minutes"] = int(fixed[7:9].strip())
# Parse 12 floating point values (each 7 characters)
proj_section = fixed[9 : 9 + 12 * 7] # 12 * 7 = 84 characters
proj_names = [
"pole_lat",
"pole_lon",
"tangent_lat",
"tangent_lon",
"grid_size",
"orientation",
"cone_angle",
"sync_x",
"sync_y",
"sync_lat",
"sync_lon",
"reserved",
]
for i in range(12):
start = i * 7
end = start + 7
val = float(proj_section[start:end].strip())
if val > 180:
# Adjust longitudes greater than 180 degrees
val = -(360 - val)
fields[proj_names[i]] = val
# Parse grid dimensions (3 integers, 3 characters each)
grid_section = fixed[93:102]
fields["nx"] = int(grid_section[0:3].strip())
fields["ny"] = int(grid_section[3:6].strip())
fields["nz"] = int(grid_section[6:9].strip())
# Parse vertical level information
fields["vertical_flag"] = int(fixed[102:104].strip())
fields["index_length"] = int(fixed[104:108].strip())
return fields
[docs]
@staticmethod
def parse_extended(data: bytes, nz: int) -> list[LvlInfo]:
"""
Parse the variable-length portion of an index record
containing level information.
Parameters
----------
data : bytes
Raw bytes containing the extended portion of the index record.
nz : int
Number of vertical levels (from the fixed portion).
Returns
-------
list[LevelInfo]
List of LevelInfo objects for each vertical level.
"""
extended = data.decode("ascii", errors="ignore")
lvls = []
# Loop through levels to extract variable info
cursor = 0
for i in range(nz):
height = float(
extended[cursor : cursor + 6].strip()
) # in units of vertical flag
vars = {}
num_vars = int(extended[cursor + 6 : cursor + 8].strip())
# Loop through variables for this level
for j in range(num_vars):
start = cursor + 8 + j * 8
end = start + 8
name = extended[start : start + 4].strip()
vars[name] = VarInfo(
checksum=int(extended[start + 4 : start + 7].strip()),
reserved=extended[start + 7 : end].strip(), # usually blank
)
lvls.append(LvlInfo(index=i, height=height, variables=vars))
cursor += 8 + num_vars * 8 # Move cursor to next level
return lvls
@property
def time(self) -> pd.Timestamp:
"""
Get the valid time for this index record.
Returns
-------
pd.Timestamp
Valid time calculated from header time plus minutes offset.
"""
return self.header.time + pd.Timedelta(minutes=self.minutes)
@property
def total_nx(self) -> int:
"""
Total number of grid points in the x-direction,
including any additional thousands from the grid letters.
Returns
-------
int
Total number of grid points in x-direction.
"""
return self.nx + self.header.grid[0]
@property
def total_ny(self) -> int:
"""
Total number of grid points in the y-direction,
including any additional thousands from the grid letters.
Returns
-------
int
Total number of grid points in y-direction.
"""
return self.ny + self.header.grid[1]
[docs]
@dataclass
class DataRecord:
"""
Represents a data record containing packed meteorological data.
Parameters
----------
header : Header
The header for this data record.
data : bytes
The packed binary data.
"""
header: Header
data: bytes = field(repr=False)
@property
def variable(self) -> str:
"""
Get the variable name for this data record.
Returns
-------
str
Variable name from the header.
"""
return self.header.variable
@property
def level(self) -> int:
"""
Get the vertical level for this data record.
Returns
-------
int
Level index from the header.
"""
return self.header.level
@property
def forecast(self) -> int:
"""
Get the forecast hour for this data record.
Returns
-------
int
Forecast hour from the header.
"""
return self.header.forecast
[docs]
def unpack(self, nx, ny) -> np.ndarray:
"""
Unpack the data record into a 2D numpy array.
Parameters
----------
nx : int
Number of grid points in the x-direction.
ny : int
Number of grid points in the y-direction.
Returns
-------
np.ndarray
Unpacked 2D numpy array of shape (ny, nx).
"""
return unpack(
data=self.data,
nx=nx,
ny=ny,
precision=self.header.precision,
exponent=self.header.exponent,
initial_value=self.header.initial_value,
)