Skip to content

Plan Radiative

David Fillmore edited this page Mar 30, 2026 · 1 revision

Smoke Radiative Analysis Subpackage — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Port the CERES smoke radiative analysis from PlumeSentinelAI into DAVINCI as a standalone davinci_monet/radiative/ subpackage, generalized for arbitrary events (Sep 2020 West Coast, Australian fires 2019-2020, FIREX-AQ).

Architecture: Pipeline-inspired subpackage with config → loaders → processing → plots → runner flow. Uses DAVINCI's Pydantic config, NCAR style system, and Typer CLI but does NOT use the model-vs-obs pairing pipeline. The RT module comes over as-is from PlumeSentinelAI.

Tech Stack: xarray, numpy, pandas, matplotlib, cartopy, netCDF4, requests, pydantic, typer

Design doc: Plan-Radiative-Design in the wiki


File Map

New Files

davinci_monet/radiative/__init__.py              — Public API exports
davinci_monet/radiative/config.py                — Pydantic schemas (RadiativeConfig and children)
davinci_monet/radiative/rt.py                    — Delta-Eddington two-stream (ported as-is)
davinci_monet/radiative/loaders/__init__.py       — Loader exports
davinci_monet/radiative/loaders/ceres.py         — CERES EBAF + SYN1deg (local + OPeNDAP)
davinci_monet/radiative/loaders/merra2.py        — MERRA-2 aerosol loading + regrid
davinci_monet/radiative/loaders/aeronet.py       — AERONET CSV reader with site/domain filtering
davinci_monet/radiative/processing.py            — Anomaly, smoke AOD, regridding utilities
davinci_monet/radiative/plots/__init__.py         — Plot dispatcher
davinci_monet/radiative/plots/event_fields.py    — Multi-panel TOA event maps
davinci_monet/radiative/plots/anomaly_maps.py    — Event-minus-background anomaly maps
davinci_monet/radiative/plots/scatter.py         — SW vs AOD scatter + daily correlation
davinci_monet/radiative/plots/site_timeseries.py — Per-site dual-axis time-series
davinci_monet/radiative/plots/surface_impact.py  — Surface SW maps + method comparison
davinci_monet/radiative/runner.py                — Orchestrates load → process → plot
davinci_monet/tests/test_radiative_rt.py         — RT module tests (ported as-is)
davinci_monet/tests/test_radiative_config.py     — Config validation tests
davinci_monet/tests/test_radiative_processing.py — Processing function tests
davinci_monet/tests/test_radiative_loaders.py    — Loader tests (synthetic data)
davinci_monet/tests/test_radiative_plots.py      — Plot smoke tests (synthetic data)
davinci_monet/tests/test_radiative_runner.py     — Runner integration test

Modified Files

davinci_monet/cli/app.py                         — Register `radiative` subcommand group

Task 1: Port RT Module and Tests

Files:

  • Create: davinci_monet/radiative/__init__.py
  • Create: davinci_monet/radiative/rt.py
  • Create: davinci_monet/tests/test_radiative_rt.py

This is a direct port — the RT code comes over unchanged from PlumeSentinelAI. The test file is adapted to use proper imports instead of sys.path hacking.

  • Step 1: Create the radiative package with __init__.py
# davinci_monet/radiative/__init__.py
"""Smoke radiative analysis subpackage.

Provides tools for analyzing wildfire smoke radiative effects using
CERES, MERRA-2, and AERONET data. This is a standalone analysis
workflow that uses DAVINCI's config, styling, and CLI infrastructure.
"""
  • Step 2: Copy rt.py from PlumeSentinelAI

Copy ~/EarthSystem/PlumeSentinelAI/scripts/rt.py to davinci_monet/radiative/rt.py. The file should be identical to the source — no changes needed. It contains:

  • delta_eddington_2stream(tau, omega, g, mu0, albedo) — two-stream solver

  • rayleigh_tau(lambda_um, p_hpa) — Rayleigh optical depth

  • smoke_aod_at_wavelength(aod_550, lambda_um, angstrom) — Angstrom scaling

  • combine_aerosol_rayleigh(tau_aer, ssa_aer, g_aer, tau_ray) — bulk properties

  • daily_mean_coszen(lat_deg, doy) — solar geometry

  • Step 3: Create the test file

Port ~/EarthSystem/PlumeSentinelAI/scripts/test_rt.py to davinci_monet/tests/test_radiative_rt.py. Change the import from sys.path hack to proper package import. Remove the standalone _run_all() runner (pytest handles this).

# davinci_monet/tests/test_radiative_rt.py
"""Unit tests for the radiative transfer module.

Ported from PlumeSentinelAI/scripts/test_rt.py.
"""

from __future__ import annotations

import numpy as np

from davinci_monet.radiative.rt import (
    combine_aerosol_rayleigh,
    daily_mean_coszen,
    delta_eddington_2stream,
    rayleigh_tau,
    smoke_aod_at_wavelength,
)

ATOL = 1e-6


# ── Two-stream: energy conservation ─────────────────────────────────────────


def test_energy_conservation_basic():
    """R + T + A = 1 for a range of inputs."""
    cases = [
        (0.001, 0.92, 0.65, 0.5, 0.0),
        (0.5, 0.92, 0.65, 0.5, 0.0),
        (3.0, 0.92, 0.65, 0.5, 0.0),
        (10.0, 0.92, 0.65, 0.5, 0.0),
        (100.0, 0.0, 0.0, 0.5, 0.0),
        (0.5, 1.0, 0.0, 0.5, 0.0),
        (0.5, 0.92, 0.65, 0.8, 0.0),
        (0.5, 0.92, 0.65, 0.3, 0.0),
        (1.0, 0.85, 0.70, 0.6, 0.0),
    ]
    for tau, omega, g, mu0, alb in cases:
        R, T, A = delta_eddington_2stream(tau, omega, g, mu0, alb)
        total = R + T + A
        assert abs(total - 1.0) < ATOL, (
            f"Energy not conserved: τ={tau} ω={omega} g={g} μ₀={mu0} "
            f"R={R:.6f} T={T:.6f} A={A:.6f} sum={total:.6f}"
        )


def test_energy_conservation_with_albedo():
    """R + T + A = 1 with nonzero surface albedo."""
    cases = [
        (0.5, 0.92, 0.65, 0.5, 0.1),
        (0.5, 0.92, 0.65, 0.5, 0.3),
        (1.0, 0.92, 0.65, 0.5, 0.5),
        (2.0, 0.88, 0.68, 0.6, 0.2),
    ]
    for tau, omega, g, mu0, alb in cases:
        R, T, A = delta_eddington_2stream(tau, omega, g, mu0, alb)
        total = R + T + A
        assert abs(total - 1.0) < ATOL, (
            f"Energy not conserved with albedo: τ={tau} α={alb} "
            f"sum={total:.6f}"
        )


def test_energy_conservation_arrays():
    """Energy conservation for array inputs."""
    tau = np.array([0.1, 0.5, 1.0, 3.0, 10.0])
    R, T, A = delta_eddington_2stream(tau, 0.92, 0.65, 0.5)
    total = R + T + A
    assert np.allclose(total, 1.0, atol=ATOL), f"Array energy: {total}"


# ── Two-stream: physical limits ─────────────────────────────────────────────


def test_thin_layer_transmits():
    """Very thin layer should transmit nearly all light."""
    R, T, A = delta_eddington_2stream(0.001, 0.92, 0.65, 0.5)
    assert T > 0.99, f"Thin layer T={T:.4f}, expected > 0.99"
    assert R < 0.01, f"Thin layer R={R:.4f}, expected < 0.01"


def test_thick_absorber_absorbs():
    """Thick purely absorbing layer should absorb everything."""
    R, T, A = delta_eddington_2stream(100.0, 0.0, 0.0, 0.5)
    assert A > 0.99, f"Thick absorber A={A:.4f}, expected > 0.99"
    assert T < 0.01, f"Thick absorber T={T:.4f}, expected < 0.01"


def test_conservative_no_absorption():
    """Conservative scattering (ω=1) should have zero absorption."""
    R, T, A = delta_eddington_2stream(0.5, 0.999999, 0.0, 0.5)
    assert abs(A) < 0.01, f"Conservative A={A:.6f}, expected ≈ 0"


def test_thick_smoke_moderate_transmission():
    """τ=3 with SSA=0.92 should still transmit ~30%."""
    R, T, A = delta_eddington_2stream(3.0, 0.92, 0.65, 0.5)
    assert 0.15 < T < 0.50, f"Thick smoke T={T:.4f}, expected 0.15–0.50"
    assert 0.20 < R < 0.50, f"Thick smoke R={R:.4f}, expected 0.20–0.50"


def test_higher_mu_more_transmission():
    """Higher μ₀ (sun higher) → shorter path → more transmission."""
    _, T_low, _ = delta_eddington_2stream(1.0, 0.92, 0.65, 0.3)
    _, T_high, _ = delta_eddington_2stream(1.0, 0.92, 0.65, 0.8)
    assert T_high > T_low, (
        f"T(μ₀=0.8)={T_high:.4f} should > T(μ₀=0.3)={T_low:.4f}"
    )


def test_albedo_increases_reflection():
    """Surface albedo should increase total reflected flux."""
    R0, _, _ = delta_eddington_2stream(0.5, 0.92, 0.65, 0.5, albedo=0.0)
    R3, _, _ = delta_eddington_2stream(0.5, 0.92, 0.65, 0.5, albedo=0.3)
    assert R3 > R0, f"R(α=0.3)={R3:.4f} should > R(α=0)={R0:.4f}"


def test_zero_tau():
    """Zero optical depth → perfect transmission."""
    R, T, A = delta_eddington_2stream(0.0, 0.92, 0.65, 0.5)
    assert T > 0.999, f"Zero τ: T={T:.4f}"
    assert R < 0.001, f"Zero τ: R={R:.4f}"


def test_nonnegative_outputs():
    """R, T, A should all be non-negative for physical inputs."""
    taus = [0.01, 0.1, 0.5, 1.0, 3.0, 10.0]
    for tau in taus:
        R, T, A = delta_eddington_2stream(tau, 0.92, 0.65, 0.5)
        assert R >= -ATOL, f"R={R:.6f} < 0 at τ={tau}"
        assert T >= -ATOL, f"T={T:.6f} < 0 at τ={tau}"
        assert A >= -ATOL, f"A={A:.6f} < 0 at τ={tau}"


# ── Spectral utilities ───────────────────────────────────────────────────────


def test_rayleigh_wavelength_dependence():
    """Rayleigh τ should decrease strongly with wavelength."""
    t_uv = rayleigh_tau(0.35)
    t_vis = rayleigh_tau(0.55)
    t_nir = rayleigh_tau(1.0)
    assert t_uv > t_vis > t_nir, "Rayleigh should decrease with λ"
    assert t_uv / t_nir > 50, "UV/NIR Rayleigh ratio should be > 50"


def test_rayleigh_pressure_scaling():
    """Rayleigh τ should scale linearly with pressure."""
    t_full = rayleigh_tau(0.55, p_hpa=1013.25)
    t_half = rayleigh_tau(0.55, p_hpa=506.625)
    assert abs(t_half / t_full - 0.5) < 0.001


def test_smoke_aod_angstrom():
    """Smoke AOD should increase at shorter wavelengths."""
    aod_uv = smoke_aod_at_wavelength(1.0, 0.35)
    aod_vis = smoke_aod_at_wavelength(1.0, 0.55)
    aod_nir = smoke_aod_at_wavelength(1.0, 1.0)
    assert aod_uv > aod_vis > aod_nir
    assert abs(aod_vis - 1.0) < 0.01


