Source code for stilt.observations.receptors

"""Observation-to-receptor conversion helpers."""

from __future__ import annotations

import math

import numpy as np

from stilt.config import VerticalReference
from stilt.observations.geometry import LineOfSight
from stilt.observations.observation import Observation
from stilt.receptors import ColumnReceptor, MultiPointReceptor, PointReceptor, Receptor

_EARTH_RADIUS_M = 6_371_000.0


[docs] def build_point_receptor( observation: Observation, *, altitude: float | None = None, ) -> PointReceptor: """ Build a point receptor for one observation. If ``altitude`` is omitted, the observation altitude is used. """ receptor_altitude = observation.altitude if altitude is None else altitude if receptor_altitude is None: raise ValueError( "A point receptor altitude is required. Pass `altitude=` or set " "`Observation.altitude`." ) return PointReceptor( time=observation.time, longitude=observation.longitude, latitude=observation.latitude, altitude=receptor_altitude, altitude_ref=observation.altitude_ref, )
[docs] def build_column_receptor( observation: Observation, *, bottom: float, top: float, altitude_ref: VerticalReference = "agl", ) -> ColumnReceptor: """Build a vertical-column receptor centered on one observation.""" return ColumnReceptor( time=observation.time, longitude=observation.longitude, latitude=observation.latitude, bottom=bottom, top=top, altitude_ref=altitude_ref, )
def build_multipoint_receptor( observation: Observation, *, points: list[tuple[float, float, float]], altitude_ref: VerticalReference = "agl", ) -> MultiPointReceptor: """Build a multipoint receptor from explicit release points.""" r = Receptor.from_points(observation.time, points, altitude_ref=altitude_ref) if not isinstance(r, MultiPointReceptor): raise ValueError( "build_multipoint_receptor requires at least 2 distinct points." ) return r def _sample_los_altitudes( line_of_sight: LineOfSight, *, surface_altitude: float | None = None, max_altitude: float | None = None, ) -> list[float]: """Return altitude samples for one LOS definition.""" if line_of_sight.altitude_levels: altitudes = [float(v) for v in line_of_sight.altitude_levels] else: assert line_of_sight.start_altitude is not None assert line_of_sight.end_altitude is not None if line_of_sight.count is not None: if line_of_sight.count < 2: raise ValueError("LineOfSight.count must be >= 2.") altitudes = np.linspace( line_of_sight.start_altitude, line_of_sight.end_altitude, line_of_sight.count, ).tolist() else: assert line_of_sight.frequency is not None if line_of_sight.frequency <= 0: raise ValueError("LineOfSight.frequency must be positive.") altitude_range = abs( line_of_sight.end_altitude - line_of_sight.start_altitude ) count = max(2, int(round(altitude_range / line_of_sight.frequency)) + 1) altitudes = np.linspace( line_of_sight.start_altitude, line_of_sight.end_altitude, count, ).tolist() lower_bound = 0.0 if line_of_sight.altitude_ref == "agl" else None if surface_altitude is not None and line_of_sight.altitude_ref == "msl": lower_bound = surface_altitude if lower_bound is not None: altitudes = [v for v in altitudes if v >= lower_bound] if max_altitude is not None: altitudes = [v for v in altitudes if v <= max_altitude] if not altitudes: raise ValueError("LOS sampling produced no receptor points after clipping.") return altitudes def _resolved_los_surface_altitude( observation: Observation, line_of_sight: LineOfSight, *, surface_altitude: float | None = None, ) -> float | None: """Return the effective lower clipping altitude for one LOS.""" if surface_altitude is not None: return surface_altitude if line_of_sight.surface_altitude is not None: return line_of_sight.surface_altitude if ( line_of_sight.altitude_ref == "msl" and observation.altitude_ref == "msl" and observation.altitude is not None ): return float(observation.altitude) return None def _resolved_los_max_altitude( line_of_sight: LineOfSight, *, model_top_altitude: float | None = None, ) -> float | None: """Return the effective upper clipping altitude for one LOS.""" if model_top_altitude is not None: return model_top_altitude return line_of_sight.max_altitude def _offset_lon_lat( *, longitude: float, latitude: float, east_m: float, north_m: float, ) -> tuple[float, float]: """Move a point in local tangent-plane ENU coordinates.""" lat_rad = math.radians(latitude) dlat = north_m / _EARTH_RADIUS_M dlon = east_m / (_EARTH_RADIUS_M * math.cos(lat_rad)) return ( longitude + math.degrees(dlon), latitude + math.degrees(dlat), )
[docs] def build_slant_receptor( observation: Observation, *, line_of_sight: LineOfSight | None = None, surface_altitude: float | None = None, model_top_altitude: float | None = None, ) -> MultiPointReceptor: """ Build a slant multipoint receptor from observation LOS geometry. When explicit clipping values are omitted, the builder uses the strongest available hints: - anchor altitude: ``LineOfSight.anchor_altitude`` -> ``Observation.altitude`` - surface clipping for MSL LOS: explicit ``surface_altitude`` -> ``LineOfSight.surface_altitude`` -> ``Observation.altitude`` when the observation altitude is also MSL - model-top clipping: explicit ``model_top_altitude`` -> ``LineOfSight.max_altitude`` """ los = line_of_sight or observation.line_of_sight if los is None: raise ValueError( "A slant receptor requires Observation.line_of_sight or an explicit " "`line_of_sight=` argument." ) if observation.viewing is None: raise ValueError("A slant receptor requires Observation.viewing geometry.") if observation.viewing.viewing_zenith_angle is None: raise ValueError( "A slant receptor requires ViewingGeometry.viewing_zenith_angle." ) if observation.viewing.viewing_azimuth_angle is None: raise ValueError( "A slant receptor requires ViewingGeometry.viewing_azimuth_angle." ) viewing_zenith = float(observation.viewing.viewing_zenith_angle) viewing_azimuth = float(observation.viewing.viewing_azimuth_angle) if viewing_zenith < 0 or viewing_zenith >= 90: raise ValueError( "Slant receptor viewing_zenith_angle must be in [0, 90) degrees." ) altitudes = _sample_los_altitudes( los, surface_altitude=_resolved_los_surface_altitude( observation, los, surface_altitude=surface_altitude, ), max_altitude=_resolved_los_max_altitude( los, model_top_altitude=model_top_altitude, ), ) anchor_altitude = ( los.anchor_altitude if los.anchor_altitude is not None else observation.altitude if observation.altitude is not None else altitudes[0] ) zenith_rad = math.radians(viewing_zenith) azimuth_rad = math.radians(viewing_azimuth) points: list[tuple[float, float, float]] = [] for altitude in altitudes: delta_altitude = altitude - anchor_altitude horizontal_distance = delta_altitude * math.tan(zenith_rad) east_m = horizontal_distance * math.sin(azimuth_rad) north_m = horizontal_distance * math.cos(azimuth_rad) longitude, latitude = _offset_lon_lat( longitude=observation.longitude, latitude=observation.latitude, east_m=east_m, north_m=north_m, ) points.append((longitude, latitude, altitude)) r = Receptor.from_points(observation.time, points, altitude_ref=los.altitude_ref) assert isinstance(r, MultiPointReceptor) return r