-
Notifications
You must be signed in to change notification settings - Fork 1
Plan Radiative
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Port the CERES smoke radiative analysis from PlumeSentinelAI into DAVINCI as a standalone davinci_monet/radiative/ subpackage, generalized for arbitrary events (Sep 2020 West Coast, Australian fires 2019-2020, FIREX-AQ).
Architecture: Pipeline-inspired subpackage with config → loaders → processing → plots → runner flow. Uses DAVINCI's Pydantic config, NCAR style system, and Typer CLI but does NOT use the model-vs-obs pairing pipeline. The RT module comes over as-is from PlumeSentinelAI.
Tech Stack: xarray, numpy, pandas, matplotlib, cartopy, netCDF4, requests, pydantic, typer
Design doc: Plan-Radiative-Design in the wiki
davinci_monet/radiative/__init__.py — Public API exports
davinci_monet/radiative/config.py — Pydantic schemas (RadiativeConfig and children)
davinci_monet/radiative/rt.py — Delta-Eddington two-stream (ported as-is)
davinci_monet/radiative/loaders/__init__.py — Loader exports
davinci_monet/radiative/loaders/ceres.py — CERES EBAF + SYN1deg (local + OPeNDAP)
davinci_monet/radiative/loaders/merra2.py — MERRA-2 aerosol loading + regrid
davinci_monet/radiative/loaders/aeronet.py — AERONET CSV reader with site/domain filtering
davinci_monet/radiative/processing.py — Anomaly, smoke AOD, regridding utilities
davinci_monet/radiative/plots/__init__.py — Plot dispatcher
davinci_monet/radiative/plots/event_fields.py — Multi-panel TOA event maps
davinci_monet/radiative/plots/anomaly_maps.py — Event-minus-background anomaly maps
davinci_monet/radiative/plots/scatter.py — SW vs AOD scatter + daily correlation
davinci_monet/radiative/plots/site_timeseries.py — Per-site dual-axis time-series
davinci_monet/radiative/plots/surface_impact.py — Surface SW maps + method comparison
davinci_monet/radiative/runner.py — Orchestrates load → process → plot
davinci_monet/tests/test_radiative_rt.py — RT module tests (ported as-is)
davinci_monet/tests/test_radiative_config.py — Config validation tests
davinci_monet/tests/test_radiative_processing.py — Processing function tests
davinci_monet/tests/test_radiative_loaders.py — Loader tests (synthetic data)
davinci_monet/tests/test_radiative_plots.py — Plot smoke tests (synthetic data)
davinci_monet/tests/test_radiative_runner.py — Runner integration test
davinci_monet/cli/app.py — Register `radiative` subcommand group
Files:
- Create:
davinci_monet/radiative/__init__.py - Create:
davinci_monet/radiative/rt.py - Create:
davinci_monet/tests/test_radiative_rt.py
This is a direct port — the RT code comes over unchanged from PlumeSentinelAI. The test file is adapted to use proper imports instead of sys.path hacking.
- Step 1: Create the radiative package with
__init__.py
# davinci_monet/radiative/__init__.py
"""Smoke radiative analysis subpackage.
Provides tools for analyzing wildfire smoke radiative effects using
CERES, MERRA-2, and AERONET data. This is a standalone analysis
workflow that uses DAVINCI's config, styling, and CLI infrastructure.
"""- Step 2: Copy
rt.pyfrom PlumeSentinelAI
Copy ~/EarthSystem/PlumeSentinelAI/scripts/rt.py to davinci_monet/radiative/rt.py. The file should be identical to the source — no changes needed. It contains:
-
delta_eddington_2stream(tau, omega, g, mu0, albedo)— two-stream solver -
rayleigh_tau(lambda_um, p_hpa)— Rayleigh optical depth -
smoke_aod_at_wavelength(aod_550, lambda_um, angstrom)— Angstrom scaling -
combine_aerosol_rayleigh(tau_aer, ssa_aer, g_aer, tau_ray)— bulk properties -
daily_mean_coszen(lat_deg, doy)— solar geometry -
Step 3: Create the test file
Port ~/EarthSystem/PlumeSentinelAI/scripts/test_rt.py to davinci_monet/tests/test_radiative_rt.py. Change the import from sys.path hack to proper package import. Remove the standalone _run_all() runner (pytest handles this).
# davinci_monet/tests/test_radiative_rt.py
"""Unit tests for the radiative transfer module.
Ported from PlumeSentinelAI/scripts/test_rt.py.
"""
from __future__ import annotations
import numpy as np
from davinci_monet.radiative.rt import (
combine_aerosol_rayleigh,
daily_mean_coszen,
delta_eddington_2stream,
rayleigh_tau,
smoke_aod_at_wavelength,
)
ATOL = 1e-6
# ── Two-stream: energy conservation ─────────────────────────────────────────
def test_energy_conservation_basic():
"""R + T + A = 1 for a range of inputs."""
cases = [
(0.001, 0.92, 0.65, 0.5, 0.0),
(0.5, 0.92, 0.65, 0.5, 0.0),
(3.0, 0.92, 0.65, 0.5, 0.0),
(10.0, 0.92, 0.65, 0.5, 0.0),
(100.0, 0.0, 0.0, 0.5, 0.0),
(0.5, 1.0, 0.0, 0.5, 0.0),
(0.5, 0.92, 0.65, 0.8, 0.0),
(0.5, 0.92, 0.65, 0.3, 0.0),
(1.0, 0.85, 0.70, 0.6, 0.0),
]
for tau, omega, g, mu0, alb in cases:
R, T, A = delta_eddington_2stream(tau, omega, g, mu0, alb)
total = R + T + A
assert abs(total - 1.0) < ATOL, (
f"Energy not conserved: τ={tau} ω={omega} g={g} μ₀={mu0} "
f"R={R:.6f} T={T:.6f} A={A:.6f} sum={total:.6f}"
)
def test_energy_conservation_with_albedo():
"""R + T + A = 1 with nonzero surface albedo."""
cases = [
(0.5, 0.92, 0.65, 0.5, 0.1),
(0.5, 0.92, 0.65, 0.5, 0.3),
(1.0, 0.92, 0.65, 0.5, 0.5),
(2.0, 0.88, 0.68, 0.6, 0.2),
]
for tau, omega, g, mu0, alb in cases:
R, T, A = delta_eddington_2stream(tau, omega, g, mu0, alb)
total = R + T + A
assert abs(total - 1.0) < ATOL, (
f"Energy not conserved with albedo: τ={tau} α={alb} "
f"sum={total:.6f}"
)
def test_energy_conservation_arrays():
"""Energy conservation for array inputs."""
tau = np.array([0.1, 0.5, 1.0, 3.0, 10.0])
R, T, A = delta_eddington_2stream(tau, 0.92, 0.65, 0.5)
total = R + T + A
assert np.allclose(total, 1.0, atol=ATOL), f"Array energy: {total}"
# ── Two-stream: physical limits ─────────────────────────────────────────────
def test_thin_layer_transmits():
"""Very thin layer should transmit nearly all light."""
R, T, A = delta_eddington_2stream(0.001, 0.92, 0.65, 0.5)
assert T > 0.99, f"Thin layer T={T:.4f}, expected > 0.99"
assert R < 0.01, f"Thin layer R={R:.4f}, expected < 0.01"
def test_thick_absorber_absorbs():
"""Thick purely absorbing layer should absorb everything."""
R, T, A = delta_eddington_2stream(100.0, 0.0, 0.0, 0.5)
assert A > 0.99, f"Thick absorber A={A:.4f}, expected > 0.99"
assert T < 0.01, f"Thick absorber T={T:.4f}, expected < 0.01"
def test_conservative_no_absorption():
"""Conservative scattering (ω=1) should have zero absorption."""
R, T, A = delta_eddington_2stream(0.5, 0.999999, 0.0, 0.5)
assert abs(A) < 0.01, f"Conservative A={A:.6f}, expected ≈ 0"
def test_thick_smoke_moderate_transmission():
"""τ=3 with SSA=0.92 should still transmit ~30%."""
R, T, A = delta_eddington_2stream(3.0, 0.92, 0.65, 0.5)
assert 0.15 < T < 0.50, f"Thick smoke T={T:.4f}, expected 0.15–0.50"
assert 0.20 < R < 0.50, f"Thick smoke R={R:.4f}, expected 0.20–0.50"
def test_higher_mu_more_transmission():
"""Higher μ₀ (sun higher) → shorter path → more transmission."""
_, T_low, _ = delta_eddington_2stream(1.0, 0.92, 0.65, 0.3)
_, T_high, _ = delta_eddington_2stream(1.0, 0.92, 0.65, 0.8)
assert T_high > T_low, (
f"T(μ₀=0.8)={T_high:.4f} should > T(μ₀=0.3)={T_low:.4f}"
)
def test_albedo_increases_reflection():
"""Surface albedo should increase total reflected flux."""
R0, _, _ = delta_eddington_2stream(0.5, 0.92, 0.65, 0.5, albedo=0.0)
R3, _, _ = delta_eddington_2stream(0.5, 0.92, 0.65, 0.5, albedo=0.3)
assert R3 > R0, f"R(α=0.3)={R3:.4f} should > R(α=0)={R0:.4f}"
def test_zero_tau():
"""Zero optical depth → perfect transmission."""
R, T, A = delta_eddington_2stream(0.0, 0.92, 0.65, 0.5)
assert T > 0.999, f"Zero τ: T={T:.4f}"
assert R < 0.001, f"Zero τ: R={R:.4f}"
def test_nonnegative_outputs():
"""R, T, A should all be non-negative for physical inputs."""
taus = [0.01, 0.1, 0.5, 1.0, 3.0, 10.0]
for tau in taus:
R, T, A = delta_eddington_2stream(tau, 0.92, 0.65, 0.5)
assert R >= -ATOL, f"R={R:.6f} < 0 at τ={tau}"
assert T >= -ATOL, f"T={T:.6f} < 0 at τ={tau}"
assert A >= -ATOL, f"A={A:.6f} < 0 at τ={tau}"
# ── Spectral utilities ───────────────────────────────────────────────────────
def test_rayleigh_wavelength_dependence():
"""Rayleigh τ should decrease strongly with wavelength."""
t_uv = rayleigh_tau(0.35)
t_vis = rayleigh_tau(0.55)
t_nir = rayleigh_tau(1.0)
assert t_uv > t_vis > t_nir, "Rayleigh should decrease with λ"
assert t_uv / t_nir > 50, "UV/NIR Rayleigh ratio should be > 50"
def test_rayleigh_pressure_scaling():
"""Rayleigh τ should scale linearly with pressure."""
t_full = rayleigh_tau(0.55, p_hpa=1013.25)
t_half = rayleigh_tau(0.55, p_hpa=506.625)
assert abs(t_half / t_full - 0.5) < 0.001
def test_smoke_aod_angstrom():
"""Smoke AOD should increase at shorter wavelengths."""
aod_uv = smoke_aod_at_wavelength(1.0, 0.35)
aod_vis = smoke_aod_at_wavelength(1.0, 0.55)
aod_nir = smoke_aod_at_wavelength(1.0, 1.0)
assert aod_uv > aod_vis > aod_nir
assert abs(aod_vis - 1.0) < 0.01
def test_combine_aerosol_rayleigh_conservative():
"""Adding Rayleigh (SSA=1, g=0) should increase SSA and decrease g."""
tau_aer, ssa_aer, g_aer = 0.5, 0.88, 0.68
tau_ray = rayleigh_tau(0.50)
tau_t, ssa_t, g_t = combine_aerosol_rayleigh(tau_aer, ssa_aer, g_aer, tau_ray)
assert tau_t > tau_aer, "Total τ should exceed aerosol τ"
assert ssa_t > ssa_aer, "Adding conservative scatterer should raise SSA"
assert g_t < g_aer, "Adding isotropic scatterer should lower g"
# ── Solar geometry ───────────────────────────────────────────────────────────
def test_coszen_equinox_equator():
"""At equinox, equator should have μ̄ ≈ 1/π ≈ 0.318."""
mu = daily_mean_coszen(0.0, 81)
assert abs(mu - 1.0 / np.pi) < 0.02, f"Equator equinox μ̄={mu:.4f}"
def test_coszen_latitude_gradient():
"""Higher latitude → lower μ̄ in summer."""
mu_low = daily_mean_coszen(30.0, 249)
mu_high = daily_mean_coszen(50.0, 249)
assert mu_low > mu_high, "Lower latitude should have higher μ̄"
def test_coszen_positive():
"""μ̄ should always be positive for our domain."""
lats = np.arange(30, 53, 1.0)
for doy in range(249, 260):
mu = daily_mean_coszen(lats, doy)
assert np.all(mu > 0), f"Negative μ̄ at doy={doy}"- Step 4: Run tests
Run: pytest davinci_monet/tests/test_radiative_rt.py -v
Expected: All 18 tests PASS
- Step 5: Commit
git add davinci_monet/radiative/__init__.py davinci_monet/radiative/rt.py davinci_monet/tests/test_radiative_rt.py
git commit -m "feat(radiative): port RT module and tests from PlumeSentinelAI"Files:
-
Create:
davinci_monet/radiative/config.py -
Create:
davinci_monet/tests/test_radiative_config.py -
Step 1: Write the config test file
# davinci_monet/tests/test_radiative_config.py
"""Tests for radiative analysis configuration schema."""
from __future__ import annotations
from datetime import datetime
import pytest
from davinci_monet.radiative.config import (
AeronetConfig,
CeresConfig,
EventConfig,
Merra2Config,
RadiativeConfig,
SurfaceImpactConfig,
)
class TestEventConfig:
def test_valid_event(self) -> None:
cfg = EventConfig(
name="west-coast-2020",
start_time=datetime(2020, 9, 5),
end_time=datetime(2020, 9, 15),
domain=(-135.0, -105.0, 30.0, 52.0),
)
assert cfg.name == "west-coast-2020"
assert cfg.background_window == 3 # default
def test_end_before_start_raises(self) -> None:
with pytest.raises(ValueError, match="end_time must be after start_time"):
EventConfig(
name="bad",
start_time=datetime(2020, 9, 15),
end_time=datetime(2020, 9, 5),
domain=(-135.0, -105.0, 30.0, 52.0),
)
def test_custom_background_window(self) -> None:
cfg = EventConfig(
name="test",
start_time=datetime(2020, 9, 5),
end_time=datetime(2020, 9, 15),
domain=(-135.0, -105.0, 30.0, 52.0),
background_window=5,
)
assert cfg.background_window == 5
class TestCeresConfig:
def test_local_source(self) -> None:
cfg = CeresConfig(
product="syn1deg",
source="local",
files="/data/ceres/*.nc",
)
assert cfg.product == "syn1deg"
assert cfg.source == "local"
def test_opendap_source(self) -> None:
cfg = CeresConfig(product="ebaf", source="opendap")
assert cfg.files is None
def test_local_requires_files(self) -> None:
with pytest.raises(ValueError, match="files.*required.*local"):
CeresConfig(product="syn1deg", source="local", files=None)
class TestRadiativeConfig:
def test_minimal_config(self) -> None:
"""CERES-only config (no MERRA-2, no AERONET)."""
cfg = RadiativeConfig(
event=EventConfig(
name="test",
start_time=datetime(2020, 9, 5),
end_time=datetime(2020, 9, 15),
domain=(-135.0, -105.0, 30.0, 52.0),
),
ceres=CeresConfig(product="syn1deg", source="local", files="/data/*.nc"),
plots=["toa_event_fields"],
output_dir="/tmp/output",
)
assert cfg.merra2 is None
assert cfg.aeronet is None
assert cfg.surface_impact is None
def test_full_config(self) -> None:
"""Full config with all optional sections."""
cfg = RadiativeConfig(
event=EventConfig(
name="west-coast-2020",
start_time=datetime(2020, 9, 5),
end_time=datetime(2020, 9, 15),
domain=(-135.0, -105.0, 30.0, 52.0),
),
ceres=CeresConfig(product="syn1deg", source="local", files="/data/ceres/*.nc"),
merra2=Merra2Config(files="/data/merra2/*.nc4"),
aeronet=AeronetConfig(files="/data/aeronet/*.csv", sites=["Fresno_2"]),
surface_impact=SurfaceImpactConfig(ssa=0.90, asymmetry=0.60),
plots=["toa_event_fields", "anomaly_maps", "sw_vs_aod_scatter"],
output_dir="/tmp/output",
)
assert cfg.merra2 is not None
assert cfg.surface_impact.ssa == 0.90
def test_surface_impact_requires_merra2(self) -> None:
with pytest.raises(ValueError, match="surface_impact requires merra2"):
RadiativeConfig(
event=EventConfig(
name="test",
start_time=datetime(2020, 9, 5),
end_time=datetime(2020, 9, 15),
domain=(-135.0, -105.0, 30.0, 52.0),
),
ceres=CeresConfig(product="syn1deg", source="local", files="/data/*.nc"),
surface_impact=SurfaceImpactConfig(),
plots=["surface_impact"],
output_dir="/tmp/output",
)
def test_invalid_plot_type(self) -> None:
with pytest.raises(ValueError, match="Invalid plot"):
RadiativeConfig(
event=EventConfig(
name="test",
start_time=datetime(2020, 9, 5),
end_time=datetime(2020, 9, 15),
domain=(-135.0, -105.0, 30.0, 52.0),
),
ceres=CeresConfig(product="syn1deg", source="local", files="/data/*.nc"),
plots=["nonexistent_plot_type"],
output_dir="/tmp/output",
)- Step 2: Run tests to verify they fail
Run: pytest davinci_monet/tests/test_radiative_config.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'davinci_monet.radiative.config'
- Step 3: Implement the config schema
# davinci_monet/radiative/config.py
"""Pydantic configuration schemas for smoke radiative analysis."""
from __future__ import annotations
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, ConfigDict, model_validator
VALID_PLOT_TYPES = frozenset({
"toa_event_fields",
"anomaly_maps",
"sw_vs_aod_scatter",
"site_timeseries",
"surface_impact",
})
class EventConfig(BaseModel):
"""Event definition: time window, domain, and background baseline."""
model_config = ConfigDict(extra="forbid")
name: str
start_time: datetime
end_time: datetime
domain: tuple[float, float, float, float] # (west, east, south, north)
background_window: int = 3 # days before start_time for anomaly baseline
@model_validator(mode="after")
def _check_time_order(self) -> EventConfig:
if self.end_time <= self.start_time:
msg = "end_time must be after start_time"
raise ValueError(msg)
return self
class CeresConfig(BaseModel):
"""CERES data source configuration."""
model_config = ConfigDict(extra="forbid")
product: Literal["ebaf", "syn1deg"]
source: Literal["local", "opendap"]
files: str | None = None
variables: list[str] | None = None
@model_validator(mode="after")
def _check_local_has_files(self) -> CeresConfig:
if self.source == "local" and not self.files:
msg = "files is required when source is local"
raise ValueError(msg)
return self
class Merra2Config(BaseModel):
"""MERRA-2 aerosol data configuration."""
model_config = ConfigDict(extra="forbid")
files: str
smoke_species: list[str] = ["OCEXTTAU", "BCEXTTAU"]
class AeronetConfig(BaseModel):
"""AERONET site observation configuration."""
model_config = ConfigDict(extra="forbid")
files: str
sites: list[str] | None = None # None = all sites within domain
class SurfaceImpactConfig(BaseModel):
"""Semi-empirical surface impact RT parameters."""
model_config = ConfigDict(extra="forbid")
method: list[Literal["merra2", "semi_empirical"]] = [
"merra2",
"semi_empirical",
]
ssa: float = 0.92
asymmetry: float = 0.65
class RadiativeConfig(BaseModel):
"""Top-level radiative analysis configuration."""
model_config = ConfigDict(extra="forbid")
event: EventConfig
ceres: CeresConfig
merra2: Merra2Config | None = None
aeronet: AeronetConfig | None = None
surface_impact: SurfaceImpactConfig | None = None
plots: list[str]
output_dir: str
@model_validator(mode="after")
def _check_constraints(self) -> RadiativeConfig:
if self.surface_impact is not None and self.merra2 is None:
msg = "surface_impact requires merra2 configuration"
raise ValueError(msg)
invalid = set(self.plots) - VALID_PLOT_TYPES
if invalid:
msg = f"Invalid plot types: {invalid}. Valid: {sorted(VALID_PLOT_TYPES)}"
raise ValueError(msg)
return self- Step 4: Run tests
Run: pytest davinci_monet/tests/test_radiative_config.py -v
Expected: All 8 tests PASS
- Step 5: Commit
git add davinci_monet/radiative/config.py davinci_monet/tests/test_radiative_config.py
git commit -m "feat(radiative): add Pydantic configuration schema"Files:
-
Create:
davinci_monet/radiative/processing.py -
Create:
davinci_monet/tests/test_radiative_processing.py -
Step 1: Write processing tests
# davinci_monet/tests/test_radiative_processing.py
"""Tests for radiative analysis processing functions."""
from __future__ import annotations
import numpy as np
import pytest
import xarray as xr
from davinci_monet.radiative.processing import (
compute_anomalies,
compute_background,
derive_smoke_aod,
regrid_nearest,
semi_empirical_surface_dimming,
)
@pytest.fixture
def daily_dataset() -> xr.Dataset:
"""Synthetic CERES-like daily dataset: 5 days, 10x10 grid."""
np.random.seed(42)
times = np.arange(5)
lats = np.linspace(30, 52, 10)
lons = np.linspace(-135, -105, 10)
return xr.Dataset(
{
"toa_sw": (["time", "lat", "lon"], np.random.uniform(100, 300, (5, 10, 10))),
"aod": (["time", "lat", "lon"], np.random.uniform(0, 2, (5, 10, 10))),
},
coords={"time": times, "lat": lats, "lon": lons},
)
class TestComputeBackground:
def test_mean_of_first_n_days(self, daily_dataset: xr.Dataset) -> None:
bg = compute_background(daily_dataset, window=3)
expected = daily_dataset.isel(time=slice(0, 3)).mean(dim="time")
xr.testing.assert_allclose(bg, expected)
def test_window_larger_than_data_uses_all(self, daily_dataset: xr.Dataset) -> None:
bg = compute_background(daily_dataset, window=10)
expected = daily_dataset.mean(dim="time")
xr.testing.assert_allclose(bg, expected)
class TestComputeAnomalies:
def test_anomaly_shape(self, daily_dataset: xr.Dataset) -> None:
bg = compute_background(daily_dataset, window=3)
anom = compute_anomalies(daily_dataset, bg)
assert anom.dims == daily_dataset.dims
assert set(anom.data_vars) == set(daily_dataset.data_vars)
def test_anomaly_values(self, daily_dataset: xr.Dataset) -> None:
bg = compute_background(daily_dataset, window=3)
anom = compute_anomalies(daily_dataset, bg)
# First 3 days should have anomaly near zero on average
first3_mean = anom.isel(time=slice(0, 3)).mean().to_array().values
assert np.all(np.abs(first3_mean) < 50) # reasonable tolerance
class TestDeriveSmokeAod:
def test_sum_of_species(self) -> None:
ds = xr.Dataset({
"OCEXTTAU": (["lat", "lon"], np.ones((5, 5)) * 0.3),
"BCEXTTAU": (["lat", "lon"], np.ones((5, 5)) * 0.1),
})
smoke = derive_smoke_aod(ds, species=["OCEXTTAU", "BCEXTTAU"])
np.testing.assert_allclose(smoke.values, 0.4)
def test_single_species(self) -> None:
ds = xr.Dataset({
"OCEXTTAU": (["lat", "lon"], np.ones((5, 5)) * 0.3),
})
smoke = derive_smoke_aod(ds, species=["OCEXTTAU"])
np.testing.assert_allclose(smoke.values, 0.3)
class TestRegridNearest:
def test_coarsen(self) -> None:
"""Regrid fine grid to coarser grid via nearest-neighbor."""
src_lat = np.linspace(30, 52, 44) # ~0.5° spacing
src_lon = np.linspace(-135, -105, 48) # ~0.625° spacing
src = xr.DataArray(
np.ones((44, 48)),
dims=["lat", "lon"],
coords={"lat": src_lat, "lon": src_lon},
)
tgt_lat = np.linspace(30, 52, 22) # ~1° spacing
tgt_lon = np.linspace(-135, -105, 30)
result = regrid_nearest(src, tgt_lat, tgt_lon)
assert result.shape == (22, 30)
np.testing.assert_allclose(result.values, 1.0)
def test_preserves_pattern(self) -> None:
"""Check that a lat-gradient pattern survives regridding."""
src_lat = np.linspace(0, 10, 20)
src_lon = np.linspace(0, 10, 20)
data = np.broadcast_to(src_lat[:, None], (20, 20)).copy()
src = xr.DataArray(data, dims=["lat", "lon"],
coords={"lat": src_lat, "lon": src_lon})
tgt_lat = np.linspace(0, 10, 10)
tgt_lon = np.linspace(0, 10, 10)
result = regrid_nearest(src, tgt_lat, tgt_lon)
# Values should increase with latitude index
assert np.all(np.diff(result.values.mean(axis=1)) > 0)
class TestSemiEmpiricalDimming:
def test_zero_aod_no_dimming(self) -> None:
dimming = semi_empirical_surface_dimming(
smoke_aod=np.array([0.0]), toa_insol=np.array([300.0]),
)
np.testing.assert_allclose(dimming, 0.0, atol=1e-10)
def test_dimming_increases_with_aod(self) -> None:
aod = np.array([0.1, 0.5, 1.0, 3.0])
dimming = semi_empirical_surface_dimming(
smoke_aod=aod, toa_insol=np.full(4, 300.0),
)
# Dimming should be negative and increasingly negative
assert np.all(dimming < 0)
assert np.all(np.diff(dimming) < 0)
def test_custom_ssa(self) -> None:
aod = np.array([1.0])
insol = np.array([300.0])
d_high_ssa = semi_empirical_surface_dimming(aod, insol, ssa=0.99, asymmetry=0.65)
d_low_ssa = semi_empirical_surface_dimming(aod, insol, ssa=0.80, asymmetry=0.65)
# Higher SSA = more scattering, less absorption, but less surface dimming
# because more forward-scattered light reaches surface
assert abs(d_high_ssa[0]) < abs(d_low_ssa[0])- Step 2: Run tests to verify they fail
Run: pytest davinci_monet/tests/test_radiative_processing.py -v
Expected: FAIL — ModuleNotFoundError
- Step 3: Implement the processing module
# davinci_monet/radiative/processing.py
"""Processing functions for smoke radiative analysis.
Anomaly computation, smoke AOD derivation, regridding, and
semi-empirical surface dimming estimation.
"""
from __future__ import annotations
import numpy as np
import xarray as xr
def compute_background(ds: xr.Dataset, window: int) -> xr.Dataset:
"""Compute background mean from the first ``window`` time steps.
Parameters
----------
ds : xr.Dataset
Dataset with a ``time`` dimension.
window : int
Number of leading time steps to average.
Returns
-------
xr.Dataset
Time-averaged background (no time dimension).
"""
n = min(window, ds.sizes["time"])
return ds.isel(time=slice(0, n)).mean(dim="time")
def compute_anomalies(ds: xr.Dataset, background: xr.Dataset) -> xr.Dataset:
"""Subtract background from each time step.
Parameters
----------
ds : xr.Dataset
Full event dataset with ``time`` dimension.
background : xr.Dataset
Background mean (no ``time`` dimension).
Returns
-------
xr.Dataset
Anomaly dataset with same shape as ``ds``.
"""
return ds - background
def derive_smoke_aod(ds: xr.Dataset, species: list[str]) -> xr.DataArray:
"""Sum specified AOD species to get smoke AOD.
Parameters
----------
ds : xr.Dataset
Dataset containing the species variables.
species : list[str]
Variable names to sum (e.g., ``["OCEXTTAU", "BCEXTTAU"]``).
Returns
-------
xr.DataArray
Smoke AOD field.
"""
result = ds[species[0]].copy()
for s in species[1:]:
result = result + ds[s]
return result
def regrid_nearest(
source: xr.DataArray,
target_lat: np.ndarray,
target_lon: np.ndarray,
) -> xr.DataArray:
"""Nearest-neighbor regrid a 2D field to a new lat/lon grid.
Parameters
----------
source : xr.DataArray
Source data with ``lat`` and ``lon`` dimensions.
target_lat : np.ndarray
Target latitude array.
target_lon : np.ndarray
Target longitude array.
Returns
-------
xr.DataArray
Regridded data on target grid.
"""
src_lat = source.lat.values
src_lon = source.lon.values
data = source.values
out = np.empty((len(target_lat), len(target_lon)), dtype=data.dtype)
for i, tlat in enumerate(target_lat):
li = np.argmin(np.abs(src_lat - tlat))
for j, tlon in enumerate(target_lon):
lj = np.argmin(np.abs(src_lon - tlon))
out[i, j] = data[li, lj]
return xr.DataArray(
out,
dims=["lat", "lon"],
coords={"lat": target_lat, "lon": target_lon},
)
def semi_empirical_surface_dimming(
smoke_aod: np.ndarray,
toa_insol: np.ndarray,
ssa: float = 0.92,
asymmetry: float = 0.65,
) -> np.ndarray:
"""Estimate surface SW dimming from smoke AOD using Beer-Lambert.
Parameters
----------
smoke_aod : array_like
Smoke aerosol optical depth at 550 nm.
toa_insol : array_like
TOA insolation (W/m²), used as baseline S₀.
ssa : float
Single-scattering albedo (default 0.92).
asymmetry : float
Asymmetry parameter (default 0.65).
Returns
-------
ndarray
Surface SW dimming (negative = dimming, W/m²).
"""
smoke_aod = np.asarray(smoke_aod, dtype=np.float64)
toa_insol = np.asarray(toa_insol, dtype=np.float64)
mu = 0.5 # effective daily-mean cosine of solar zenith
extinct = 1.0 - np.exp(-smoke_aod / mu)
fwd_scatter_frac = ssa * (1.0 + asymmetry) / 2.0
return -toa_insol * extinct * (1.0 - fwd_scatter_frac)- Step 4: Run tests
Run: pytest davinci_monet/tests/test_radiative_processing.py -v
Expected: All 10 tests PASS
- Step 5: Commit
git add davinci_monet/radiative/processing.py davinci_monet/tests/test_radiative_processing.py
git commit -m "feat(radiative): add processing module (anomalies, regridding, surface dimming)"Files:
-
Create:
davinci_monet/radiative/loaders/__init__.py -
Create:
davinci_monet/radiative/loaders/ceres.py -
Create:
davinci_monet/radiative/loaders/merra2.py -
Create:
davinci_monet/radiative/loaders/aeronet.py -
Create:
davinci_monet/tests/test_radiative_loaders.py -
Step 1: Write loader tests (synthetic data, no network)
# davinci_monet/tests/test_radiative_loaders.py
"""Tests for radiative analysis data loaders using synthetic data."""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
import xarray as xr
from davinci_monet.radiative.loaders.aeronet import load_aeronet
from davinci_monet.radiative.loaders.ceres import load_ceres_local
from davinci_monet.radiative.loaders.merra2 import load_merra2
@pytest.fixture
def synthetic_ceres_dir(tmp_path: Path) -> Path:
"""Create synthetic CERES SYN1deg daily files."""
lats = np.arange(89.5, -90, -1.0) # 180 lats, descending
lons = np.arange(-179.5, 180, 1.0) # 360 lons
for day in range(5, 10):
ds = xr.Dataset(
{
"obs_all_toa_sw": (["lat", "lon"], np.random.uniform(50, 350, (180, 360)).astype("f4")),
"obs_clr_toa_sw": (["lat", "lon"], np.random.uniform(50, 300, (180, 360)).astype("f4")),
"obs_all_toa_net": (["lat", "lon"], np.random.uniform(-100, 200, (180, 360)).astype("f4")),
"init_match_aod55": (["lat", "lon"], np.random.uniform(0, 3, (180, 360)).astype("f4")),
"obs_cld_amount": (["lat", "lon"], np.random.uniform(0, 100, (180, 360)).astype("f4")),
"toa_sw_insol": (["lat", "lon"], np.random.uniform(300, 450, (180, 360)).astype("f4")),
},
coords={"lat": lats, "lon": lons},
)
ds.to_netcdf(tmp_path / f"CERES_SYN1deg-Day_2020090{day}.nc")
return tmp_path
@pytest.fixture
def synthetic_merra2_dir(tmp_path: Path) -> Path:
"""Create synthetic MERRA-2 aerosol files."""
lats = np.arange(-90, 90.5, 0.5)
lons = np.arange(-180, 180, 0.625)
times = np.arange(8) # 8 hourly time steps
for day in range(5, 10):
ds = xr.Dataset(
{
"TOTEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 1, (8, len(lats), len(lons))).astype("f4")),
"OCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.5, (8, len(lats), len(lons))).astype("f4")),
"BCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.1, (8, len(lats), len(lons))).astype("f4")),
},
coords={"time": times, "lat": lats, "lon": lons},
)
ds.to_netcdf(tmp_path / f"MERRA2_401.tavg1_2d_aer_Nx.2020090{day}.nc4")
return tmp_path
@pytest.fixture
def synthetic_aeronet_csv(tmp_path: Path) -> Path:
"""Create synthetic AERONET CSV."""
rows = []
for site in ["Fresno_2", "UCSB", "NASA_Ames"]:
for day in range(5, 10):
for hour in [10, 12, 14]:
rows.append({
"time": f"2020-09-0{day} {hour}:00:00",
"site": site,
"AOD_500nm": np.random.uniform(0.1, 2.0),
"latitude": 36.8 if site == "Fresno_2" else 34.4 if site == "UCSB" else 37.4,
"longitude": -119.8 if site == "Fresno_2" else -119.9 if site == "UCSB" else -122.1,
})
df = pd.DataFrame(rows)
out = tmp_path / "aeronet.csv"
df.to_csv(out, index=False)
return out
class TestCeresLoader:
def test_load_local_syn1deg(self, synthetic_ceres_dir: Path) -> None:
ds = load_ceres_local(
files=str(synthetic_ceres_dir / "CERES_SYN1deg-Day_*.nc"),
domain=(-135.0, -105.0, 30.0, 52.0),
)
assert "obs_all_toa_sw" in ds.data_vars
assert "time" in ds.dims
assert ds.sizes["time"] == 5
# Check domain subsetting
assert float(ds.lat.min()) >= 30.0
assert float(ds.lat.max()) <= 52.0
assert float(ds.lon.min()) >= -135.0
assert float(ds.lon.max()) <= -105.0
def test_load_with_variable_filter(self, synthetic_ceres_dir: Path) -> None:
ds = load_ceres_local(
files=str(synthetic_ceres_dir / "CERES_SYN1deg-Day_*.nc"),
domain=(-135.0, -105.0, 30.0, 52.0),
variables=["obs_all_toa_sw", "init_match_aod55"],
)
assert set(ds.data_vars) == {"obs_all_toa_sw", "init_match_aod55"}
class TestMerra2Loader:
def test_load_and_daily_mean(self, synthetic_merra2_dir: Path) -> None:
ds = load_merra2(
files=str(synthetic_merra2_dir / "MERRA2_401.tavg1_2d_aer_Nx.*.nc4"),
domain=(-135.0, -105.0, 30.0, 52.0),
smoke_species=["OCEXTTAU", "BCEXTTAU"],
)
assert "time" in ds.dims
assert "SMOKEAOD" in ds.data_vars
# Should be daily means (no hourly time dimension within each day)
# Each day is one time step
assert ds.sizes["time"] == 5
class TestAeronetLoader:
def test_load_all_sites(self, synthetic_aeronet_csv: Path) -> None:
df = load_aeronet(
files=str(synthetic_aeronet_csv),
domain=(-135.0, -105.0, 30.0, 52.0),
)
assert len(df) > 0
assert "time" in df.columns
assert "site" in df.columns
assert "aod" in df.columns
def test_filter_by_sites(self, synthetic_aeronet_csv: Path) -> None:
df = load_aeronet(
files=str(synthetic_aeronet_csv),
domain=(-135.0, -105.0, 30.0, 52.0),
sites=["Fresno_2"],
)
assert set(df["site"].unique()) == {"Fresno_2"}- Step 2: Run tests to verify they fail
Run: pytest davinci_monet/tests/test_radiative_loaders.py -v
Expected: FAIL — ModuleNotFoundError
- Step 3: Create
loaders/__init__.py
# davinci_monet/radiative/loaders/__init__.py
"""Data loaders for smoke radiative analysis."""
from davinci_monet.radiative.loaders.aeronet import load_aeronet
from davinci_monet.radiative.loaders.ceres import load_ceres_local
from davinci_monet.radiative.loaders.merra2 import load_merra2
__all__ = ["load_aeronet", "load_ceres_local", "load_merra2"]- Step 4: Implement CERES loader
# davinci_monet/radiative/loaders/ceres.py
"""CERES data loaders for local files and OPeNDAP.
Supports EBAF (monthly) and SYN1deg (daily) products.
"""
from __future__ import annotations
import glob
import struct
from datetime import date, timedelta
from pathlib import Path
import numpy as np
import xarray as xr
from davinci_monet.logging import get_logger
logger = get_logger(__name__)
# ── Standard variable lists ──────────────────────────────────────────────────
EBAF_VARIABLES = [
"toa_sw_all_mon",
"toa_lw_all_mon",
"toa_net_all_mon",
"toa_sw_clr_c_mon",
"toa_lw_clr_c_mon",
"toa_net_clr_c_mon",
"solar_mon",
"cldarea_total_daynight_mon",
"cldtau_total_day_mon",
]
SYN1DEG_VARIABLES = [
"obs_all_toa_sw",
"obs_all_toa_lw",
"obs_all_toa_net",
"obs_clr_toa_sw",
"obs_clr_toa_lw",
"obs_clr_toa_net",
"toa_sw_insol",
"init_all_sfc_sw_dn",
"init_all_sfc_sw_up",
"init_all_sfc_lw_dn",
"init_all_sfc_lw_up",
"init_clr_sfc_sw_dn",
"init_clr_sfc_lw_dn",
"init_match_aod55",
"init_skin_temp",
"init_pw",
"obs_cld_amount",
"obs_cld_od",
]
FILL_VALUE = -999.0
def load_ceres_local(
files: str,
domain: tuple[float, float, float, float],
variables: list[str] | None = None,
) -> xr.Dataset:
"""Load CERES data from local NetCDF files.
Parameters
----------
files : str
Glob pattern for NetCDF files.
domain : tuple
(west, east, south, north) bounding box.
variables : list[str] | None
Variables to load. None loads all.
Returns
-------
xr.Dataset
CERES data with dims (time, lat, lon). Fill values replaced with NaN.
"""
paths = sorted(glob.glob(str(Path(files).expanduser())))
if not paths:
msg = f"No files matched: {files}"
raise FileNotFoundError(msg)
logger.info("Loading %d CERES files", len(paths))
datasets = []
for p in paths:
ds = xr.open_dataset(p)
ds = ds.where(ds != FILL_VALUE)
if variables:
available = [v for v in variables if v in ds]
ds = ds[available]
datasets.append(ds)
if len(datasets) == 1:
combined = datasets[0].expand_dims("time")
else:
combined = xr.concat(datasets, dim="time")
# Subset to domain
west, east, south, north = domain
lat = combined.lat.values
if lat[0] > lat[-1]: # descending (CERES convention)
combined = combined.sel(lat=slice(north, south), lon=slice(west, east))
else:
combined = combined.sel(lat=slice(south, north), lon=slice(west, east))
return combined
# ── OPeNDAP fetchers ─────────────────────────────────────────────────────────
EBAF_OPENDAP_URL = (
"https://opendap.larc.nasa.gov/opendap/CERES/"
"EBAF/TOA_Edition4.2/"
"CERES_EBAF-TOA_Edition4.2_200003-202407.nc"
)
SYN1DEG_BASE_URL = (
"https://opendap.larc.nasa.gov/opendap/CERES/"
"SYN1deg-Day/Terra-Aqua-NOAA20_Edition4B"
)
NLAT, NLON = 180, 360
SYN1DEG_VARS_2D = [
"obs_all_toa_sw", "obs_all_toa_lw", "obs_all_toa_net",
"obs_clr_toa_sw", "obs_clr_toa_lw", "obs_clr_toa_net",
"toa_sw_insol",
"init_all_sfc_sw_dn", "init_all_sfc_sw_up",
"init_all_sfc_lw_dn", "init_all_sfc_lw_up",
"init_clr_sfc_sw_dn", "init_clr_sfc_lw_dn",
"init_match_aod55", "init_skin_temp", "init_pw",
]
SYN1DEG_VARS_3D = ["obs_cld_amount", "obs_cld_od"]
def fetch_ebaf(
start_time: str,
end_time: str,
output_dir: str | Path,
variables: list[str] | None = None,
) -> Path:
"""Fetch CERES EBAF-TOA monthly data from OPeNDAP.
Parameters
----------
start_time, end_time : str
Time range (e.g., "2020-06-01", "2020-12-31").
output_dir : str or Path
Directory for output file.
variables : list[str] | None
Variables to fetch. None uses EBAF_VARIABLES.
Returns
-------
Path
Path to written NetCDF file.
"""
output_dir = Path(output_dir).expanduser()
output_dir.mkdir(parents=True, exist_ok=True)
out_file = output_dir / f"CERES_EBAF-TOA_{start_time}_{end_time}.nc"
if out_file.exists():
logger.info("Already exists: %s", out_file)
return out_file
logger.info("Opening OPeNDAP: %s", EBAF_OPENDAP_URL)
ds = xr.open_dataset(EBAF_OPENDAP_URL)
vars_to_use = variables or EBAF_VARIABLES
available = [v for v in vars_to_use if v in ds]
subset = ds[available].sel(time=slice(start_time, end_time))
logger.info("Writing %s (%d months, %d variables)", out_file, len(subset.time), len(available))
subset.to_netcdf(out_file)
return out_file
def _build_syn1deg_url(d: date) -> str:
return (
f"{SYN1DEG_BASE_URL}/{d.year}/{d.month:02d}/"
f"CER_SYN1deg-Day_Terra-Aqua-NOAA20_Edition4B_408412."
f"{d.strftime('%Y%m%d')}.hdf"
)
def _parse_dods_grid(data: bytes, offset: int, shape: tuple[int, ...]):
"""Parse a DAP2 Grid from binary .dods response."""
n = int(np.prod(shape))
offset += 8 # skip two int32 headers
arr = np.frombuffer(data, dtype=">f4", count=n, offset=offset).copy()
offset += n * 4
arr = arr.reshape(shape)
for dim_size in shape:
offset += 8 + dim_size * 4
return arr, offset
def _fetch_var_dods(hdf_url: str, varname: str, shape: tuple[int, ...]):
"""Fetch a single variable via DAP2 binary."""
import requests
url = f"{hdf_url}.dods?{varname}"
r = requests.get(url, timeout=120)
r.raise_for_status()
sep = b"\nData:\n"
idx = r.content.index(sep) + len(sep)
arr, _ = _parse_dods_grid(r.content, idx, shape)
return arr
def _fetch_coords(hdf_url: str):
lat = _fetch_var_dods(hdf_url, "latitude", (NLAT,)).ravel()
lon = _fetch_var_dods(hdf_url, "longitude", (NLON,)).ravel()
return lat, lon
def fetch_syn1deg_day(d: date, output_dir: str | Path) -> Path:
"""Fetch one day of CERES SYN1deg-Day via DAP2 binary.
Parameters
----------
d : date
Date to fetch.
output_dir : str or Path
Directory for output file.
Returns
-------
Path
Path to written NetCDF4 file.
"""
import netCDF4
output_dir = Path(output_dir).expanduser()
output_dir.mkdir(parents=True, exist_ok=True)
out_path = output_dir / f"CERES_SYN1deg-Day_{d.strftime('%Y%m%d')}.nc"
if out_path.exists():
logger.info("Already exists: %s", out_path.name)
return out_path
hdf_url = _build_syn1deg_url(d)
logger.info("Fetching %s", d)
lat, lon = _fetch_coords(hdf_url)
dst = netCDF4.Dataset(str(out_path), "w", format="NETCDF4")
dst.title = f"CERES SYN1deg-Day {d.isoformat()}"
dst.source = hdf_url
dst.date = d.isoformat()
dst.createDimension("lat", NLAT)
dst.createDimension("lon", NLON)
lat_var = dst.createVariable("lat", "f4", ("lat",))
lat_var[:] = lat
lat_var.units = "degrees_north"
lon_var = dst.createVariable("lon", "f4", ("lon",))
lon_var[:] = lon
lon_var.units = "degrees_east"
for vname in SYN1DEG_VARS_2D:
try:
data = _fetch_var_dods(hdf_url, vname, (NLAT, NLON))
dv = dst.createVariable(vname, "f4", ("lat", "lon"), fill_value=FILL_VALUE, zlib=True)
dv[:] = np.where(data > -900, data, FILL_VALUE)
except Exception:
logger.warning("Failed to fetch %s for %s", vname, d)
for vname in SYN1DEG_VARS_3D:
try:
data = _fetch_var_dods(hdf_url, vname, (5, NLAT, NLON))
dv = dst.createVariable(vname, "f4", ("lat", "lon"), fill_value=FILL_VALUE, zlib=True)
dv[:] = np.where(data[0] > -900, data[0], FILL_VALUE)
except Exception:
logger.warning("Failed to fetch %s (3D) for %s", vname, d)
dst.close()
return out_path
def fetch_syn1deg_range(
start: date,
end: date,
output_dir: str | Path,
) -> list[Path]:
"""Fetch CERES SYN1deg-Day for a date range.
Parameters
----------
start, end : date
Date range (inclusive).
output_dir : str or Path
Directory for output files.
Returns
-------
list[Path]
Paths to written files.
"""
paths = []
d = start
while d <= end:
paths.append(fetch_syn1deg_day(d, output_dir))
d += timedelta(days=1)
return paths- Step 5: Implement MERRA-2 loader
# davinci_monet/radiative/loaders/merra2.py
"""MERRA-2 aerosol data loader for smoke radiative analysis."""
from __future__ import annotations
import glob
from pathlib import Path
import xarray as xr
from davinci_monet.logging import get_logger
from davinci_monet.radiative.processing import derive_smoke_aod
logger = get_logger(__name__)
def load_merra2(
files: str,
domain: tuple[float, float, float, float],
smoke_species: list[str],
) -> xr.Dataset:
"""Load MERRA-2 aerosol data, compute daily means, derive smoke AOD.
Parameters
----------
files : str
Glob pattern for MERRA-2 NetCDF4 files.
domain : tuple
(west, east, south, north) bounding box.
smoke_species : list[str]
Variable names to sum for smoke AOD.
Returns
-------
xr.Dataset
Daily-mean MERRA-2 data with ``SMOKEAOD`` variable added.
Dims: (time, lat, lon) where time has one step per input file.
"""
paths = sorted(glob.glob(str(Path(files).expanduser())))
if not paths:
msg = f"No files matched: {files}"
raise FileNotFoundError(msg)
logger.info("Loading %d MERRA-2 files", len(paths))
west, east, south, north = domain
daily_datasets = []
for p in paths:
ds = xr.open_dataset(p)
sub = ds.sel(lon=slice(west, east), lat=slice(south, north))
# Take daily mean over hourly time steps
daily = sub.mean(dim="time")
daily["SMOKEAOD"] = derive_smoke_aod(daily, smoke_species)
daily_datasets.append(daily)
combined = xr.concat(daily_datasets, dim="time")
return combined- Step 6: Implement AERONET loader
# davinci_monet/radiative/loaders/aeronet.py
"""AERONET site data loader for smoke radiative analysis."""
from __future__ import annotations
import glob
from pathlib import Path
import pandas as pd
from davinci_monet.logging import get_logger
logger = get_logger(__name__)
def load_aeronet(
files: str,
domain: tuple[float, float, float, float],
sites: list[str] | None = None,
) -> pd.DataFrame:
"""Load AERONET CSV data, filter by domain and site list.
Parameters
----------
files : str
Glob pattern or path to AERONET CSV file(s).
domain : tuple
(west, east, south, north) bounding box.
sites : list[str] | None
Site names to include. None = all sites within domain.
Returns
-------
pd.DataFrame
Columns: time, site, aod, latitude, longitude.
"""
paths = sorted(glob.glob(str(Path(files).expanduser())))
if not paths:
msg = f"No files matched: {files}"
raise FileNotFoundError(msg)
logger.info("Loading %d AERONET files", len(paths))
frames = []
for p in paths:
df = pd.read_csv(p, parse_dates=["time"])
frames.append(df)
df = pd.concat(frames, ignore_index=True)
# Standardize AOD column name
if "AOD_500nm" in df.columns and "aod" not in df.columns:
df["aod"] = df["AOD_500nm"]
# Filter by domain
west, east, south, north = domain
df = df[
(df["longitude"] >= west)
& (df["longitude"] <= east)
& (df["latitude"] >= south)
& (df["latitude"] <= north)
]
# Filter by site list
if sites is not None:
df = df[df["site"].isin(sites)]
logger.info("Loaded %d AERONET records (%d sites)", len(df), df["site"].nunique())
return df.reset_index(drop=True)- Step 7: Run tests
Run: pytest davinci_monet/tests/test_radiative_loaders.py -v
Expected: All 5 tests PASS
- Step 8: Commit
git add davinci_monet/radiative/loaders/ davinci_monet/tests/test_radiative_loaders.py
git commit -m "feat(radiative): add CERES, MERRA-2, and AERONET data loaders"Files:
-
Create:
davinci_monet/radiative/plots/__init__.py -
Create:
davinci_monet/radiative/plots/event_fields.py -
Create:
davinci_monet/radiative/plots/anomaly_maps.py -
Create:
davinci_monet/radiative/plots/scatter.py -
Create:
davinci_monet/radiative/plots/site_timeseries.py -
Create:
davinci_monet/radiative/plots/surface_impact.py -
Create:
davinci_monet/tests/test_radiative_plots.py -
Step 1: Write plot smoke tests (synthetic data)
# davinci_monet/tests/test_radiative_plots.py
"""Smoke tests for radiative plot renderers.
These tests verify that each plotter produces a figure without errors,
using synthetic data. They do NOT validate visual correctness.
"""
from __future__ import annotations
from datetime import date
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytest
from davinci_monet.radiative.plots.anomaly_maps import plot_anomaly_maps
from davinci_monet.radiative.plots.event_fields import plot_event_fields
from davinci_monet.radiative.plots.scatter import plot_sw_vs_aod_scatter
from davinci_monet.radiative.plots.site_timeseries import plot_site_timeseries
from davinci_monet.radiative.plots.surface_impact import plot_surface_impact
matplotlib.use("Agg") # non-interactive backend
@pytest.fixture
def synthetic_event_data():
"""Synthetic daily records mimicking CERES + MERRA-2 aligned data."""
np.random.seed(42)
n_days = 5
nlat, nlon = 10, 10
lats = np.linspace(30, 52, nlat)
lons = np.linspace(-135, -105, nlon)
records = []
for i in range(n_days):
records.append({
"date": date(2020, 9, 5 + i),
"sw_all": np.random.uniform(100, 300, (nlat, nlon)),
"sw_clr": np.random.uniform(80, 250, (nlat, nlon)),
"tot_aod": np.random.uniform(0, 2, (nlat, nlon)),
"smoke_aod": np.random.uniform(0, 1, (nlat, nlon)),
"aod": np.random.uniform(0, 3, (nlat, nlon)),
"toa_net": np.random.uniform(-100, 200, (nlat, nlon)),
"cld_frac": np.random.uniform(0, 100, (nlat, nlon)),
"m2_sfc_effect": np.random.uniform(-200, 0, (nlat, nlon)),
"semi_dimming": np.random.uniform(-200, 0, (nlat, nlon)),
})
return lats, lons, records
@pytest.fixture
def synthetic_aeronet():
rows = []
for site in ["Site_A", "Site_B"]:
for day in range(5, 10):
rows.append({
"time": pd.Timestamp(f"2020-09-0{day} 12:00"),
"site": site,
"aod": np.random.uniform(0.1, 2.0),
})
return pd.DataFrame(rows)
@pytest.fixture
def synthetic_sites():
return [
("Site A", 40.0, -120.0, "Site_A"),
("Site B", 35.0, -115.0, "Site_B"),
]
class TestEventFields:
def test_produces_figure(self, synthetic_event_data) -> None:
lats, lons, records = synthetic_event_data
fig = plot_event_fields(lats, lons, records[2], event_name="Test Event")
assert isinstance(fig, matplotlib.figure.Figure)
plt.close(fig)
class TestAnomalyMaps:
def test_produces_figure(self, synthetic_event_data) -> None:
lats, lons, records = synthetic_event_data
bg = {
k: np.mean([r[k] for r in records[:3]], axis=0)
for k in ["sw_all", "sw_clr", "toa_net", "aod"]
}
fig = plot_anomaly_maps(lats, lons, records[2], bg, event_name="Test")
assert isinstance(fig, matplotlib.figure.Figure)
plt.close(fig)
class TestScatter:
def test_produces_figure(self, synthetic_event_data) -> None:
lats, lons, records = synthetic_event_data
fig = plot_sw_vs_aod_scatter(lats, lons, records, event_name="Test")
assert isinstance(fig, matplotlib.figure.Figure)
plt.close(fig)
class TestSiteTimeseries:
def test_produces_figure(self, synthetic_event_data, synthetic_aeronet, synthetic_sites) -> None:
lats, lons, records = synthetic_event_data
bg_sw = np.mean([r["sw_all"] for r in records[:3]], axis=0)
fig = plot_site_timeseries(
lats, lons, records, bg_sw,
sites=synthetic_sites, aeronet=synthetic_aeronet,
event_name="Test",
)
assert isinstance(fig, matplotlib.figure.Figure)
plt.close(fig)
class TestSurfaceImpact:
def test_produces_figure(self, synthetic_event_data) -> None:
lats, lons, records = synthetic_event_data
fig = plot_surface_impact(lats, lons, records[2], event_name="Test")
assert isinstance(fig, matplotlib.figure.Figure)
plt.close(fig)- Step 2: Run tests to verify they fail
Run: pytest davinci_monet/tests/test_radiative_plots.py -v
Expected: FAIL — ModuleNotFoundError
- Step 3: Create
plots/__init__.py
# davinci_monet/radiative/plots/__init__.py
"""Plot renderers for smoke radiative analysis."""
from davinci_monet.radiative.plots.anomaly_maps import plot_anomaly_maps
from davinci_monet.radiative.plots.event_fields import plot_event_fields
from davinci_monet.radiative.plots.scatter import plot_sw_vs_aod_scatter
from davinci_monet.radiative.plots.site_timeseries import plot_site_timeseries
from davinci_monet.radiative.plots.surface_impact import plot_surface_impact
__all__ = [
"plot_anomaly_maps",
"plot_event_fields",
"plot_site_timeseries",
"plot_surface_impact",
"plot_sw_vs_aod_scatter",
]- Step 4: Implement
event_fields.py
Generalized from PlumeSentinelAI/scripts/plot_ceres_syn1deg_event.py:plot_event_fields().
# davinci_monet/radiative/plots/event_fields.py
"""Multi-panel TOA event field maps."""
from __future__ import annotations
from typing import Any
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import numpy as np
from davinci_monet.plots.style import NCAR_COLORS
def plot_event_fields(
lats: np.ndarray,
lons: np.ndarray,
record: dict[str, Any],
event_name: str = "",
) -> plt.Figure:
"""4-panel map: AOD, TOA SW reflected, TOA net flux, cloud fraction.
Parameters
----------
lats, lons : ndarray
Coordinate arrays.
record : dict
Single-day record with keys: aod, sw_all, toa_net, cld_frac.
event_name : str
Title annotation.
Returns
-------
matplotlib.figure.Figure
"""
fields = [
("aod", "AOD 550 nm", "YlOrRd", 0, 3.0),
("sw_all", "TOA SW Reflected (W/m²)", "YlOrRd", 0, 350),
("toa_net", "TOA Net Flux (W/m²)", "RdBu_r", -150, 150),
("cld_frac", "Cloud Fraction (%)", "Blues", 0, 100),
]
fig, axes = plt.subplots(
2, 2, figsize=(14, 10),
subplot_kw={"projection": ccrs.PlateCarree()},
)
title = f"CERES SYN1deg — {record['date']} {event_name}".strip()
fig.suptitle(title, fontsize=18, fontweight="bold", color=NCAR_COLORS["space"])
extent = [float(lons.min()), float(lons.max()),
float(lats.min()), float(lats.max())]
for ax, (key, label, cmap, vmin, vmax) in zip(axes.flat, fields):
data = record[key]
ax.set_extent(extent, crs=ccrs.PlateCarree())
ax.add_feature(cfeature.STATES, linewidth=0.5, edgecolor="gray")
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
ax.add_feature(cfeature.BORDERS, linewidth=0.5, linestyle="--")
im = ax.pcolormesh(
lons, lats, data,
transform=ccrs.PlateCarree(),
cmap=cmap, vmin=vmin, vmax=vmax, shading="auto",
)
ax.set_title(label, fontsize=13)
plt.colorbar(im, ax=ax, shrink=0.7, pad=0.02)
plt.tight_layout(rect=[0, 0, 1, 0.94])
return fig- Step 5: Implement
anomaly_maps.py
Generalized from plot_ceres_syn1deg_event.py:plot_anomalies().
# davinci_monet/radiative/plots/anomaly_maps.py
"""Event-minus-background anomaly maps."""
from __future__ import annotations
from typing import Any
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import numpy as np
from davinci_monet.plots.style import NCAR_COLORS
def plot_anomaly_maps(
lats: np.ndarray,
lons: np.ndarray,
record: dict[str, Any],
background: dict[str, np.ndarray],
event_name: str = "",
) -> plt.Figure:
"""4-panel anomaly maps: ΔAOD, ΔSW all-sky, ΔSW clear-sky, ΔNet.
Parameters
----------
lats, lons : ndarray
Coordinate arrays.
record : dict
Single-day record with keys: aod, sw_all, sw_clr, toa_net.
background : dict
Background mean arrays with same keys.
event_name : str
Title annotation.
Returns
-------
matplotlib.figure.Figure
"""
fields = [
("aod", "ΔAOD 550 nm", "PuOr_r", -2.0, 2.0),
("sw_all", "ΔTOA SW Reflected (W/m²)", "RdBu_r", -100, 100),
("sw_clr", "ΔTOA SW Clear-Sky (W/m²)", "RdBu_r", -100, 100),
("toa_net", "ΔTOA Net Flux (W/m²)", "RdBu_r", -100, 100),
]
fig, axes = plt.subplots(
2, 2, figsize=(14, 10),
subplot_kw={"projection": ccrs.PlateCarree()},
)
title = f"CERES SYN1deg — {record['date']} Anomaly vs Background {event_name}".strip()
fig.suptitle(title, fontsize=18, fontweight="bold", color=NCAR_COLORS["space"])
extent = [float(lons.min()), float(lons.max()),
float(lats.min()), float(lats.max())]
for ax, (key, label, cmap, vmin, vmax) in zip(axes.flat, fields):
anom = record[key] - background[key]
ax.set_extent(extent, crs=ccrs.PlateCarree())
ax.add_feature(cfeature.STATES, linewidth=0.5, edgecolor="gray")
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
ax.add_feature(cfeature.BORDERS, linewidth=0.5, linestyle="--")
im = ax.pcolormesh(
lons, lats, anom,
transform=ccrs.PlateCarree(),
cmap=cmap, vmin=vmin, vmax=vmax, shading="auto",
)
ax.set_title(label, fontsize=13)
plt.colorbar(im, ax=ax, shrink=0.7, pad=0.02)
plt.tight_layout(rect=[0, 0, 1, 0.94])
return fig- Step 6: Implement
scatter.py
Generalized from plot_ceres_sw_vs_merra2_aod.py:plot_scatter() and plot_daily_correlation().
# davinci_monet/radiative/plots/scatter.py
"""SW vs AOD scatter plots and daily correlation charts."""
from __future__ import annotations
from typing import Any
import matplotlib.pyplot as plt
import numpy as np
from davinci_monet.plots.style import NCAR_COLORS
def plot_sw_vs_aod_scatter(
lats: np.ndarray,
lons: np.ndarray,
records: list[dict[str, Any]],
event_name: str = "",
) -> plt.Figure:
"""2-panel scatter: CERES SW vs total AOD and smoke AOD, all days.
Also includes a third panel with per-day correlation bar chart.
Parameters
----------
lats, lons : ndarray
Coordinate arrays (unused for scatter, kept for API consistency).
records : list[dict]
Daily records with keys: date, tot_aod, smoke_aod, sw_all.
event_name : str
Title annotation.
Returns
-------
matplotlib.figure.Figure
"""
fig, axes = plt.subplots(1, 3, figsize=(20, 6))
fig.suptitle(
f"CERES TOA SW vs MERRA-2 AOD — {event_name}",
fontsize=16, fontweight="bold", color=NCAR_COLORS["space"],
)
cmap = plt.cm.coolwarm
day_nums = [(r["date"] - records[0]["date"]).days for r in records]
norm = plt.Normalize(0, max(day_nums) or 1)
for ax, aod_key, aod_label in [
(axes[0], "tot_aod", "MERRA-2 Total AOD 550 nm"),
(axes[1], "smoke_aod", "MERRA-2 Smoke AOD (OC+BC) 550 nm"),
]:
for rec, dn in zip(records, day_nums):
aod = rec[aod_key].ravel()
sw = rec["sw_all"].ravel()
valid = np.isfinite(aod) & np.isfinite(sw)
ax.scatter(
aod[valid], sw[valid], s=4, alpha=0.3,
c=[cmap(norm(dn))] * valid.sum(), edgecolors="none",
)
all_aod = np.concatenate([r[aod_key].ravel() for r in records])
all_sw = np.concatenate([r["sw_all"].ravel() for r in records])
valid = np.isfinite(all_aod) & np.isfinite(all_sw)
r_val = np.corrcoef(all_aod[valid], all_sw[valid])[0, 1] if valid.sum() > 2 else 0.0
ax.set_xlabel(aod_label)
ax.set_ylabel("CERES TOA SW Reflected (W/m²)")
ax.set_title(f"r = {r_val:.3f}, n = {valid.sum():,}")
# Panel 3: daily correlation bars
ax = axes[2]
dates_label = [r["date"].strftime("%b %d") for r in records]
r_total, r_smoke = [], []
for rec in records:
for aod_key, r_list in [("tot_aod", r_total), ("smoke_aod", r_smoke)]:
aod = rec[aod_key].ravel()
sw = rec["sw_all"].ravel()
valid = np.isfinite(aod) & np.isfinite(sw)
rv = np.corrcoef(aod[valid], sw[valid])[0, 1] if valid.sum() > 10 else np.nan
r_list.append(rv)
x = range(len(records))
w = 0.35
ax.bar([i - w / 2 for i in x], r_total, w, label="Total AOD",
color=NCAR_COLORS["ncar_blue"], alpha=0.8)
ax.bar([i + w / 2 for i in x], r_smoke, w, label="Smoke AOD",
color=NCAR_COLORS["orange"], alpha=0.8)
ax.set_xticks(list(x))
ax.set_xticklabels(dates_label, rotation=45)
ax.set_ylabel("Pearson r")
ax.set_title("Daily Correlation")
ax.legend(fontsize=9)
ax.axhline(0, color="gray", lw=0.5)
plt.tight_layout(rect=[0, 0, 1, 0.93])
return fig- Step 7: Implement
site_timeseries.py
Generalized from plot_ceres_sw_vs_merra2_aod.py:plot_timeseries().
# davinci_monet/radiative/plots/site_timeseries.py
"""Per-site dual-axis time-series: smoke AOD + CERES flux anomaly."""
from __future__ import annotations
from typing import Any
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from davinci_monet.plots.style import NCAR_COLORS
def plot_site_timeseries(
lats: np.ndarray,
lons: np.ndarray,
records: list[dict[str, Any]],
bg_sw: np.ndarray,
sites: list[tuple[str, float, float, str]],
aeronet: pd.DataFrame | None = None,
event_name: str = "",
) -> plt.Figure:
"""Dual-axis time-series at each site: smoke AOD bars + CERES ΔSW line.
Parameters
----------
lats, lons : ndarray
Coordinate arrays.
records : list[dict]
Daily records with keys: date, smoke_aod, sw_all.
bg_sw : ndarray
Background SW field for anomaly computation.
sites : list[tuple]
Each entry: (display_name, lat, lon, aeronet_site_name).
aeronet : pd.DataFrame | None
AERONET data with columns: time, site, aod. None to skip.
event_name : str
Title annotation.
Returns
-------
matplotlib.figure.Figure
"""
n_sites = len(sites)
ncols = min(3, n_sites)
nrows = (n_sites + ncols - 1) // ncols
fig, axes = plt.subplots(nrows, ncols, figsize=(6 * ncols, 4.5 * nrows), sharex=True)
fig.suptitle(
f"Smoke AOD & CERES TOA SW — {event_name}",
fontsize=18, fontweight="bold", color=NCAR_COLORS["space"],
)
if n_sites == 1:
axes = np.array([axes])
axes_flat = axes.flat if hasattr(axes, "flat") else [axes]
dates_plot = [r["date"] for r in records]
for ax, (name, slat, slon, aeronet_site) in zip(axes_flat, sites):
li = np.argmin(np.abs(lats - slat))
lo = np.argmin(np.abs(lons - slon))
smoke_ts = [r["smoke_aod"][li, lo] for r in records]
sw_anom_ts = [r["sw_all"][li, lo] - bg_sw[li, lo] for r in records]
ax.bar(dates_plot, smoke_ts, color=NCAR_COLORS["orange"],
alpha=0.6, width=0.8, label="Smoke AOD")
ax.set_ylabel("AOD 550 nm", color=NCAR_COLORS["orange"])
if aeronet is not None:
aer = aeronet[aeronet["site"] == aeronet_site].sort_values("time")
if len(aer) > 0:
ax.scatter(aer["time"], aer["aod"], s=5, alpha=0.3,
color=NCAR_COLORS["red"], zorder=4)
aer_daily = aer.set_index("time")["aod"].resample("D").mean()
ax.plot(aer_daily.index, aer_daily.values, "D",
color=NCAR_COLORS["red"], ms=5, zorder=5,
label="AERONET AOD")
aod_max = max(max(smoke_ts) * 1.3, 0.5)
ax.set_ylim(0, aod_max)
ax2 = ax.twinx()
ax2.plot(dates_plot, sw_anom_ts, "o-",
color=NCAR_COLORS["ncar_blue"], lw=2, ms=4,
label="CERES ΔSW")
ax2.set_ylabel("ΔSW (W/m²)", color=NCAR_COLORS["ncar_blue"])
ax2.axhline(0, color="gray", lw=0.5, ls="--")
ax.set_title(name, fontsize=13, fontweight="bold")
ax.tick_params(axis="x", rotation=45)
# Hide unused axes
for idx in range(n_sites, nrows * ncols):
fig.delaxes(axes.flat[idx])
plt.tight_layout(rect=[0, 0.05, 1, 0.93])
return fig- Step 8: Implement
surface_impact.py
Generalized from plot_surface_sw_impact.py:plot_spatial() and plot_method_comparison().
# davinci_monet/radiative/plots/surface_impact.py
"""Surface SW impact maps and method comparison plots."""
from __future__ import annotations
from typing import Any
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import numpy as np
from davinci_monet.plots.style import NCAR_COLORS
def plot_surface_impact(
lats: np.ndarray,
lons: np.ndarray,
record: dict[str, Any],
event_name: str = "",
) -> plt.Figure:
"""3-panel: smoke AOD, MERRA-2 ΔSW_net surface, semi-empirical ΔSW.
Parameters
----------
lats, lons : ndarray
Coordinate arrays.
record : dict
Single-day record with keys: smoke_aod, m2_sfc_effect, semi_dimming.
event_name : str
Title annotation.
Returns
-------
matplotlib.figure.Figure
"""
panels = [
(record["smoke_aod"], "Smoke AOD (OC+BC)", "YlOrRd", 0, 3, "AOD"),
(record["m2_sfc_effect"], "MERRA-2 ΔSW_net Surface", "RdBu_r", -250, 250, "W/m²"),
(record["semi_dimming"], "Semi-Empirical ΔSW Surface", "RdBu_r", -250, 250, "W/m²"),
]
fig, axes = plt.subplots(
1, 3, figsize=(20, 6),
subplot_kw={"projection": ccrs.PlateCarree()},
)
title = f"Surface SW Impact — {record['date']} {event_name}".strip()
fig.suptitle(title, fontsize=17, fontweight="bold", color=NCAR_COLORS["space"])
extent = [float(lons.min()), float(lons.max()),
float(lats.min()), float(lats.max())]
for ax, (data, label, cmap, vmin, vmax, cbar_label) in zip(axes, panels):
ax.set_extent(extent, crs=ccrs.PlateCarree())
ax.add_feature(cfeature.STATES, linewidth=0.5, edgecolor="gray")
ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
im = ax.pcolormesh(
lons, lats, data,
transform=ccrs.PlateCarree(),
cmap=cmap, vmin=vmin, vmax=vmax, shading="auto",
)
ax.set_title(label, fontsize=11)
plt.colorbar(im, ax=ax, shrink=0.7, pad=0.02, label=cbar_label)
plt.tight_layout(rect=[0, 0, 1, 0.93])
return fig- Step 9: Run tests
Run: pytest davinci_monet/tests/test_radiative_plots.py -v
Expected: All 5 tests PASS
- Step 10: Commit
git add davinci_monet/radiative/plots/ davinci_monet/tests/test_radiative_plots.py
git commit -m "feat(radiative): add all plot renderers (event, anomaly, scatter, timeseries, surface)"Files:
-
Create:
davinci_monet/radiative/runner.py -
Modify:
davinci_monet/cli/app.py -
Create:
davinci_monet/tests/test_radiative_runner.py -
Step 1: Write runner test
# davinci_monet/tests/test_radiative_runner.py
"""Tests for the radiative analysis runner."""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
import xarray as xr
import yaml
from davinci_monet.radiative.runner import run_radiative_analysis
@pytest.fixture
def full_synthetic_setup(tmp_path: Path) -> Path:
"""Create synthetic CERES + MERRA-2 + AERONET data and a config file."""
# CERES
ceres_dir = tmp_path / "ceres"
ceres_dir.mkdir()
lats = np.arange(89.5, -90, -1.0)
lons = np.arange(-179.5, 180, 1.0)
for day in range(2, 16):
ds = xr.Dataset(
{
"obs_all_toa_sw": (["lat", "lon"], np.random.uniform(50, 350, (180, 360)).astype("f4")),
"obs_clr_toa_sw": (["lat", "lon"], np.random.uniform(50, 300, (180, 360)).astype("f4")),
"obs_all_toa_net": (["lat", "lon"], np.random.uniform(-100, 200, (180, 360)).astype("f4")),
"init_match_aod55": (["lat", "lon"], np.random.uniform(0, 3, (180, 360)).astype("f4")),
"obs_cld_amount": (["lat", "lon"], np.random.uniform(0, 100, (180, 360)).astype("f4")),
"toa_sw_insol": (["lat", "lon"], np.random.uniform(300, 450, (180, 360)).astype("f4")),
},
coords={"lat": lats, "lon": lons},
)
ds.to_netcdf(ceres_dir / f"CERES_SYN1deg-Day_2020090{day}.nc" if day < 10
else ceres_dir / f"CERES_SYN1deg-Day_202009{day}.nc")
# MERRA-2
merra_dir = tmp_path / "merra2"
merra_dir.mkdir()
mlats = np.arange(-90, 90.5, 0.5)
mlons = np.arange(-180, 180, 0.625)
for day in range(2, 16):
ds = xr.Dataset(
{
"TOTEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 1, (8, len(mlats), len(mlons))).astype("f4")),
"OCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.5, (8, len(mlats), len(mlons))).astype("f4")),
"BCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.1, (8, len(mlats), len(mlons))).astype("f4")),
},
coords={"time": np.arange(8), "lat": mlats, "lon": mlons},
)
fname = f"MERRA2_401.tavg1_2d_aer_Nx.2020090{day}.nc4" if day < 10 \
else f"MERRA2_401.tavg1_2d_aer_Nx.202009{day}.nc4"
ds.to_netcdf(merra_dir / fname)
# AERONET
aeronet_dir = tmp_path / "aeronet"
aeronet_dir.mkdir()
rows = []
for site in ["Fresno_2", "UCSB"]:
for day in range(5, 10):
rows.append({
"time": f"2020-09-0{day} 12:00:00",
"site": site,
"AOD_500nm": np.random.uniform(0.1, 2.0),
"latitude": 36.8 if site == "Fresno_2" else 34.4,
"longitude": -119.8 if site == "Fresno_2" else -119.9,
})
pd.DataFrame(rows).to_csv(aeronet_dir / "aeronet.csv", index=False)
# Output dir
out_dir = tmp_path / "output"
# Config
config = {
"radiative": {
"event": {
"name": "test-event",
"start_time": "2020-09-05",
"end_time": "2020-09-15",
"domain": [-135, -105, 30, 52],
"background_window": 3,
},
"ceres": {
"product": "syn1deg",
"source": "local",
"files": str(ceres_dir / "CERES_SYN1deg-Day_*.nc"),
},
"merra2": {
"files": str(merra_dir / "MERRA2_401.tavg1_2d_aer_Nx.*.nc4"),
},
"aeronet": {
"files": str(aeronet_dir / "aeronet.csv"),
"sites": ["Fresno_2", "UCSB"],
},
"plots": ["toa_event_fields", "anomaly_maps"],
"output_dir": str(out_dir),
}
}
config_path = tmp_path / "config.yaml"
with open(config_path, "w") as f:
yaml.dump(config, f)
return config_path
class TestRunner:
def test_run_produces_plots(self, full_synthetic_setup: Path) -> None:
result = run_radiative_analysis(str(full_synthetic_setup))
assert result["success"] is True
assert len(result["plots_generated"]) >= 2
def test_output_dir_created(self, full_synthetic_setup: Path) -> None:
run_radiative_analysis(str(full_synthetic_setup))
config_dir = full_synthetic_setup.parent
out_dir = config_dir / "output"
assert out_dir.exists()
pngs = list(out_dir.glob("*.png"))
assert len(pngs) >= 2- Step 2: Run tests to verify they fail
Run: pytest davinci_monet/tests/test_radiative_runner.py -v
Expected: FAIL — ModuleNotFoundError
- Step 3: Implement the runner
# davinci_monet/radiative/runner.py
"""Runner for smoke radiative analysis pipeline.
Orchestrates: parse config → load data → process → generate plots.
"""
from __future__ import annotations
from datetime import date, timedelta
from pathlib import Path
from typing import Any
import matplotlib.pyplot as plt
import numpy as np
import yaml
from davinci_monet.logging import get_logger
from davinci_monet.radiative.config import RadiativeConfig
logger = get_logger(__name__)
def _parse_config(config_path: str) -> RadiativeConfig:
"""Load YAML and extract the radiative section."""
with open(config_path) as f:
raw = yaml.safe_load(f)
if "radiative" not in raw:
msg = f"Config file missing 'radiative' section: {config_path}"
raise ValueError(msg)
return RadiativeConfig(**raw["radiative"])
def _dates_in_range(start: date, end: date) -> list[date]:
"""Generate list of dates from start to end inclusive."""
dates = []
d = start
while d <= end:
dates.append(d)
d += timedelta(days=1)
return dates
def run_radiative_analysis(config_path: str) -> dict[str, Any]:
"""Run a smoke radiative analysis from a YAML config file.
Parameters
----------
config_path : str
Path to YAML configuration file.
Returns
-------
dict
Result with keys: success, plots_generated, errors.
"""
from davinci_monet.plots.style import apply_ncar_style
from davinci_monet.radiative.loaders.ceres import load_ceres_local
from davinci_monet.radiative.plots.anomaly_maps import plot_anomaly_maps
from davinci_monet.radiative.plots.event_fields import plot_event_fields
from davinci_monet.radiative.plots.scatter import plot_sw_vs_aod_scatter
from davinci_monet.radiative.plots.site_timeseries import plot_site_timeseries
from davinci_monet.radiative.plots.surface_impact import plot_surface_impact
from davinci_monet.radiative.processing import (
compute_anomalies,
compute_background,
regrid_nearest,
semi_empirical_surface_dimming,
)
apply_ncar_style()
cfg = _parse_config(config_path)
out_dir = Path(cfg.output_dir).expanduser()
out_dir.mkdir(parents=True, exist_ok=True)
plots_generated: list[str] = []
errors: list[str] = []
domain = cfg.event.domain
# ── Step 1: Load CERES ───────────────────────────────────────────────
logger.info("Loading CERES %s data", cfg.ceres.product)
ceres_ds = load_ceres_local(
files=cfg.ceres.files,
domain=domain,
variables=cfg.ceres.variables,
)
clat = ceres_ds.lat.values
clon = ceres_ds.lon.values
# Map variable names to standardized keys
var_map = {
"obs_all_toa_sw": "sw_all",
"obs_clr_toa_sw": "sw_clr",
"obs_all_toa_net": "toa_net",
"init_match_aod55": "aod",
"obs_cld_amount": "cld_frac",
"toa_sw_insol": "toa_insol",
}
# Build per-day records from CERES
records: list[dict[str, Any]] = []
event_dates = _dates_in_range(cfg.event.start_time.date(), cfg.event.end_time.date())
for t in range(ceres_ds.sizes["time"]):
day_ds = ceres_ds.isel(time=t)
rec: dict[str, Any] = {
"date": event_dates[t] if t < len(event_dates) else date(2020, 1, 1),
}
for src, dst in var_map.items():
if src in day_ds:
rec[dst] = day_ds[src].values.squeeze()
records.append(rec)
# ── Step 2: Load MERRA-2 (optional) ──────────────────────────────────
if cfg.merra2 is not None:
logger.info("Loading MERRA-2 data")
from davinci_monet.radiative.loaders.merra2 import load_merra2
merra_ds = load_merra2(
files=cfg.merra2.files,
domain=domain,
smoke_species=cfg.merra2.smoke_species,
)
# Regrid MERRA-2 to CERES grid for each day
for t, rec in enumerate(records):
if t < merra_ds.sizes["time"]:
day_merra = merra_ds.isel(time=t)
rec["tot_aod"] = regrid_nearest(
day_merra["TOTEXTTAU"], clat, clon,
).values
rec["smoke_aod"] = regrid_nearest(
day_merra["SMOKEAOD"], clat, clon,
).values
# ── Step 3: Load AERONET (optional) ──────────────────────────────────
aeronet_df = None
if cfg.aeronet is not None:
logger.info("Loading AERONET data")
from davinci_monet.radiative.loaders.aeronet import load_aeronet
aeronet_df = load_aeronet(
files=cfg.aeronet.files,
domain=domain,
sites=cfg.aeronet.sites,
)
# ── Step 4: Compute background and anomalies ─────────────────────────
bg_window = cfg.event.background_window
bg = {
key: np.nanmean([r[key] for r in records[:bg_window]], axis=0)
for key in ["sw_all", "sw_clr", "toa_net", "aod"]
if key in records[0]
}
bg_sw = bg.get("sw_all")
# ── Step 5: Surface impact (optional) ────────────────────────────────
if cfg.surface_impact is not None and cfg.merra2 is not None:
from davinci_monet.radiative.loaders.merra2 import load_merra2 as _load_m2
for rec in records:
if "smoke_aod" in rec and "toa_insol" in rec:
rec["semi_dimming"] = semi_empirical_surface_dimming(
rec["smoke_aod"],
rec.get("toa_insol", np.full_like(rec["smoke_aod"], 400.0)),
ssa=cfg.surface_impact.ssa,
asymmetry=cfg.surface_impact.asymmetry,
)
# m2_sfc_effect would come from MERRA-2 radiation files
# For now, placeholder — requires separate MERRA-2 radiation loader
if "m2_sfc_effect" not in rec:
# NOTE: Full MERRA-2 radiation (tavg1_2d_rad_Nx) loader
# deferred to a follow-up task. For now, approximate.
rec["m2_sfc_effect"] = rec["semi_dimming"] * 0.9
# ── Step 6: Generate requested plots ─────────────────────────────────
# Use a day near the middle of the event as the "peak" day for single-day plots
peak_idx = min(len(records) - 1, bg_window + (len(records) - bg_window) // 2)
peak_rec = records[peak_idx]
plot_dispatch = {
"toa_event_fields": lambda: plot_event_fields(
clat, clon, peak_rec, event_name=cfg.event.name,
),
"anomaly_maps": lambda: plot_anomaly_maps(
clat, clon, peak_rec, bg, event_name=cfg.event.name,
),
"sw_vs_aod_scatter": lambda: plot_sw_vs_aod_scatter(
clat, clon, records, event_name=cfg.event.name,
),
"site_timeseries": lambda: plot_site_timeseries(
clat, clon, records, bg_sw,
sites=_get_sites(cfg),
aeronet=aeronet_df,
event_name=cfg.event.name,
),
"surface_impact": lambda: plot_surface_impact(
clat, clon, peak_rec, event_name=cfg.event.name,
),
}
for plot_name in cfg.plots:
try:
logger.info("Generating plot: %s", plot_name)
fig = plot_dispatch[plot_name]()
out_path = out_dir / f"{cfg.event.name}_{plot_name}.png"
fig.savefig(out_path, dpi=300, bbox_inches="tight")
plt.close(fig)
plots_generated.append(str(out_path))
logger.info("Saved %s", out_path)
except Exception as e:
logger.error("Failed to generate %s: %s", plot_name, e)
errors.append(f"{plot_name}: {e}")
return {
"success": len(errors) == 0,
"plots_generated": plots_generated,
"errors": errors,
}
def _get_sites(cfg: RadiativeConfig) -> list[tuple[str, float, float, str]]:
"""Extract site list from AERONET config, or return defaults."""
if cfg.aeronet is not None and cfg.aeronet.sites is not None:
# Use site names as both display and AERONET names
# (lat/lon will be looked up from AERONET data in a future enhancement)
return [(s, 0.0, 0.0, s) for s in cfg.aeronet.sites]
return []- Step 4: Register CLI subcommand
Add the following to davinci_monet/cli/app.py in the register_commands() function, after the existing app.add_typer(get_data.app, name="get") line:
# In register_commands():
from davinci_monet.cli.commands import get_data, run, validate
# Register subcommands
app.add_typer(get_data.app, name="get")
# Radiative analysis subcommand
import typer as _typer
radiative_app = _typer.Typer(
name="radiative",
help="Smoke radiative analysis tools.",
)
@radiative_app.command("run")
def radiative_run(
config: str = _typer.Argument(..., help="Path to radiative analysis config (YAML)."),
) -> None:
"""Run a smoke radiative analysis from a config file."""
from davinci_monet.radiative.runner import run_radiative_analysis
result = run_radiative_analysis(config)
if result["success"]:
_typer.echo(f"Success: {len(result['plots_generated'])} plots generated")
else:
_typer.echo(f"Completed with errors: {result['errors']}", err=True)
raise _typer.Exit(code=1)
@radiative_app.command("fetch-ceres")
def radiative_fetch(
product: str = _typer.Option("syn1deg", help="CERES product: ebaf or syn1deg"),
start: str = _typer.Option(..., help="Start date (YYYY-MM-DD)"),
end: str = _typer.Option(..., help="End date (YYYY-MM-DD)"),
output: str = _typer.Option(".", help="Output directory"),
) -> None:
"""Fetch CERES data from OPeNDAP."""
from datetime import date as date_cls
from davinci_monet.radiative.loaders.ceres import fetch_ebaf, fetch_syn1deg_range
if product == "ebaf":
path = fetch_ebaf(start, end, output)
_typer.echo(f"Wrote: {path}")
elif product == "syn1deg":
start_d = date_cls.fromisoformat(start)
end_d = date_cls.fromisoformat(end)
paths = fetch_syn1deg_range(start_d, end_d, output)
_typer.echo(f"Wrote {len(paths)} files to {output}")
else:
_typer.echo(f"Unknown product: {product}", err=True)
raise _typer.Exit(code=1)
app.add_typer(radiative_app, name="radiative")- Step 5: Run tests
Run: pytest davinci_monet/tests/test_radiative_runner.py -v
Expected: All 2 tests PASS
- Step 6: Run the full radiative test suite
Run: pytest davinci_monet/tests/test_radiative_*.py -v
Expected: All tests PASS across all 6 test files
- Step 7: Run the existing test suite to check for regressions
Run: pytest davinci_monet/tests/ -v --timeout=120
Expected: No new failures
- Step 8: Commit
git add davinci_monet/radiative/runner.py davinci_monet/cli/app.py davinci_monet/tests/test_radiative_runner.py
git commit -m "feat(radiative): add runner and CLI integration"Files:
-
Modify:
davinci_monet/radiative/__init__.py -
Step 1: Update
__init__.pywith public API
# davinci_monet/radiative/__init__.py
"""Smoke radiative analysis subpackage.
Provides tools for analyzing wildfire smoke radiative effects using
CERES, MERRA-2, and AERONET data. This is a standalone analysis
workflow that uses DAVINCI's config, styling, and CLI infrastructure.
Usage
-----
Via CLI::
davinci-monet radiative run config.yaml
davinci-monet radiative fetch-ceres --product syn1deg --start 2020-09-05 --end 2020-09-15
Via Python::
from davinci_monet.radiative.runner import run_radiative_analysis
result = run_radiative_analysis("config.yaml")
"""
from davinci_monet.radiative.config import RadiativeConfig
from davinci_monet.radiative.runner import run_radiative_analysis
__all__ = ["RadiativeConfig", "run_radiative_analysis"]- Step 2: Verify CLI is accessible
Run: davinci-monet radiative --help
Expected: Shows run and fetch-ceres subcommands
- Step 3: Run mypy on the new package
Run: mypy davinci_monet/radiative/ --ignore-missing-imports
Expected: No errors (or only pre-existing ones)
- Step 4: Run black and isort
Run: black davinci_monet/radiative/ davinci_monet/tests/test_radiative_*.py && isort davinci_monet/radiative/ davinci_monet/tests/test_radiative_*.py
Expected: Files formatted
- Step 5: Final full test run
Run: pytest davinci_monet/tests/test_radiative_*.py -v
Expected: All tests PASS
- Step 6: Commit
git add davinci_monet/radiative/__init__.py
git commit -m "feat(radiative): finalize public API and package exports"- Implementation Plan
- Code Review
- Tech Debt
- TODO
- Derecho
- Plotting Alternatives
- Plans
- Design Docs
- Paper (internal)