def test_combine_aerosol_rayleigh_conservative():
    """Adding Rayleigh (SSA=1, g=0) should increase SSA and decrease g."""
    tau_aer, ssa_aer, g_aer = 0.5, 0.88, 0.68
    tau_ray = rayleigh_tau(0.50)
    tau_t, ssa_t, g_t = combine_aerosol_rayleigh(tau_aer, ssa_aer, g_aer, tau_ray)
    assert tau_t > tau_aer, "Total τ should exceed aerosol τ"
    assert ssa_t > ssa_aer, "Adding conservative scatterer should raise SSA"
    assert g_t < g_aer, "Adding isotropic scatterer should lower g"


# ── Solar geometry ───────────────────────────────────────────────────────────


def test_coszen_equinox_equator():
    """At equinox, equator should have μ̄ ≈ 1/π ≈ 0.318."""
    mu = daily_mean_coszen(0.0, 81)
    assert abs(mu - 1.0 / np.pi) < 0.02, f"Equator equinox μ̄={mu:.4f}"


def test_coszen_latitude_gradient():
    """Higher latitude → lower μ̄ in summer."""
    mu_low = daily_mean_coszen(30.0, 249)
    mu_high = daily_mean_coszen(50.0, 249)
    assert mu_low > mu_high, "Lower latitude should have higher μ̄"


def test_coszen_positive():
    """μ̄ should always be positive for our domain."""
    lats = np.arange(30, 53, 1.0)
    for doy in range(249, 260):
        mu = daily_mean_coszen(lats, doy)
        assert np.all(mu > 0), f"Negative μ̄ at doy={doy}"
  • Step 4: Run tests

Run: pytest davinci_monet/tests/test_radiative_rt.py -v Expected: All 18 tests PASS

  • Step 5: Commit
git add davinci_monet/radiative/__init__.py davinci_monet/radiative/rt.py davinci_monet/tests/test_radiative_rt.py
git commit -m "feat(radiative): port RT module and tests from PlumeSentinelAI"

Task 2: Configuration Schema

Files:

  • Create: davinci_monet/radiative/config.py

  • Create: davinci_monet/tests/test_radiative_config.py

  • Step 1: Write the config test file

# davinci_monet/tests/test_radiative_config.py
"""Tests for radiative analysis configuration schema."""

from __future__ import annotations

from datetime import datetime

import pytest

from davinci_monet.radiative.config import (
    AeronetConfig,
    CeresConfig,
    EventConfig,
    Merra2Config,
    RadiativeConfig,
    SurfaceImpactConfig,
)


class TestEventConfig:
    def test_valid_event(self) -> None:
        cfg = EventConfig(
            name="west-coast-2020",
            start_time=datetime(2020, 9, 5),
            end_time=datetime(2020, 9, 15),
            domain=(-135.0, -105.0, 30.0, 52.0),
        )
        assert cfg.name == "west-coast-2020"
        assert cfg.background_window == 3  # default

    def test_end_before_start_raises(self) -> None:
        with pytest.raises(ValueError, match="end_time must be after start_time"):
            EventConfig(
                name="bad",
                start_time=datetime(2020, 9, 15),
                end_time=datetime(2020, 9, 5),
                domain=(-135.0, -105.0, 30.0, 52.0),
            )

    def test_custom_background_window(self) -> None:
        cfg = EventConfig(
            name="test",
            start_time=datetime(2020, 9, 5),
            end_time=datetime(2020, 9, 15),
            domain=(-135.0, -105.0, 30.0, 52.0),
            background_window=5,
        )
        assert cfg.background_window == 5


class TestCeresConfig:
    def test_local_source(self) -> None:
        cfg = CeresConfig(
            product="syn1deg",
            source="local",
            files="/data/ceres/*.nc",
        )
        assert cfg.product == "syn1deg"
        assert cfg.source == "local"

    def test_opendap_source(self) -> None:
        cfg = CeresConfig(product="ebaf", source="opendap")
        assert cfg.files is None

    def test_local_requires_files(self) -> None:
        with pytest.raises(ValueError, match="files.*required.*local"):
            CeresConfig(product="syn1deg", source="local", files=None)


class TestRadiativeConfig:
    def test_minimal_config(self) -> None:
        """CERES-only config (no MERRA-2, no AERONET)."""
        cfg = RadiativeConfig(
            event=EventConfig(
                name="test",
                start_time=datetime(2020, 9, 5),
                end_time=datetime(2020, 9, 15),
                domain=(-135.0, -105.0, 30.0, 52.0),
            ),
            ceres=CeresConfig(product="syn1deg", source="local", files="/data/*.nc"),
            plots=["toa_event_fields"],
            output_dir="/tmp/output",
        )
        assert cfg.merra2 is None
        assert cfg.aeronet is None
        assert cfg.surface_impact is None

    def test_full_config(self) -> None:
        """Full config with all optional sections."""
        cfg = RadiativeConfig(
            event=EventConfig(
                name="west-coast-2020",
                start_time=datetime(2020, 9, 5),
                end_time=datetime(2020, 9, 15),
                domain=(-135.0, -105.0, 30.0, 52.0),
            ),
            ceres=CeresConfig(product="syn1deg", source="local", files="/data/ceres/*.nc"),
            merra2=Merra2Config(files="/data/merra2/*.nc4"),
            aeronet=AeronetConfig(files="/data/aeronet/*.csv", sites=["Fresno_2"]),
            surface_impact=SurfaceImpactConfig(ssa=0.90, asymmetry=0.60),
            plots=["toa_event_fields", "anomaly_maps", "sw_vs_aod_scatter"],
            output_dir="/tmp/output",
        )
        assert cfg.merra2 is not None
        assert cfg.surface_impact.ssa == 0.90

    def test_surface_impact_requires_merra2(self) -> None:
        with pytest.raises(ValueError, match="surface_impact requires merra2"):
            RadiativeConfig(
                event=EventConfig(
                    name="test",
                    start_time=datetime(2020, 9, 5),
                    end_time=datetime(2020, 9, 15),
                    domain=(-135.0, -105.0, 30.0, 52.0),
                ),
                ceres=CeresConfig(product="syn1deg", source="local", files="/data/*.nc"),
                surface_impact=SurfaceImpactConfig(),
                plots=["surface_impact"],
                output_dir="/tmp/output",
            )

    def test_invalid_plot_type(self) -> None:
        with pytest.raises(ValueError, match="Invalid plot"):
            RadiativeConfig(
                event=EventConfig(
                    name="test",
                    start_time=datetime(2020, 9, 5),
                    end_time=datetime(2020, 9, 15),
                    domain=(-135.0, -105.0, 30.0, 52.0),
                ),
                ceres=CeresConfig(product="syn1deg", source="local", files="/data/*.nc"),
                plots=["nonexistent_plot_type"],
                output_dir="/tmp/output",
            )
  • Step 2: Run tests to verify they fail

Run: pytest davinci_monet/tests/test_radiative_config.py -v Expected: FAIL — ModuleNotFoundError: No module named 'davinci_monet.radiative.config'

  • Step 3: Implement the config schema
# davinci_monet/radiative/config.py
"""Pydantic configuration schemas for smoke radiative analysis."""

from __future__ import annotations

from datetime import datetime
from typing import Literal

from pydantic import BaseModel, ConfigDict, model_validator

VALID_PLOT_TYPES = frozenset({
    "toa_event_fields",
    "anomaly_maps",
    "sw_vs_aod_scatter",
    "site_timeseries",
    "surface_impact",
})


class EventConfig(BaseModel):
    """Event definition: time window, domain, and background baseline."""

    model_config = ConfigDict(extra="forbid")

    name: str
    start_time: datetime
    end_time: datetime
    domain: tuple[float, float, float, float]  # (west, east, south, north)
    background_window: int = 3  # days before start_time for anomaly baseline

    @model_validator(mode="after")
    def _check_time_order(self) -> EventConfig:
        if self.end_time <= self.start_time:
            msg = "end_time must be after start_time"
            raise ValueError(msg)
        return self


class CeresConfig(BaseModel):
    """CERES data source configuration."""

    model_config = ConfigDict(extra="forbid")

    product: Literal["ebaf", "syn1deg"]
    source: Literal["local", "opendap"]
    files: str | None = None
    variables: list[str] | None = None

    @model_validator(mode="after")
    def _check_local_has_files(self) -> CeresConfig:
        if self.source == "local" and not self.files:
            msg = "files is required when source is local"
            raise ValueError(msg)
        return self


class Merra2Config(BaseModel):
    """MERRA-2 aerosol data configuration."""

    model_config = ConfigDict(extra="forbid")

    files: str
    smoke_species: list[str] = ["OCEXTTAU", "BCEXTTAU"]


class AeronetConfig(BaseModel):
    """AERONET site observation configuration."""

    model_config = ConfigDict(extra="forbid")

    files: str
    sites: list[str] | None = None  # None = all sites within domain


class SurfaceImpactConfig(BaseModel):
    """Semi-empirical surface impact RT parameters."""

    model_config = ConfigDict(extra="forbid")

    method: list[Literal["merra2", "semi_empirical"]] = [
        "merra2",
        "semi_empirical",
    ]
    ssa: float = 0.92
    asymmetry: float = 0.65


class RadiativeConfig(BaseModel):
    """Top-level radiative analysis configuration."""

    model_config = ConfigDict(extra="forbid")

    event: EventConfig
    ceres: CeresConfig
    merra2: Merra2Config | None = None
    aeronet: AeronetConfig | None = None
    surface_impact: SurfaceImpactConfig | None = None
    plots: list[str]
    output_dir: str

    @model_validator(mode="after")
    def _check_constraints(self) -> RadiativeConfig:
        if self.surface_impact is not None and self.merra2 is None:
            msg = "surface_impact requires merra2 configuration"
            raise ValueError(msg)
        invalid = set(self.plots) - VALID_PLOT_TYPES
        if invalid:
            msg = f"Invalid plot types: {invalid}. Valid: {sorted(VALID_PLOT_TYPES)}"
            raise ValueError(msg)
        return self
  • Step 4: Run tests

Run: pytest davinci_monet/tests/test_radiative_config.py -v Expected: All 8 tests PASS

  • Step 5: Commit
git add davinci_monet/radiative/config.py davinci_monet/tests/test_radiative_config.py
git commit -m "feat(radiative): add Pydantic configuration schema"

Task 3: Processing Module

Files:

  • Create: davinci_monet/radiative/processing.py

  • Create: davinci_monet/tests/test_radiative_processing.py

  • Step 1: Write processing tests

# davinci_monet/tests/test_radiative_processing.py
"""Tests for radiative analysis processing functions."""

from __future__ import annotations

import numpy as np
import pytest
import xarray as xr

from davinci_monet.radiative.processing import (
    compute_anomalies,
    compute_background,
    derive_smoke_aod,
    regrid_nearest,
    semi_empirical_surface_dimming,
)


@pytest.fixture
def daily_dataset() -> xr.Dataset:
    """Synthetic CERES-like daily dataset: 5 days, 10x10 grid."""
    np.random.seed(42)
    times = np.arange(5)
    lats = np.linspace(30, 52, 10)
    lons = np.linspace(-135, -105, 10)
    return xr.Dataset(
        {
            "toa_sw": (["time", "lat", "lon"], np.random.uniform(100, 300, (5, 10, 10))),
            "aod": (["time", "lat", "lon"], np.random.uniform(0, 2, (5, 10, 10))),
        },
        coords={"time": times, "lat": lats, "lon": lons},
    )


class TestComputeBackground:
    def test_mean_of_first_n_days(self, daily_dataset: xr.Dataset) -> None:
        bg = compute_background(daily_dataset, window=3)
        expected = daily_dataset.isel(time=slice(0, 3)).mean(dim="time")
        xr.testing.assert_allclose(bg, expected)

    def test_window_larger_than_data_uses_all(self, daily_dataset: xr.Dataset) -> None:
        bg = compute_background(daily_dataset, window=10)
        expected = daily_dataset.mean(dim="time")
        xr.testing.assert_allclose(bg, expected)


class TestComputeAnomalies:
    def test_anomaly_shape(self, daily_dataset: xr.Dataset) -> None:
        bg = compute_background(daily_dataset, window=3)
        anom = compute_anomalies(daily_dataset, bg)
        assert anom.dims == daily_dataset.dims
        assert set(anom.data_vars) == set(daily_dataset.data_vars)

    def test_anomaly_values(self, daily_dataset: xr.Dataset) -> None:
        bg = compute_background(daily_dataset, window=3)
        anom = compute_anomalies(daily_dataset, bg)
        # First 3 days should have anomaly near zero on average
        first3_mean = anom.isel(time=slice(0, 3)).mean().to_array().values
        assert np.all(np.abs(first3_mean) < 50)  # reasonable tolerance


class TestDeriveSmokeAod:
    def test_sum_of_species(self) -> None:
        ds = xr.Dataset({
            "OCEXTTAU": (["lat", "lon"], np.ones((5, 5)) * 0.3),
            "BCEXTTAU": (["lat", "lon"], np.ones((5, 5)) * 0.1),
        })
        smoke = derive_smoke_aod(ds, species=["OCEXTTAU", "BCEXTTAU"])
        np.testing.assert_allclose(smoke.values, 0.4)

    def test_single_species(self) -> None:
        ds = xr.Dataset({
            "OCEXTTAU": (["lat", "lon"], np.ones((5, 5)) * 0.3),
        })
        smoke = derive_smoke_aod(ds, species=["OCEXTTAU"])
        np.testing.assert_allclose(smoke.values, 0.3)


class TestRegridNearest:
    def test_coarsen(self) -> None:
        """Regrid fine grid to coarser grid via nearest-neighbor."""
        src_lat = np.linspace(30, 52, 44)  # ~0.5° spacing
        src_lon = np.linspace(-135, -105, 48)  # ~0.625° spacing
        src = xr.DataArray(
            np.ones((44, 48)),
            dims=["lat", "lon"],
            coords={"lat": src_lat, "lon": src_lon},
        )
        tgt_lat = np.linspace(30, 52, 22)  # ~1° spacing
        tgt_lon = np.linspace(-135, -105, 30)
        result = regrid_nearest(src, tgt_lat, tgt_lon)
        assert result.shape == (22, 30)
        np.testing.assert_allclose(result.values, 1.0)

    def test_preserves_pattern(self) -> None:
        """Check that a lat-gradient pattern survives regridding."""
        src_lat = np.linspace(0, 10, 20)
        src_lon = np.linspace(0, 10, 20)
        data = np.broadcast_to(src_lat[:, None], (20, 20)).copy()
        src = xr.DataArray(data, dims=["lat", "lon"],
                           coords={"lat": src_lat, "lon": src_lon})
        tgt_lat = np.linspace(0, 10, 10)
        tgt_lon = np.linspace(0, 10, 10)
        result = regrid_nearest(src, tgt_lat, tgt_lon)
        # Values should increase with latitude index
        assert np.all(np.diff(result.values.mean(axis=1)) > 0)


class TestSemiEmpiricalDimming:
    def test_zero_aod_no_dimming(self) -> None:
        dimming = semi_empirical_surface_dimming(
            smoke_aod=np.array([0.0]), toa_insol=np.array([300.0]),
        )
        np.testing.assert_allclose(dimming, 0.0, atol=1e-10)

    def test_dimming_increases_with_aod(self) -> None:
        aod = np.array([0.1, 0.5, 1.0, 3.0])
        dimming = semi_empirical_surface_dimming(
            smoke_aod=aod, toa_insol=np.full(4, 300.0),
        )
        # Dimming should be negative and increasingly negative
        assert np.all(dimming < 0)
        assert np.all(np.diff(dimming) < 0)

    def test_custom_ssa(self) -> None:
        aod = np.array([1.0])
        insol = np.array([300.0])
        d_high_ssa = semi_empirical_surface_dimming(aod, insol, ssa=0.99, asymmetry=0.65)
        d_low_ssa = semi_empirical_surface_dimming(aod, insol, ssa=0.80, asymmetry=0.65)
        # Higher SSA = more scattering, less absorption, but less surface dimming
        # because more forward-scattered light reaches surface
        assert abs(d_high_ssa[0]) < abs(d_low_ssa[0])
  • Step 2: Run tests to verify they fail

Run: pytest davinci_monet/tests/test_radiative_processing.py -v Expected: FAIL — ModuleNotFoundError

  • Step 3: Implement the processing module
# davinci_monet/radiative/processing.py
"""Processing functions for smoke radiative analysis.

Anomaly computation, smoke AOD derivation, regridding, and
semi-empirical surface dimming estimation.
"""

from __future__ import annotations

import numpy as np
import xarray as xr


def compute_background(ds: xr.Dataset, window: int) -> xr.Dataset:
    """Compute background mean from the first ``window`` time steps.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset with a ``time`` dimension.
    window : int
        Number of leading time steps to average.

    Returns
    -------
    xr.Dataset
        Time-averaged background (no time dimension).
    """
    n = min(window, ds.sizes["time"])
    return ds.isel(time=slice(0, n)).mean(dim="time")


def compute_anomalies(ds: xr.Dataset, background: xr.Dataset) -> xr.Dataset:
    """Subtract background from each time step.

    Parameters
    ----------
    ds : xr.Dataset
        Full event dataset with ``time`` dimension.
    background : xr.Dataset
        Background mean (no ``time`` dimension).

    Returns
    -------
    xr.Dataset
        Anomaly dataset with same shape as ``ds``.
    """
    return ds - background


def derive_smoke_aod(ds: xr.Dataset, species: list[str]) -> xr.DataArray:
    """Sum specified AOD species to get smoke AOD.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset containing the species variables.
    species : list[str]
        Variable names to sum (e.g., ``["OCEXTTAU", "BCEXTTAU"]``).

    Returns
    -------
    xr.DataArray
        Smoke AOD field.
    """
    result = ds[species[0]].copy()
    for s in species[1:]:
        result = result + ds[s]
    return result


def regrid_nearest(
    source: xr.DataArray,
    target_lat: np.ndarray,
    target_lon: np.ndarray,
) -> xr.DataArray:
    """Nearest-neighbor regrid a 2D field to a new lat/lon grid.

    Parameters
    ----------
    source : xr.DataArray
        Source data with ``lat`` and ``lon`` dimensions.
    target_lat : np.ndarray
        Target latitude array.
    target_lon : np.ndarray
        Target longitude array.

    Returns
    -------
    xr.DataArray
        Regridded data on target grid.
    """
    src_lat = source.lat.values
    src_lon = source.lon.values
    data = source.values

    out = np.empty((len(target_lat), len(target_lon)), dtype=data.dtype)
    for i, tlat in enumerate(target_lat):
        li = np.argmin(np.abs(src_lat - tlat))
        for j, tlon in enumerate(target_lon):
            lj = np.argmin(np.abs(src_lon - tlon))
            out[i, j] = data[li, lj]

    return xr.DataArray(
        out,
        dims=["lat", "lon"],
        coords={"lat": target_lat, "lon": target_lon},
    )


def semi_empirical_surface_dimming(
    smoke_aod: np.ndarray,
    toa_insol: np.ndarray,
    ssa: float = 0.92,
    asymmetry: float = 0.65,
) -> np.ndarray:
    """Estimate surface SW dimming from smoke AOD using Beer-Lambert.

    Parameters
    ----------
    smoke_aod : array_like
        Smoke aerosol optical depth at 550 nm.
    toa_insol : array_like
        TOA insolation (W/m²), used as baseline S₀.
    ssa : float
        Single-scattering albedo (default 0.92).
    asymmetry : float
        Asymmetry parameter (default 0.65).

    Returns
    -------
    ndarray
        Surface SW dimming (negative = dimming, W/m²).
    """
    smoke_aod = np.asarray(smoke_aod, dtype=np.float64)
    toa_insol = np.asarray(toa_insol, dtype=np.float64)
    mu = 0.5  # effective daily-mean cosine of solar zenith
    extinct = 1.0 - np.exp(-smoke_aod / mu)
    fwd_scatter_frac = ssa * (1.0 + asymmetry) / 2.0
    return -toa_insol * extinct * (1.0 - fwd_scatter_frac)
  • Step 4: Run tests

Run: pytest davinci_monet/tests/test_radiative_processing.py -v Expected: All 10 tests PASS

  • Step 5: Commit
git add davinci_monet/radiative/processing.py davinci_monet/tests/test_radiative_processing.py
git commit -m "feat(radiative): add processing module (anomalies, regridding, surface dimming)"

Task 4: Data Loaders

Files:

  • Create: davinci_monet/radiative/loaders/__init__.py

  • Create: davinci_monet/radiative/loaders/ceres.py

  • Create: davinci_monet/radiative/loaders/merra2.py

  • Create: davinci_monet/radiative/loaders/aeronet.py

  • Create: davinci_monet/tests/test_radiative_loaders.py

  • Step 1: Write loader tests (synthetic data, no network)

# davinci_monet/tests/test_radiative_loaders.py
"""Tests for radiative analysis data loaders using synthetic data."""

from __future__ import annotations

from pathlib import Path

import numpy as np
import pandas as pd
import pytest
import xarray as xr

from davinci_monet.radiative.loaders.aeronet import load_aeronet
from davinci_monet.radiative.loaders.ceres import load_ceres_local
from davinci_monet.radiative.loaders.merra2 import load_merra2


@pytest.fixture
def synthetic_ceres_dir(tmp_path: Path) -> Path:
    """Create synthetic CERES SYN1deg daily files."""
    lats = np.arange(89.5, -90, -1.0)  # 180 lats, descending
    lons = np.arange(-179.5, 180, 1.0)  # 360 lons
    for day in range(5, 10):
        ds = xr.Dataset(
            {
                "obs_all_toa_sw": (["lat", "lon"], np.random.uniform(50, 350, (180, 360)).astype("f4")),
                "obs_clr_toa_sw": (["lat", "lon"], np.random.uniform(50, 300, (180, 360)).astype("f4")),
                "obs_all_toa_net": (["lat", "lon"], np.random.uniform(-100, 200, (180, 360)).astype("f4")),
                "init_match_aod55": (["lat", "lon"], np.random.uniform(0, 3, (180, 360)).astype("f4")),
                "obs_cld_amount": (["lat", "lon"], np.random.uniform(0, 100, (180, 360)).astype("f4")),
                "toa_sw_insol": (["lat", "lon"], np.random.uniform(300, 450, (180, 360)).astype("f4")),
            },
            coords={"lat": lats, "lon": lons},
        )
        ds.to_netcdf(tmp_path / f"CERES_SYN1deg-Day_2020090{day}.nc")
    return tmp_path


@pytest.fixture
def synthetic_merra2_dir(tmp_path: Path) -> Path:
    """Create synthetic MERRA-2 aerosol files."""
    lats = np.arange(-90, 90.5, 0.5)
    lons = np.arange(-180, 180, 0.625)
    times = np.arange(8)  # 8 hourly time steps
    for day in range(5, 10):
        ds = xr.Dataset(
            {
                "TOTEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 1, (8, len(lats), len(lons))).astype("f4")),
                "OCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.5, (8, len(lats), len(lons))).astype("f4")),
                "BCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.1, (8, len(lats), len(lons))).astype("f4")),
            },
            coords={"time": times, "lat": lats, "lon": lons},
        )
        ds.to_netcdf(tmp_path / f"MERRA2_401.tavg1_2d_aer_Nx.2020090{day}.nc4")
    return tmp_path


@pytest.fixture
def synthetic_aeronet_csv(tmp_path: Path) -> Path:
    """Create synthetic AERONET CSV."""
    rows = []
    for site in ["Fresno_2", "UCSB", "NASA_Ames"]:
        for day in range(5, 10):
            for hour in [10, 12, 14]:
                rows.append({
                    "time": f"2020-09-0{day} {hour}:00:00",
                    "site": site,
                    "AOD_500nm": np.random.uniform(0.1, 2.0),
                    "latitude": 36.8 if site == "Fresno_2" else 34.4 if site == "UCSB" else 37.4,
                    "longitude": -119.8 if site == "Fresno_2" else -119.9 if site == "UCSB" else -122.1,
                })
    df = pd.DataFrame(rows)
    out = tmp_path / "aeronet.csv"
    df.to_csv(out, index=False)
    return out


class TestCeresLoader:
    def test_load_local_syn1deg(self, synthetic_ceres_dir: Path) -> None:
        ds = load_ceres_local(
            files=str(synthetic_ceres_dir / "CERES_SYN1deg-Day_*.nc"),
            domain=(-135.0, -105.0, 30.0, 52.0),
        )
        assert "obs_all_toa_sw" in ds.data_vars
        assert "time" in ds.dims
        assert ds.sizes["time"] == 5
        # Check domain subsetting
        assert float(ds.lat.min()) >= 30.0
        assert float(ds.lat.max()) <= 52.0
        assert float(ds.lon.min()) >= -135.0
        assert float(ds.lon.max()) <= -105.0

    def test_load_with_variable_filter(self, synthetic_ceres_dir: Path) -> None:
        ds = load_ceres_local(
            files=str(synthetic_ceres_dir / "CERES_SYN1deg-Day_*.nc"),
            domain=(-135.0, -105.0, 30.0, 52.0),
            variables=["obs_all_toa_sw", "init_match_aod55"],
        )
        assert set(ds.data_vars) == {"obs_all_toa_sw", "init_match_aod55"}


class TestMerra2Loader:
    def test_load_and_daily_mean(self, synthetic_merra2_dir: Path) -> None:
        ds = load_merra2(
            files=str(synthetic_merra2_dir / "MERRA2_401.tavg1_2d_aer_Nx.*.nc4"),
            domain=(-135.0, -105.0, 30.0, 52.0),
            smoke_species=["OCEXTTAU", "BCEXTTAU"],
        )
        assert "time" in ds.dims
        assert "SMOKEAOD" in ds.data_vars
        # Should be daily means (no hourly time dimension within each day)
        # Each day is one time step
        assert ds.sizes["time"] == 5


class TestAeronetLoader:
    def test_load_all_sites(self, synthetic_aeronet_csv: Path) -> None:
        df = load_aeronet(
            files=str(synthetic_aeronet_csv),
            domain=(-135.0, -105.0, 30.0, 52.0),
        )
        assert len(df) > 0
        assert "time" in df.columns
        assert "site" in df.columns
        assert "aod" in df.columns

    def test_filter_by_sites(self, synthetic_aeronet_csv: Path) -> None:
        df = load_aeronet(
            files=str(synthetic_aeronet_csv),
            domain=(-135.0, -105.0, 30.0, 52.0),
            sites=["Fresno_2"],
        )
        assert set(df["site"].unique()) == {"Fresno_2"}
  • Step 2: Run tests to verify they fail

Run: pytest davinci_monet/tests/test_radiative_loaders.py -v Expected: FAIL — ModuleNotFoundError

  • Step 3: Create loaders/__init__.py
# davinci_monet/radiative/loaders/__init__.py
"""Data loaders for smoke radiative analysis."""

from davinci_monet.radiative.loaders.aeronet import load_aeronet
from davinci_monet.radiative.loaders.ceres import load_ceres_local
from davinci_monet.radiative.loaders.merra2 import load_merra2

__all__ = ["load_aeronet", "load_ceres_local", "load_merra2"]
  • Step 4: Implement CERES loader
# davinci_monet/radiative/loaders/ceres.py
"""CERES data loaders for local files and OPeNDAP.

Supports EBAF (monthly) and SYN1deg (daily) products.
"""

from __future__ import annotations

import glob
import struct
from datetime import date, timedelta
from pathlib import Path

import numpy as np
import xarray as xr

from davinci_monet.logging import get_logger

logger = get_logger(__name__)

# ── Standard variable lists ──────────────────────────────────────────────────

EBAF_VARIABLES = [
    "toa_sw_all_mon",
    "toa_lw_all_mon",
    "toa_net_all_mon",
    "toa_sw_clr_c_mon",
    "toa_lw_clr_c_mon",
    "toa_net_clr_c_mon",
    "solar_mon",
    "cldarea_total_daynight_mon",
    "cldtau_total_day_mon",
]

SYN1DEG_VARIABLES = [
    "obs_all_toa_sw",
    "obs_all_toa_lw",
    "obs_all_toa_net",
    "obs_clr_toa_sw",
    "obs_clr_toa_lw",
    "obs_clr_toa_net",
    "toa_sw_insol",
    "init_all_sfc_sw_dn",
    "init_all_sfc_sw_up",
    "init_all_sfc_lw_dn",
    "init_all_sfc_lw_up",
    "init_clr_sfc_sw_dn",
    "init_clr_sfc_lw_dn",
    "init_match_aod55",
    "init_skin_temp",
    "init_pw",
    "obs_cld_amount",
    "obs_cld_od",
]

FILL_VALUE = -999.0


def load_ceres_local(
    files: str,
    domain: tuple[float, float, float, float],
    variables: list[str] | None = None,
) -> xr.Dataset:
    """Load CERES data from local NetCDF files.

    Parameters
    ----------
    files : str
        Glob pattern for NetCDF files.
    domain : tuple
        (west, east, south, north) bounding box.
    variables : list[str] | None
        Variables to load. None loads all.

    Returns
    -------
    xr.Dataset
        CERES data with dims (time, lat, lon). Fill values replaced with NaN.
    """
    paths = sorted(glob.glob(str(Path(files).expanduser())))
    if not paths:
        msg = f"No files matched: {files}"
        raise FileNotFoundError(msg)

    logger.info("Loading %d CERES files", len(paths))

    datasets = []
    for p in paths:
        ds = xr.open_dataset(p)
        ds = ds.where(ds != FILL_VALUE)
        if variables:
            available = [v for v in variables if v in ds]
            ds = ds[available]
        datasets.append(ds)

    if len(datasets) == 1:
        combined = datasets[0].expand_dims("time")
    else:
        combined = xr.concat(datasets, dim="time")

    # Subset to domain
    west, east, south, north = domain
    lat = combined.lat.values
    if lat[0] > lat[-1]:  # descending (CERES convention)
        combined = combined.sel(lat=slice(north, south), lon=slice(west, east))
    else:
        combined = combined.sel(lat=slice(south, north), lon=slice(west, east))

    return combined


# ── OPeNDAP fetchers ─────────────────────────────────────────────────────────

EBAF_OPENDAP_URL = (
    "https://opendap.larc.nasa.gov/opendap/CERES/"
    "EBAF/TOA_Edition4.2/"
    "CERES_EBAF-TOA_Edition4.2_200003-202407.nc"
)

SYN1DEG_BASE_URL = (
    "https://opendap.larc.nasa.gov/opendap/CERES/"
    "SYN1deg-Day/Terra-Aqua-NOAA20_Edition4B"
)

NLAT, NLON = 180, 360

SYN1DEG_VARS_2D = [
    "obs_all_toa_sw", "obs_all_toa_lw", "obs_all_toa_net",
    "obs_clr_toa_sw", "obs_clr_toa_lw", "obs_clr_toa_net",
    "toa_sw_insol",
    "init_all_sfc_sw_dn", "init_all_sfc_sw_up",
    "init_all_sfc_lw_dn", "init_all_sfc_lw_up",
    "init_clr_sfc_sw_dn", "init_clr_sfc_lw_dn",
    "init_match_aod55", "init_skin_temp", "init_pw",
]
SYN1DEG_VARS_3D = ["obs_cld_amount", "obs_cld_od"]


def fetch_ebaf(
    start_time: str,
    end_time: str,
    output_dir: str | Path,
    variables: list[str] | None = None,
) -> Path:
    """Fetch CERES EBAF-TOA monthly data from OPeNDAP.

    Parameters
    ----------
    start_time, end_time : str
        Time range (e.g., "2020-06-01", "2020-12-31").
    output_dir : str or Path
        Directory for output file.
    variables : list[str] | None
        Variables to fetch. None uses EBAF_VARIABLES.

    Returns
    -------
    Path
        Path to written NetCDF file.
    """
    output_dir = Path(output_dir).expanduser()
    output_dir.mkdir(parents=True, exist_ok=True)
    out_file = output_dir / f"CERES_EBAF-TOA_{start_time}_{end_time}.nc"

    if out_file.exists():
        logger.info("Already exists: %s", out_file)
        return out_file

    logger.info("Opening OPeNDAP: %s", EBAF_OPENDAP_URL)
    ds = xr.open_dataset(EBAF_OPENDAP_URL)

    vars_to_use = variables or EBAF_VARIABLES
    available = [v for v in vars_to_use if v in ds]
    subset = ds[available].sel(time=slice(start_time, end_time))

    logger.info("Writing %s (%d months, %d variables)", out_file, len(subset.time), len(available))
    subset.to_netcdf(out_file)
    return out_file


def _build_syn1deg_url(d: date) -> str:
    return (
        f"{SYN1DEG_BASE_URL}/{d.year}/{d.month:02d}/"
        f"CER_SYN1deg-Day_Terra-Aqua-NOAA20_Edition4B_408412."
        f"{d.strftime('%Y%m%d')}.hdf"
    )


def _parse_dods_grid(data: bytes, offset: int, shape: tuple[int, ...]):
    """Parse a DAP2 Grid from binary .dods response."""
    n = int(np.prod(shape))
    offset += 8  # skip two int32 headers
    arr = np.frombuffer(data, dtype=">f4", count=n, offset=offset).copy()
    offset += n * 4
    arr = arr.reshape(shape)
    for dim_size in shape:
        offset += 8 + dim_size * 4
    return arr, offset


def _fetch_var_dods(hdf_url: str, varname: str, shape: tuple[int, ...]):
    """Fetch a single variable via DAP2 binary."""
    import requests

    url = f"{hdf_url}.dods?{varname}"
    r = requests.get(url, timeout=120)
    r.raise_for_status()
    sep = b"\nData:\n"
    idx = r.content.index(sep) + len(sep)
    arr, _ = _parse_dods_grid(r.content, idx, shape)
    return arr


def _fetch_coords(hdf_url: str):
    lat = _fetch_var_dods(hdf_url, "latitude", (NLAT,)).ravel()
    lon = _fetch_var_dods(hdf_url, "longitude", (NLON,)).ravel()
    return lat, lon


def fetch_syn1deg_day(d: date, output_dir: str | Path) -> Path:
    """Fetch one day of CERES SYN1deg-Day via DAP2 binary.

    Parameters
    ----------
    d : date
        Date to fetch.
    output_dir : str or Path
        Directory for output file.

    Returns
    -------
    Path
        Path to written NetCDF4 file.
    """
    import netCDF4

    output_dir = Path(output_dir).expanduser()
    output_dir.mkdir(parents=True, exist_ok=True)
    out_path = output_dir / f"CERES_SYN1deg-Day_{d.strftime('%Y%m%d')}.nc"

    if out_path.exists():
        logger.info("Already exists: %s", out_path.name)
        return out_path

    hdf_url = _build_syn1deg_url(d)
    logger.info("Fetching %s", d)

    lat, lon = _fetch_coords(hdf_url)

    dst = netCDF4.Dataset(str(out_path), "w", format="NETCDF4")
    dst.title = f"CERES SYN1deg-Day {d.isoformat()}"
    dst.source = hdf_url
    dst.date = d.isoformat()

    dst.createDimension("lat", NLAT)
    dst.createDimension("lon", NLON)
    lat_var = dst.createVariable("lat", "f4", ("lat",))
    lat_var[:] = lat
    lat_var.units = "degrees_north"
    lon_var = dst.createVariable("lon", "f4", ("lon",))
    lon_var[:] = lon
    lon_var.units = "degrees_east"

    for vname in SYN1DEG_VARS_2D:
        try:
            data = _fetch_var_dods(hdf_url, vname, (NLAT, NLON))
            dv = dst.createVariable(vname, "f4", ("lat", "lon"), fill_value=FILL_VALUE, zlib=True)
            dv[:] = np.where(data > -900, data, FILL_VALUE)
        except Exception:
            logger.warning("Failed to fetch %s for %s", vname, d)

    for vname in SYN1DEG_VARS_3D:
        try:
            data = _fetch_var_dods(hdf_url, vname, (5, NLAT, NLON))
            dv = dst.createVariable(vname, "f4", ("lat", "lon"), fill_value=FILL_VALUE, zlib=True)
            dv[:] = np.where(data[0] > -900, data[0], FILL_VALUE)
        except Exception:
            logger.warning("Failed to fetch %s (3D) for %s", vname, d)

    dst.close()
    return out_path


def fetch_syn1deg_range(
    start: date,
    end: date,
    output_dir: str | Path,
) -> list[Path]:
    """Fetch CERES SYN1deg-Day for a date range.

    Parameters
    ----------
    start, end : date
        Date range (inclusive).
    output_dir : str or Path
        Directory for output files.

    Returns
    -------
    list[Path]
        Paths to written files.
    """
    paths = []
    d = start
    while d <= end:
        paths.append(fetch_syn1deg_day(d, output_dir))
        d += timedelta(days=1)
    return paths
  • Step 5: Implement MERRA-2 loader
# davinci_monet/radiative/loaders/merra2.py
"""MERRA-2 aerosol data loader for smoke radiative analysis."""

from __future__ import annotations

import glob
from pathlib import Path

import xarray as xr

from davinci_monet.logging import get_logger
from davinci_monet.radiative.processing import derive_smoke_aod

logger = get_logger(__name__)


def load_merra2(
    files: str,
    domain: tuple[float, float, float, float],
    smoke_species: list[str],
) -> xr.Dataset:
    """Load MERRA-2 aerosol data, compute daily means, derive smoke AOD.

    Parameters
    ----------
    files : str
        Glob pattern for MERRA-2 NetCDF4 files.
    domain : tuple
        (west, east, south, north) bounding box.
    smoke_species : list[str]
        Variable names to sum for smoke AOD.

    Returns
    -------
    xr.Dataset
        Daily-mean MERRA-2 data with ``SMOKEAOD`` variable added.
        Dims: (time, lat, lon) where time has one step per input file.
    """
    paths = sorted(glob.glob(str(Path(files).expanduser())))
    if not paths:
        msg = f"No files matched: {files}"
        raise FileNotFoundError(msg)

    logger.info("Loading %d MERRA-2 files", len(paths))

    west, east, south, north = domain
    daily_datasets = []

    for p in paths:
        ds = xr.open_dataset(p)
        sub = ds.sel(lon=slice(west, east), lat=slice(south, north))
        # Take daily mean over hourly time steps
        daily = sub.mean(dim="time")
        daily["SMOKEAOD"] = derive_smoke_aod(daily, smoke_species)
        daily_datasets.append(daily)

    combined = xr.concat(daily_datasets, dim="time")
    return combined
  • Step 6: Implement AERONET loader
# davinci_monet/radiative/loaders/aeronet.py
"""AERONET site data loader for smoke radiative analysis."""

from __future__ import annotations

import glob
from pathlib import Path

import pandas as pd

from davinci_monet.logging import get_logger

logger = get_logger(__name__)


def load_aeronet(
    files: str,
    domain: tuple[float, float, float, float],
    sites: list[str] | None = None,
) -> pd.DataFrame:
    """Load AERONET CSV data, filter by domain and site list.

    Parameters
    ----------
    files : str
        Glob pattern or path to AERONET CSV file(s).
    domain : tuple
        (west, east, south, north) bounding box.
    sites : list[str] | None
        Site names to include. None = all sites within domain.

    Returns
    -------
    pd.DataFrame
        Columns: time, site, aod, latitude, longitude.
    """
    paths = sorted(glob.glob(str(Path(files).expanduser())))
    if not paths:
        msg = f"No files matched: {files}"
        raise FileNotFoundError(msg)

    logger.info("Loading %d AERONET files", len(paths))

    frames = []
    for p in paths:
        df = pd.read_csv(p, parse_dates=["time"])
        frames.append(df)
    df = pd.concat(frames, ignore_index=True)

    # Standardize AOD column name
    if "AOD_500nm" in df.columns and "aod" not in df.columns:
        df["aod"] = df["AOD_500nm"]

    # Filter by domain
    west, east, south, north = domain
    df = df[
        (df["longitude"] >= west)
        & (df["longitude"] <= east)
        & (df["latitude"] >= south)
        & (df["latitude"] <= north)
    ]

    # Filter by site list
    if sites is not None:
        df = df[df["site"].isin(sites)]

    logger.info("Loaded %d AERONET records (%d sites)", len(df), df["site"].nunique())
    return df.reset_index(drop=True)
  • Step 7: Run tests

Run: pytest davinci_monet/tests/test_radiative_loaders.py -v Expected: All 5 tests PASS

  • Step 8: Commit
git add davinci_monet/radiative/loaders/ davinci_monet/tests/test_radiative_loaders.py
git commit -m "feat(radiative): add CERES, MERRA-2, and AERONET data loaders"

Task 5: Plot Renderers

Files:

  • Create: davinci_monet/radiative/plots/__init__.py

  • Create: davinci_monet/radiative/plots/event_fields.py

  • Create: davinci_monet/radiative/plots/anomaly_maps.py

  • Create: davinci_monet/radiative/plots/scatter.py

  • Create: davinci_monet/radiative/plots/site_timeseries.py

  • Create: davinci_monet/radiative/plots/surface_impact.py

  • Create: davinci_monet/tests/test_radiative_plots.py

  • Step 1: Write plot smoke tests (synthetic data)

# davinci_monet/tests/test_radiative_plots.py
"""Smoke tests for radiative plot renderers.

These tests verify that each plotter produces a figure without errors,
using synthetic data. They do NOT validate visual correctness.
"""

from __future__ import annotations

from datetime import date

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytest

from davinci_monet.radiative.plots.anomaly_maps import plot_anomaly_maps
from davinci_monet.radiative.plots.event_fields import plot_event_fields
from davinci_monet.radiative.plots.scatter import plot_sw_vs_aod_scatter
from davinci_monet.radiative.plots.site_timeseries import plot_site_timeseries
from davinci_monet.radiative.plots.surface_impact import plot_surface_impact

matplotlib.use("Agg")  # non-interactive backend


@pytest.fixture
def synthetic_event_data():
    """Synthetic daily records mimicking CERES + MERRA-2 aligned data."""
    np.random.seed(42)
    n_days = 5
    nlat, nlon = 10, 10
    lats = np.linspace(30, 52, nlat)
    lons = np.linspace(-135, -105, nlon)

    records = []
    for i in range(n_days):
        records.append({
            "date": date(2020, 9, 5 + i),
            "sw_all": np.random.uniform(100, 300, (nlat, nlon)),
            "sw_clr": np.random.uniform(80, 250, (nlat, nlon)),
            "tot_aod": np.random.uniform(0, 2, (nlat, nlon)),
            "smoke_aod": np.random.uniform(0, 1, (nlat, nlon)),
            "aod": np.random.uniform(0, 3, (nlat, nlon)),
            "toa_net": np.random.uniform(-100, 200, (nlat, nlon)),
            "cld_frac": np.random.uniform(0, 100, (nlat, nlon)),
            "m2_sfc_effect": np.random.uniform(-200, 0, (nlat, nlon)),
            "semi_dimming": np.random.uniform(-200, 0, (nlat, nlon)),
        })
    return lats, lons, records


@pytest.fixture
def synthetic_aeronet():
    rows = []
    for site in ["Site_A", "Site_B"]:
        for day in range(5, 10):
            rows.append({
                "time": pd.Timestamp(f"2020-09-0{day} 12:00"),
                "site": site,
                "aod": np.random.uniform(0.1, 2.0),
            })
    return pd.DataFrame(rows)


@pytest.fixture
def synthetic_sites():
    return [
        ("Site A", 40.0, -120.0, "Site_A"),
        ("Site B", 35.0, -115.0, "Site_B"),
    ]


class TestEventFields:
    def test_produces_figure(self, synthetic_event_data) -> None:
        lats, lons, records = synthetic_event_data
        fig = plot_event_fields(lats, lons, records[2], event_name="Test Event")
        assert isinstance(fig, matplotlib.figure.Figure)
        plt.close(fig)


class TestAnomalyMaps:
    def test_produces_figure(self, synthetic_event_data) -> None:
        lats, lons, records = synthetic_event_data
        bg = {
            k: np.mean([r[k] for r in records[:3]], axis=0)
            for k in ["sw_all", "sw_clr", "toa_net", "aod"]
        }
        fig = plot_anomaly_maps(lats, lons, records[2], bg, event_name="Test")
        assert isinstance(fig, matplotlib.figure.Figure)
        plt.close(fig)


class TestScatter:
    def test_produces_figure(self, synthetic_event_data) -> None:
        lats, lons, records = synthetic_event_data
        fig = plot_sw_vs_aod_scatter(lats, lons, records, event_name="Test")
        assert isinstance(fig, matplotlib.figure.Figure)
        plt.close(fig)


class TestSiteTimeseries:
    def test_produces_figure(self, synthetic_event_data, synthetic_aeronet, synthetic_sites) -> None:
        lats, lons, records = synthetic_event_data
        bg_sw = np.mean([r["sw_all"] for r in records[:3]], axis=0)
        fig = plot_site_timeseries(
            lats, lons, records, bg_sw,
            sites=synthetic_sites, aeronet=synthetic_aeronet,
            event_name="Test",
        )
        assert isinstance(fig, matplotlib.figure.Figure)
        plt.close(fig)


class TestSurfaceImpact:
    def test_produces_figure(self, synthetic_event_data) -> None:
        lats, lons, records = synthetic_event_data
        fig = plot_surface_impact(lats, lons, records[2], event_name="Test")
        assert isinstance(fig, matplotlib.figure.Figure)
        plt.close(fig)
  • Step 2: Run tests to verify they fail

Run: pytest davinci_monet/tests/test_radiative_plots.py -v Expected: FAIL — ModuleNotFoundError

  • Step 3: Create plots/__init__.py
# davinci_monet/radiative/plots/__init__.py
"""Plot renderers for smoke radiative analysis."""

from davinci_monet.radiative.plots.anomaly_maps import plot_anomaly_maps
from davinci_monet.radiative.plots.event_fields import plot_event_fields
from davinci_monet.radiative.plots.scatter import plot_sw_vs_aod_scatter
from davinci_monet.radiative.plots.site_timeseries import plot_site_timeseries
from davinci_monet.radiative.plots.surface_impact import plot_surface_impact

__all__ = [
    "plot_anomaly_maps",
    "plot_event_fields",
    "plot_site_timeseries",
    "plot_surface_impact",
    "plot_sw_vs_aod_scatter",
]
  • Step 4: Implement event_fields.py

Generalized from PlumeSentinelAI/scripts/plot_ceres_syn1deg_event.py:plot_event_fields().

# davinci_monet/radiative/plots/event_fields.py
"""Multi-panel TOA event field maps."""

from __future__ import annotations

from typing import Any

import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import numpy as np

from davinci_monet.plots.style import NCAR_COLORS


def plot_event_fields(
    lats: np.ndarray,
    lons: np.ndarray,
    record: dict[str, Any],
    event_name: str = "",
) -> plt.Figure:
    """4-panel map: AOD, TOA SW reflected, TOA net flux, cloud fraction.

    Parameters
    ----------
    lats, lons : ndarray
        Coordinate arrays.
    record : dict
        Single-day record with keys: aod, sw_all, toa_net, cld_frac.
    event_name : str
        Title annotation.

    Returns
    -------
    matplotlib.figure.Figure
    """
    fields = [
        ("aod", "AOD 550 nm", "YlOrRd", 0, 3.0),
        ("sw_all", "TOA SW Reflected (W/m²)", "YlOrRd", 0, 350),
        ("toa_net", "TOA Net Flux (W/m²)", "RdBu_r", -150, 150),
        ("cld_frac", "Cloud Fraction (%)", "Blues", 0, 100),
    ]

    fig, axes = plt.subplots(
        2, 2, figsize=(14, 10),
        subplot_kw={"projection": ccrs.PlateCarree()},
    )
    title = f"CERES SYN1deg — {record['date']} {event_name}".strip()
    fig.suptitle(title, fontsize=18, fontweight="bold", color=NCAR_COLORS["space"])

    extent = [float(lons.min()), float(lons.max()),
              float(lats.min()), float(lats.max())]

    for ax, (key, label, cmap, vmin, vmax) in zip(axes.flat, fields):
        data = record[key]
        ax.set_extent(extent, crs=ccrs.PlateCarree())
        ax.add_feature(cfeature.STATES, linewidth=0.5, edgecolor="gray")
        ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
        ax.add_feature(cfeature.BORDERS, linewidth=0.5, linestyle="--")
        im = ax.pcolormesh(
            lons, lats, data,
            transform=ccrs.PlateCarree(),
            cmap=cmap, vmin=vmin, vmax=vmax, shading="auto",
        )
        ax.set_title(label, fontsize=13)
        plt.colorbar(im, ax=ax, shrink=0.7, pad=0.02)

    plt.tight_layout(rect=[0, 0, 1, 0.94])
    return fig
  • Step 5: Implement anomaly_maps.py

Generalized from plot_ceres_syn1deg_event.py:plot_anomalies().

# davinci_monet/radiative/plots/anomaly_maps.py
"""Event-minus-background anomaly maps."""

from __future__ import annotations

from typing import Any

import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import numpy as np

from davinci_monet.plots.style import NCAR_COLORS


def plot_anomaly_maps(
    lats: np.ndarray,
    lons: np.ndarray,
    record: dict[str, Any],
    background: dict[str, np.ndarray],
    event_name: str = "",
) -> plt.Figure:
    """4-panel anomaly maps: ΔAOD, ΔSW all-sky, ΔSW clear-sky, ΔNet.

    Parameters
    ----------
    lats, lons : ndarray
        Coordinate arrays.
    record : dict
        Single-day record with keys: aod, sw_all, sw_clr, toa_net.
    background : dict
        Background mean arrays with same keys.
    event_name : str
        Title annotation.

    Returns
    -------
    matplotlib.figure.Figure
    """
    fields = [
        ("aod", "ΔAOD 550 nm", "PuOr_r", -2.0, 2.0),
        ("sw_all", "ΔTOA SW Reflected (W/m²)", "RdBu_r", -100, 100),
        ("sw_clr", "ΔTOA SW Clear-Sky (W/m²)", "RdBu_r", -100, 100),
        ("toa_net", "ΔTOA Net Flux (W/m²)", "RdBu_r", -100, 100),
    ]

    fig, axes = plt.subplots(
        2, 2, figsize=(14, 10),
        subplot_kw={"projection": ccrs.PlateCarree()},
    )
    title = f"CERES SYN1deg — {record['date']} Anomaly vs Background {event_name}".strip()
    fig.suptitle(title, fontsize=18, fontweight="bold", color=NCAR_COLORS["space"])

    extent = [float(lons.min()), float(lons.max()),
              float(lats.min()), float(lats.max())]

    for ax, (key, label, cmap, vmin, vmax) in zip(axes.flat, fields):
        anom = record[key] - background[key]
        ax.set_extent(extent, crs=ccrs.PlateCarree())
        ax.add_feature(cfeature.STATES, linewidth=0.5, edgecolor="gray")
        ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
        ax.add_feature(cfeature.BORDERS, linewidth=0.5, linestyle="--")
        im = ax.pcolormesh(
            lons, lats, anom,
            transform=ccrs.PlateCarree(),
            cmap=cmap, vmin=vmin, vmax=vmax, shading="auto",
        )
        ax.set_title(label, fontsize=13)
        plt.colorbar(im, ax=ax, shrink=0.7, pad=0.02)

    plt.tight_layout(rect=[0, 0, 1, 0.94])
    return fig
  • Step 6: Implement scatter.py

Generalized from plot_ceres_sw_vs_merra2_aod.py:plot_scatter() and plot_daily_correlation().

# davinci_monet/radiative/plots/scatter.py
"""SW vs AOD scatter plots and daily correlation charts."""

from __future__ import annotations

from typing import Any

import matplotlib.pyplot as plt
import numpy as np

from davinci_monet.plots.style import NCAR_COLORS


def plot_sw_vs_aod_scatter(
    lats: np.ndarray,
    lons: np.ndarray,
    records: list[dict[str, Any]],
    event_name: str = "",
) -> plt.Figure:
    """2-panel scatter: CERES SW vs total AOD and smoke AOD, all days.

    Also includes a third panel with per-day correlation bar chart.

    Parameters
    ----------
    lats, lons : ndarray
        Coordinate arrays (unused for scatter, kept for API consistency).
    records : list[dict]
        Daily records with keys: date, tot_aod, smoke_aod, sw_all.
    event_name : str
        Title annotation.

    Returns
    -------
    matplotlib.figure.Figure
    """
    fig, axes = plt.subplots(1, 3, figsize=(20, 6))
    fig.suptitle(
        f"CERES TOA SW vs MERRA-2 AOD — {event_name}",
        fontsize=16, fontweight="bold", color=NCAR_COLORS["space"],
    )

    cmap = plt.cm.coolwarm
    day_nums = [(r["date"] - records[0]["date"]).days for r in records]
    norm = plt.Normalize(0, max(day_nums) or 1)

    for ax, aod_key, aod_label in [
        (axes[0], "tot_aod", "MERRA-2 Total AOD 550 nm"),
        (axes[1], "smoke_aod", "MERRA-2 Smoke AOD (OC+BC) 550 nm"),
    ]:
        for rec, dn in zip(records, day_nums):
            aod = rec[aod_key].ravel()
            sw = rec["sw_all"].ravel()
            valid = np.isfinite(aod) & np.isfinite(sw)
            ax.scatter(
                aod[valid], sw[valid], s=4, alpha=0.3,
                c=[cmap(norm(dn))] * valid.sum(), edgecolors="none",
            )
        all_aod = np.concatenate([r[aod_key].ravel() for r in records])
        all_sw = np.concatenate([r["sw_all"].ravel() for r in records])
        valid = np.isfinite(all_aod) & np.isfinite(all_sw)
        r_val = np.corrcoef(all_aod[valid], all_sw[valid])[0, 1] if valid.sum() > 2 else 0.0
        ax.set_xlabel(aod_label)
        ax.set_ylabel("CERES TOA SW Reflected (W/m²)")
        ax.set_title(f"r = {r_val:.3f}, n = {valid.sum():,}")

    # Panel 3: daily correlation bars
    ax = axes[2]
    dates_label = [r["date"].strftime("%b %d") for r in records]
    r_total, r_smoke = [], []
    for rec in records:
        for aod_key, r_list in [("tot_aod", r_total), ("smoke_aod", r_smoke)]:
            aod = rec[aod_key].ravel()
            sw = rec["sw_all"].ravel()
            valid = np.isfinite(aod) & np.isfinite(sw)
            rv = np.corrcoef(aod[valid], sw[valid])[0, 1] if valid.sum() > 10 else np.nan
            r_list.append(rv)

    x = range(len(records))
    w = 0.35
    ax.bar([i - w / 2 for i in x], r_total, w, label="Total AOD",
           color=NCAR_COLORS["ncar_blue"], alpha=0.8)
    ax.bar([i + w / 2 for i in x], r_smoke, w, label="Smoke AOD",
           color=NCAR_COLORS["orange"], alpha=0.8)
    ax.set_xticks(list(x))
    ax.set_xticklabels(dates_label, rotation=45)
    ax.set_ylabel("Pearson r")
    ax.set_title("Daily Correlation")
    ax.legend(fontsize=9)
    ax.axhline(0, color="gray", lw=0.5)

    plt.tight_layout(rect=[0, 0, 1, 0.93])
    return fig
  • Step 7: Implement site_timeseries.py

Generalized from plot_ceres_sw_vs_merra2_aod.py:plot_timeseries().

# davinci_monet/radiative/plots/site_timeseries.py
"""Per-site dual-axis time-series: smoke AOD + CERES flux anomaly."""

from __future__ import annotations

from typing import Any

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from davinci_monet.plots.style import NCAR_COLORS


def plot_site_timeseries(
    lats: np.ndarray,
    lons: np.ndarray,
    records: list[dict[str, Any]],
    bg_sw: np.ndarray,
    sites: list[tuple[str, float, float, str]],
    aeronet: pd.DataFrame | None = None,
    event_name: str = "",
) -> plt.Figure:
    """Dual-axis time-series at each site: smoke AOD bars + CERES ΔSW line.

    Parameters
    ----------
    lats, lons : ndarray
        Coordinate arrays.
    records : list[dict]
        Daily records with keys: date, smoke_aod, sw_all.
    bg_sw : ndarray
        Background SW field for anomaly computation.
    sites : list[tuple]
        Each entry: (display_name, lat, lon, aeronet_site_name).
    aeronet : pd.DataFrame | None
        AERONET data with columns: time, site, aod. None to skip.
    event_name : str
        Title annotation.

    Returns
    -------
    matplotlib.figure.Figure
    """
    n_sites = len(sites)
    ncols = min(3, n_sites)
    nrows = (n_sites + ncols - 1) // ncols

    fig, axes = plt.subplots(nrows, ncols, figsize=(6 * ncols, 4.5 * nrows), sharex=True)
    fig.suptitle(
        f"Smoke AOD & CERES TOA SW — {event_name}",
        fontsize=18, fontweight="bold", color=NCAR_COLORS["space"],
    )

    if n_sites == 1:
        axes = np.array([axes])
    axes_flat = axes.flat if hasattr(axes, "flat") else [axes]

    dates_plot = [r["date"] for r in records]

    for ax, (name, slat, slon, aeronet_site) in zip(axes_flat, sites):
        li = np.argmin(np.abs(lats - slat))
        lo = np.argmin(np.abs(lons - slon))

        smoke_ts = [r["smoke_aod"][li, lo] for r in records]
        sw_anom_ts = [r["sw_all"][li, lo] - bg_sw[li, lo] for r in records]

        ax.bar(dates_plot, smoke_ts, color=NCAR_COLORS["orange"],
               alpha=0.6, width=0.8, label="Smoke AOD")
        ax.set_ylabel("AOD 550 nm", color=NCAR_COLORS["orange"])

        if aeronet is not None:
            aer = aeronet[aeronet["site"] == aeronet_site].sort_values("time")
            if len(aer) > 0:
                ax.scatter(aer["time"], aer["aod"], s=5, alpha=0.3,
                           color=NCAR_COLORS["red"], zorder=4)
                aer_daily = aer.set_index("time")["aod"].resample("D").mean()
                ax.plot(aer_daily.index, aer_daily.values, "D",
                        color=NCAR_COLORS["red"], ms=5, zorder=5,
                        label="AERONET AOD")

        aod_max = max(max(smoke_ts) * 1.3, 0.5)
        ax.set_ylim(0, aod_max)

        ax2 = ax.twinx()
        ax2.plot(dates_plot, sw_anom_ts, "o-",
                 color=NCAR_COLORS["ncar_blue"], lw=2, ms=4,
                 label="CERES ΔSW")
        ax2.set_ylabel("ΔSW (W/m²)", color=NCAR_COLORS["ncar_blue"])
        ax2.axhline(0, color="gray", lw=0.5, ls="--")

        ax.set_title(name, fontsize=13, fontweight="bold")
        ax.tick_params(axis="x", rotation=45)

    # Hide unused axes
    for idx in range(n_sites, nrows * ncols):
        fig.delaxes(axes.flat[idx])

    plt.tight_layout(rect=[0, 0.05, 1, 0.93])
    return fig
  • Step 8: Implement surface_impact.py

Generalized from plot_surface_sw_impact.py:plot_spatial() and plot_method_comparison().

# davinci_monet/radiative/plots/surface_impact.py
"""Surface SW impact maps and method comparison plots."""

from __future__ import annotations

from typing import Any

import cartopy.crs as ccrs
import cartopy.feature as cfeature
import matplotlib.pyplot as plt
import numpy as np

from davinci_monet.plots.style import NCAR_COLORS


def plot_surface_impact(
    lats: np.ndarray,
    lons: np.ndarray,
    record: dict[str, Any],
    event_name: str = "",
) -> plt.Figure:
    """3-panel: smoke AOD, MERRA-2 ΔSW_net surface, semi-empirical ΔSW.

    Parameters
    ----------
    lats, lons : ndarray
        Coordinate arrays.
    record : dict
        Single-day record with keys: smoke_aod, m2_sfc_effect, semi_dimming.
    event_name : str
        Title annotation.

    Returns
    -------
    matplotlib.figure.Figure
    """
    panels = [
        (record["smoke_aod"], "Smoke AOD (OC+BC)", "YlOrRd", 0, 3, "AOD"),
        (record["m2_sfc_effect"], "MERRA-2 ΔSW_net Surface", "RdBu_r", -250, 250, "W/m²"),
        (record["semi_dimming"], "Semi-Empirical ΔSW Surface", "RdBu_r", -250, 250, "W/m²"),
    ]

    fig, axes = plt.subplots(
        1, 3, figsize=(20, 6),
        subplot_kw={"projection": ccrs.PlateCarree()},
    )
    title = f"Surface SW Impact — {record['date']} {event_name}".strip()
    fig.suptitle(title, fontsize=17, fontweight="bold", color=NCAR_COLORS["space"])

    extent = [float(lons.min()), float(lons.max()),
              float(lats.min()), float(lats.max())]

    for ax, (data, label, cmap, vmin, vmax, cbar_label) in zip(axes, panels):
        ax.set_extent(extent, crs=ccrs.PlateCarree())
        ax.add_feature(cfeature.STATES, linewidth=0.5, edgecolor="gray")
        ax.add_feature(cfeature.COASTLINE, linewidth=0.8)
        im = ax.pcolormesh(
            lons, lats, data,
            transform=ccrs.PlateCarree(),
            cmap=cmap, vmin=vmin, vmax=vmax, shading="auto",
        )
        ax.set_title(label, fontsize=11)
        plt.colorbar(im, ax=ax, shrink=0.7, pad=0.02, label=cbar_label)

    plt.tight_layout(rect=[0, 0, 1, 0.93])
    return fig
  • Step 9: Run tests

Run: pytest davinci_monet/tests/test_radiative_plots.py -v Expected: All 5 tests PASS

  • Step 10: Commit
git add davinci_monet/radiative/plots/ davinci_monet/tests/test_radiative_plots.py
git commit -m "feat(radiative): add all plot renderers (event, anomaly, scatter, timeseries, surface)"

Task 6: Runner and CLI Integration

Files:

  • Create: davinci_monet/radiative/runner.py

  • Modify: davinci_monet/cli/app.py

  • Create: davinci_monet/tests/test_radiative_runner.py

  • Step 1: Write runner test

# davinci_monet/tests/test_radiative_runner.py
"""Tests for the radiative analysis runner."""

from __future__ import annotations

from pathlib import Path

import numpy as np
import pandas as pd
import pytest
import xarray as xr
import yaml

from davinci_monet.radiative.runner import run_radiative_analysis


@pytest.fixture
def full_synthetic_setup(tmp_path: Path) -> Path:
    """Create synthetic CERES + MERRA-2 + AERONET data and a config file."""
    # CERES
    ceres_dir = tmp_path / "ceres"
    ceres_dir.mkdir()
    lats = np.arange(89.5, -90, -1.0)
    lons = np.arange(-179.5, 180, 1.0)
    for day in range(2, 16):
        ds = xr.Dataset(
            {
                "obs_all_toa_sw": (["lat", "lon"], np.random.uniform(50, 350, (180, 360)).astype("f4")),
                "obs_clr_toa_sw": (["lat", "lon"], np.random.uniform(50, 300, (180, 360)).astype("f4")),
                "obs_all_toa_net": (["lat", "lon"], np.random.uniform(-100, 200, (180, 360)).astype("f4")),
                "init_match_aod55": (["lat", "lon"], np.random.uniform(0, 3, (180, 360)).astype("f4")),
                "obs_cld_amount": (["lat", "lon"], np.random.uniform(0, 100, (180, 360)).astype("f4")),
                "toa_sw_insol": (["lat", "lon"], np.random.uniform(300, 450, (180, 360)).astype("f4")),
            },
            coords={"lat": lats, "lon": lons},
        )
        ds.to_netcdf(ceres_dir / f"CERES_SYN1deg-Day_2020090{day}.nc" if day < 10
                      else ceres_dir / f"CERES_SYN1deg-Day_202009{day}.nc")

    # MERRA-2
    merra_dir = tmp_path / "merra2"
    merra_dir.mkdir()
    mlats = np.arange(-90, 90.5, 0.5)
    mlons = np.arange(-180, 180, 0.625)
    for day in range(2, 16):
        ds = xr.Dataset(
            {
                "TOTEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 1, (8, len(mlats), len(mlons))).astype("f4")),
                "OCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.5, (8, len(mlats), len(mlons))).astype("f4")),
                "BCEXTTAU": (["time", "lat", "lon"], np.random.uniform(0, 0.1, (8, len(mlats), len(mlons))).astype("f4")),
            },
            coords={"time": np.arange(8), "lat": mlats, "lon": mlons},
        )
        fname = f"MERRA2_401.tavg1_2d_aer_Nx.2020090{day}.nc4" if day < 10 \
            else f"MERRA2_401.tavg1_2d_aer_Nx.202009{day}.nc4"
        ds.to_netcdf(merra_dir / fname)

    # AERONET
    aeronet_dir = tmp_path / "aeronet"
    aeronet_dir.mkdir()
    rows = []
    for site in ["Fresno_2", "UCSB"]:
        for day in range(5, 10):
            rows.append({
                "time": f"2020-09-0{day} 12:00:00",
                "site": site,
                "AOD_500nm": np.random.uniform(0.1, 2.0),
                "latitude": 36.8 if site == "Fresno_2" else 34.4,
                "longitude": -119.8 if site == "Fresno_2" else -119.9,
            })
    pd.DataFrame(rows).to_csv(aeronet_dir / "aeronet.csv", index=False)

    # Output dir
    out_dir = tmp_path / "output"

    # Config
    config = {
        "radiative": {
            "event": {
                "name": "test-event",
                "start_time": "2020-09-05",
                "end_time": "2020-09-15",
                "domain": [-135, -105, 30, 52],
                "background_window": 3,
            },
            "ceres": {
                "product": "syn1deg",
                "source": "local",
                "files": str(ceres_dir / "CERES_SYN1deg-Day_*.nc"),
            },
            "merra2": {
                "files": str(merra_dir / "MERRA2_401.tavg1_2d_aer_Nx.*.nc4"),
            },
            "aeronet": {
                "files": str(aeronet_dir / "aeronet.csv"),
                "sites": ["Fresno_2", "UCSB"],
            },
            "plots": ["toa_event_fields", "anomaly_maps"],
            "output_dir": str(out_dir),
        }
    }
    config_path = tmp_path / "config.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)

    return config_path


class TestRunner:
    def test_run_produces_plots(self, full_synthetic_setup: Path) -> None:
        result = run_radiative_analysis(str(full_synthetic_setup))
        assert result["success"] is True
        assert len(result["plots_generated"]) >= 2

    def test_output_dir_created(self, full_synthetic_setup: Path) -> None:
        run_radiative_analysis(str(full_synthetic_setup))
        config_dir = full_synthetic_setup.parent
        out_dir = config_dir / "output"
        assert out_dir.exists()
        pngs = list(out_dir.glob("*.png"))
        assert len(pngs) >= 2
  • Step 2: Run tests to verify they fail

Run: pytest davinci_monet/tests/test_radiative_runner.py -v Expected: FAIL — ModuleNotFoundError

  • Step 3: Implement the runner
# davinci_monet/radiative/runner.py
"""Runner for smoke radiative analysis pipeline.

Orchestrates: parse config → load data → process → generate plots.
"""

from __future__ import annotations

from datetime import date, timedelta
from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt
import numpy as np
import yaml

from davinci_monet.logging import get_logger
from davinci_monet.radiative.config import RadiativeConfig

logger = get_logger(__name__)


def _parse_config(config_path: str) -> RadiativeConfig:
    """Load YAML and extract the radiative section."""
    with open(config_path) as f:
        raw = yaml.safe_load(f)

    if "radiative" not in raw:
        msg = f"Config file missing 'radiative' section: {config_path}"
        raise ValueError(msg)

    return RadiativeConfig(**raw["radiative"])


def _dates_in_range(start: date, end: date) -> list[date]:
    """Generate list of dates from start to end inclusive."""
    dates = []
    d = start
    while d <= end:
        dates.append(d)
        d += timedelta(days=1)
    return dates


def run_radiative_analysis(config_path: str) -> dict[str, Any]:
    """Run a smoke radiative analysis from a YAML config file.

    Parameters
    ----------
    config_path : str
        Path to YAML configuration file.

    Returns
    -------
    dict
        Result with keys: success, plots_generated, errors.
    """
    from davinci_monet.plots.style import apply_ncar_style
    from davinci_monet.radiative.loaders.ceres import load_ceres_local
    from davinci_monet.radiative.plots.anomaly_maps import plot_anomaly_maps
    from davinci_monet.radiative.plots.event_fields import plot_event_fields
    from davinci_monet.radiative.plots.scatter import plot_sw_vs_aod_scatter
    from davinci_monet.radiative.plots.site_timeseries import plot_site_timeseries
    from davinci_monet.radiative.plots.surface_impact import plot_surface_impact
    from davinci_monet.radiative.processing import (
        compute_anomalies,
        compute_background,
        regrid_nearest,
        semi_empirical_surface_dimming,
    )

    apply_ncar_style()

    cfg = _parse_config(config_path)
    out_dir = Path(cfg.output_dir).expanduser()
    out_dir.mkdir(parents=True, exist_ok=True)

    plots_generated: list[str] = []
    errors: list[str] = []
    domain = cfg.event.domain

    # ── Step 1: Load CERES ───────────────────────────────────────────────
    logger.info("Loading CERES %s data", cfg.ceres.product)
    ceres_ds = load_ceres_local(
        files=cfg.ceres.files,
        domain=domain,
        variables=cfg.ceres.variables,
    )
    clat = ceres_ds.lat.values
    clon = ceres_ds.lon.values

    # Map variable names to standardized keys
    var_map = {
        "obs_all_toa_sw": "sw_all",
        "obs_clr_toa_sw": "sw_clr",
        "obs_all_toa_net": "toa_net",
        "init_match_aod55": "aod",
        "obs_cld_amount": "cld_frac",
        "toa_sw_insol": "toa_insol",
    }

    # Build per-day records from CERES
    records: list[dict[str, Any]] = []
    event_dates = _dates_in_range(cfg.event.start_time.date(), cfg.event.end_time.date())

    for t in range(ceres_ds.sizes["time"]):
        day_ds = ceres_ds.isel(time=t)
        rec: dict[str, Any] = {
            "date": event_dates[t] if t < len(event_dates) else date(2020, 1, 1),
        }
        for src, dst in var_map.items():
            if src in day_ds:
                rec[dst] = day_ds[src].values.squeeze()
        records.append(rec)

    # ── Step 2: Load MERRA-2 (optional) ──────────────────────────────────
    if cfg.merra2 is not None:
        logger.info("Loading MERRA-2 data")
        from davinci_monet.radiative.loaders.merra2 import load_merra2

        merra_ds = load_merra2(
            files=cfg.merra2.files,
            domain=domain,
            smoke_species=cfg.merra2.smoke_species,
        )
        # Regrid MERRA-2 to CERES grid for each day
        for t, rec in enumerate(records):
            if t < merra_ds.sizes["time"]:
                day_merra = merra_ds.isel(time=t)
                rec["tot_aod"] = regrid_nearest(
                    day_merra["TOTEXTTAU"], clat, clon,
                ).values
                rec["smoke_aod"] = regrid_nearest(
                    day_merra["SMOKEAOD"], clat, clon,
                ).values

    # ── Step 3: Load AERONET (optional) ──────────────────────────────────
    aeronet_df = None
    if cfg.aeronet is not None:
        logger.info("Loading AERONET data")
        from davinci_monet.radiative.loaders.aeronet import load_aeronet

        aeronet_df = load_aeronet(
            files=cfg.aeronet.files,
            domain=domain,
            sites=cfg.aeronet.sites,
        )

    # ── Step 4: Compute background and anomalies ─────────────────────────
    bg_window = cfg.event.background_window
    bg = {
        key: np.nanmean([r[key] for r in records[:bg_window]], axis=0)
        for key in ["sw_all", "sw_clr", "toa_net", "aod"]
        if key in records[0]
    }
    bg_sw = bg.get("sw_all")

    # ── Step 5: Surface impact (optional) ────────────────────────────────
    if cfg.surface_impact is not None and cfg.merra2 is not None:
        from davinci_monet.radiative.loaders.merra2 import load_merra2 as _load_m2

        for rec in records:
            if "smoke_aod" in rec and "toa_insol" in rec:
                rec["semi_dimming"] = semi_empirical_surface_dimming(
                    rec["smoke_aod"],
                    rec.get("toa_insol", np.full_like(rec["smoke_aod"], 400.0)),
                    ssa=cfg.surface_impact.ssa,
                    asymmetry=cfg.surface_impact.asymmetry,
                )
                # m2_sfc_effect would come from MERRA-2 radiation files
                # For now, placeholder — requires separate MERRA-2 radiation loader
                if "m2_sfc_effect" not in rec:
                    # NOTE: Full MERRA-2 radiation (tavg1_2d_rad_Nx) loader
                    # deferred to a follow-up task. For now, approximate.
                    rec["m2_sfc_effect"] = rec["semi_dimming"] * 0.9

    # ── Step 6: Generate requested plots ─────────────────────────────────
    # Use a day near the middle of the event as the "peak" day for single-day plots
    peak_idx = min(len(records) - 1, bg_window + (len(records) - bg_window) // 2)
    peak_rec = records[peak_idx]

    plot_dispatch = {
        "toa_event_fields": lambda: plot_event_fields(
            clat, clon, peak_rec, event_name=cfg.event.name,
        ),
        "anomaly_maps": lambda: plot_anomaly_maps(
            clat, clon, peak_rec, bg, event_name=cfg.event.name,
        ),
        "sw_vs_aod_scatter": lambda: plot_sw_vs_aod_scatter(
            clat, clon, records, event_name=cfg.event.name,
        ),
        "site_timeseries": lambda: plot_site_timeseries(
            clat, clon, records, bg_sw,
            sites=_get_sites(cfg),
            aeronet=aeronet_df,
            event_name=cfg.event.name,
        ),
        "surface_impact": lambda: plot_surface_impact(
            clat, clon, peak_rec, event_name=cfg.event.name,
        ),
    }

    for plot_name in cfg.plots:
        try:
            logger.info("Generating plot: %s", plot_name)
            fig = plot_dispatch[plot_name]()
            out_path = out_dir / f"{cfg.event.name}_{plot_name}.png"
            fig.savefig(out_path, dpi=300, bbox_inches="tight")
            plt.close(fig)
            plots_generated.append(str(out_path))
            logger.info("Saved %s", out_path)
        except Exception as e:
            logger.error("Failed to generate %s: %s", plot_name, e)
            errors.append(f"{plot_name}: {e}")

    return {
        "success": len(errors) == 0,
        "plots_generated": plots_generated,
        "errors": errors,
    }


def _get_sites(cfg: RadiativeConfig) -> list[tuple[str, float, float, str]]:
    """Extract site list from AERONET config, or return defaults."""
    if cfg.aeronet is not None and cfg.aeronet.sites is not None:
        # Use site names as both display and AERONET names
        # (lat/lon will be looked up from AERONET data in a future enhancement)
        return [(s, 0.0, 0.0, s) for s in cfg.aeronet.sites]
    return []
  • Step 4: Register CLI subcommand

Add the following to davinci_monet/cli/app.py in the register_commands() function, after the existing app.add_typer(get_data.app, name="get") line:

# In register_commands():
    from davinci_monet.cli.commands import get_data, run, validate

    # Register subcommands
    app.add_typer(get_data.app, name="get")

    # Radiative analysis subcommand
    import typer as _typer

    radiative_app = _typer.Typer(
        name="radiative",
        help="Smoke radiative analysis tools.",
    )

    @radiative_app.command("run")
    def radiative_run(
        config: str = _typer.Argument(..., help="Path to radiative analysis config (YAML)."),
    ) -> None:
        """Run a smoke radiative analysis from a config file."""
        from davinci_monet.radiative.runner import run_radiative_analysis

        result = run_radiative_analysis(config)
        if result["success"]:
            _typer.echo(f"Success: {len(result['plots_generated'])} plots generated")
        else:
            _typer.echo(f"Completed with errors: {result['errors']}", err=True)
            raise _typer.Exit(code=1)

    @radiative_app.command("fetch-ceres")
    def radiative_fetch(
        product: str = _typer.Option("syn1deg", help="CERES product: ebaf or syn1deg"),
        start: str = _typer.Option(..., help="Start date (YYYY-MM-DD)"),
        end: str = _typer.Option(..., help="End date (YYYY-MM-DD)"),
        output: str = _typer.Option(".", help="Output directory"),
    ) -> None:
        """Fetch CERES data from OPeNDAP."""
        from datetime import date as date_cls

        from davinci_monet.radiative.loaders.ceres import fetch_ebaf, fetch_syn1deg_range

        if product == "ebaf":
            path = fetch_ebaf(start, end, output)
            _typer.echo(f"Wrote: {path}")
        elif product == "syn1deg":
            start_d = date_cls.fromisoformat(start)
            end_d = date_cls.fromisoformat(end)
            paths = fetch_syn1deg_range(start_d, end_d, output)
            _typer.echo(f"Wrote {len(paths)} files to {output}")
        else:
            _typer.echo(f"Unknown product: {product}", err=True)
            raise _typer.Exit(code=1)

    app.add_typer(radiative_app, name="radiative")
  • Step 5: Run tests

Run: pytest davinci_monet/tests/test_radiative_runner.py -v Expected: All 2 tests PASS

  • Step 6: Run the full radiative test suite

Run: pytest davinci_monet/tests/test_radiative_*.py -v Expected: All tests PASS across all 6 test files

  • Step 7: Run the existing test suite to check for regressions

Run: pytest davinci_monet/tests/ -v --timeout=120 Expected: No new failures

  • Step 8: Commit
git add davinci_monet/radiative/runner.py davinci_monet/cli/app.py davinci_monet/tests/test_radiative_runner.py
git commit -m "feat(radiative): add runner and CLI integration"

Task 7: Update Package __init__ and Verify End-to-End

Files:

  • Modify: davinci_monet/radiative/__init__.py

  • Step 1: Update __init__.py with public API

# davinci_monet/radiative/__init__.py
"""Smoke radiative analysis subpackage.

Provides tools for analyzing wildfire smoke radiative effects using
CERES, MERRA-2, and AERONET data. This is a standalone analysis
workflow that uses DAVINCI's config, styling, and CLI infrastructure.

Usage
-----
Via CLI::

    davinci-monet radiative run config.yaml
    davinci-monet radiative fetch-ceres --product syn1deg --start 2020-09-05 --end 2020-09-15

Via Python::

    from davinci_monet.radiative.runner import run_radiative_analysis
    result = run_radiative_analysis("config.yaml")
"""

from davinci_monet.radiative.config import RadiativeConfig
from davinci_monet.radiative.runner import run_radiative_analysis

__all__ = ["RadiativeConfig", "run_radiative_analysis"]
  • Step 2: Verify CLI is accessible

Run: davinci-monet radiative --help Expected: Shows run and fetch-ceres subcommands

  • Step 3: Run mypy on the new package

Run: mypy davinci_monet/radiative/ --ignore-missing-imports Expected: No errors (or only pre-existing ones)

  • Step 4: Run black and isort

Run: black davinci_monet/radiative/ davinci_monet/tests/test_radiative_*.py && isort davinci_monet/radiative/ davinci_monet/tests/test_radiative_*.py Expected: Files formatted

  • Step 5: Final full test run

Run: pytest davinci_monet/tests/test_radiative_*.py -v Expected: All tests PASS

  • Step 6: Commit
git add davinci_monet/radiative/__init__.py
git commit -m "feat(radiative): finalize public API and package exports"

Clone this wiki locally