diff --git a/scripts/mosaic_day.py b/scripts/mosaic_day.py index bdc17d2..014430b 100644 --- a/scripts/mosaic_day.py +++ b/scripts/mosaic_day.py @@ -17,6 +17,7 @@ 6. Coadd with noise (1/σ²) weighting 7. Write per-window mosaic FITS: {date}_w{NN}_mosaic.fits """ + import argparse import dataclasses import glob @@ -29,6 +30,7 @@ import numpy as np from astropy.io import fits from astropy.wcs import WCS + # casa_tables (wraps casatools) is imported lazily inside functions that # need it so that the rest of this module remains importable in environments # where casatools is not installed (e.g. CI, unit tests with mocks). @@ -50,6 +52,7 @@ # ── Cal-path resolution (inlined to avoid dependency on untracked ensure.py) ─ + def _resolve_cal_table_paths(ms_dir: str, cal_date: str) -> tuple[str, str]: """Find bandpass and gain table paths for a cal date. @@ -65,6 +68,7 @@ def _resolve_cal_table_paths(ms_dir: str, cal_date: str) -> tuple[str, str]: # ── TileConfig: explicit pipeline configuration ───────────────────────────── + @dataclasses.dataclass(frozen=True) class TileConfig: """Immutable configuration for a single pipeline run. @@ -144,7 +148,9 @@ class TileResult: status: str # "imaged" | "cached" | "failed" fits_path: str | None = None - failed_stage: str | None = None # "phaseshift", "applycal", "allzero", "imaging", "no_output", "qa", "timeout" + failed_stage: str | None = ( + None # "phaseshift", "applycal", "allzero", "imaging", "no_output", "qa", "timeout" + ) error: str | None = None @property @@ -177,19 +183,33 @@ def from_dict(d: dict) -> "TileResult": # ── Helpers ────────────────────────────────────────────────────────────────── + def find_valid_ms(cfg: TileConfig) -> list[str]: """Return sorted list of valid (non-corrupt) raw MS paths for the configured date.""" from dsa110_continuum.adapters.casa_tables import table # noqa: PLC0415 + candidates = sorted(glob.glob(f"{cfg.ms_dir}/{cfg.date}T*.ms")) candidates = [p for p in candidates if "meridian" not in p and "flagversion" not in p] valid = [] for path in candidates: + field_path = os.path.join(path, "FIELD") + if not os.path.exists(field_path): + log.warning("Skipping corrupt MS (missing FIELD table): %s", path) + continue + if not os.path.isdir(field_path): + log.warning("Skipping corrupt MS (FIELD table is not a directory): %s", path) + continue try: - with table(path + "/FIELD", readonly=True, ack=False) as _: + with table(field_path, readonly=True, ack=False) as _: pass valid.append(path) - except Exception: - log.warning("Skipping corrupt MS (no FIELD table): %s", path) + except Exception as exc: + log.warning( + "Skipping corrupt MS (unreadable FIELD table: %s: %s): %s", + type(exc).__name__, + exc, + path, + ) log.info("Found %d valid MS files", len(valid)) return valid @@ -202,6 +222,7 @@ def get_meridian_path(ms_path: str) -> str: def needs_calibration(ms_path: str) -> bool: """Return True if CORRECTED_DATA doesn't exist or has ratio close to 1.""" from dsa110_continuum.adapters.casa_tables import table # noqa: PLC0415 + # NOTE: TypeError (e.g. wrong argument type) is intentionally NOT caught # here — it indicates a programming error and should propagate to the caller. try: @@ -224,6 +245,7 @@ def needs_calibration(ms_path: str) -> bool: def _ms_is_valid(path: str) -> bool: """Return True only if path looks like a complete CASA Measurement Set.""" import glob as _g + return ( os.path.isdir(path) and os.path.exists(os.path.join(path, "table.dat")) @@ -298,9 +320,7 @@ def validate_window_params(window_tiles: int, stride_tiles: int) -> list[str]: if stride_tiles < 1: errors.append(f"--stride-tiles must be >= 1, got {stride_tiles}") if stride_tiles > window_tiles and window_tiles >= 2: - errors.append( - f"--stride-tiles ({stride_tiles}) must be <= --window-tiles ({window_tiles})" - ) + errors.append(f"--stride-tiles ({stride_tiles}) must be <= --window-tiles ({window_tiles})") return errors @@ -364,7 +384,8 @@ def process_ms( if not applycal_needed and needs_calibration(meridian_ms): log.warning( "[%s] Applycal sentinel exists but ratio check disagrees — " - "trusting sentinel (use --force-recal to override)", tag + "trusting sentinel (use --force-recal to override)", + tag, ) if applycal_needed: if os.path.isfile(sentinel): @@ -384,6 +405,7 @@ def process_ms( # Verify CORRECTED_DATA isn't all zeros (silent applycal failure mode) try: from dsa110_continuum.adapters.casa_tables import table # noqa: PLC0415 + with table(meridian_ms, readonly=True, ack=False) as t: if "CORRECTED_DATA" in t.colnames(): cd = t.getcol("CORRECTED_DATA", nrow=2048) @@ -391,7 +413,9 @@ def process_ms( unflagged = cd[~fl] if len(unflagged) > 0 and np.all(np.abs(unflagged) < 1e-10): log.error("[%s] CORRECTED_DATA is all zeros after applycal", tag) - return TileResult("failed", failed_stage="allzero", error="CORRECTED_DATA all zeros") + return TileResult( + "failed", failed_stage="allzero", error="CORRECTED_DATA all zeros" + ) except (OSError, RuntimeError) as e: log.warning("[%s] Post-applycal check failed: %s — continuing", tag, e) @@ -429,7 +453,9 @@ def process_ms( log.info("[%s] PB-corrected image done: %s", tag, pbcor_fits) result_fits = pbcor_fits elif os.path.exists(image_fits): - log.warning("[%s] -image-pb.fits not found; falling back to plain image: %s", tag, image_fits) + log.warning( + "[%s] -image-pb.fits not found; falling back to plain image: %s", tag, image_fits + ) result_fits = image_fits else: log.error("[%s] WSClean finished but no image FITS found", tag) @@ -437,11 +463,16 @@ def process_ms( # ── Per-tile image QA ───────────────────────────────────────────────── from dsa110_continuum.validation.image_validator import validate_image_quality - tile_ok, tile_errors = validate_image_quality(Path(result_fits), min_snr=3.0, max_flagged_fraction=0.5) + + tile_ok, tile_errors = validate_image_quality( + Path(result_fits), min_snr=3.0, max_flagged_fraction=0.5 + ) if not tile_ok: for err in tile_errors: log.warning("[%s] Tile QA: %s", tag, err) - fatal = [e for e in tile_errors if "all zeros" in e.lower() or "no valid pixels" in e.lower()] + fatal = [ + e for e in tile_errors if "all zeros" in e.lower() or "no valid pixels" in e.lower() + ] if fatal: log.error("[%s] Tile rejected by image QA", tag) return TileResult("failed", failed_stage="qa", error="; ".join(fatal)) @@ -464,6 +495,7 @@ def process_ms( # ── Mosaicking ──────────────────────────────────────────────────────────────── + def _get_tile_center_ra(path: str) -> float: """Return the centre RA (deg) of a tile FITS image.""" with fits.open(path) as hdul: @@ -524,9 +556,7 @@ def build_common_wcs(fits_paths: list[str], margin_deg: float = 0.5) -> tuple[WC hdr = hdul[0].header wcs = WCS(hdr).celestial ny, nx = hdul[0].data.squeeze().shape - corners = wcs.pixel_to_world( - [0, nx - 1, 0, nx - 1], [0, 0, ny - 1, ny - 1] - ) + corners = wcs.pixel_to_world([0, nx - 1, 0, nx - 1], [0, 0, ny - 1, ny - 1]) ras = [c.ra.deg for c in corners] decs = [c.dec.deg for c in corners] all_ras.extend(ras) @@ -538,9 +568,9 @@ def build_common_wcs(fits_paths: list[str], margin_deg: float = 0.5) -> tuple[WC # crossing the 0°/360° boundary get a correct centre. The arithmetic # mean of [359°, 1°] is 180° (wrong); the circular mean is 0° (correct). ra_rad = np.deg2rad(all_ras) - mean_ra = float(np.rad2deg( - np.arctan2(np.mean(np.sin(ra_rad)), np.mean(np.cos(ra_rad))) - )) % 360.0 + mean_ra = ( + float(np.rad2deg(np.arctan2(np.mean(np.sin(ra_rad)), np.mean(np.cos(ra_rad))))) % 360.0 + ) shifted = np.array([(ra - mean_ra + 180.0) % 360.0 - 180.0 for ra in all_ras]) ra_min = mean_ra + float(shifted.min()) - margin_deg ra_max = mean_ra + float(shifted.max()) + margin_deg @@ -566,7 +596,13 @@ def build_common_wcs(fits_paths: list[str], margin_deg: float = 0.5) -> tuple[WC log.info( "Common WCS: RA %.2f–%.2f deg, Dec %.2f–%.2f deg, %d×%d px (center RA=%.2f)", - ra_min, ra_max, dec_min, dec_max, nx, ny, ra_center, + ra_min, + ra_max, + dec_min, + dec_max, + nx, + ny, + ra_center, ) return out_wcs, ny, nx @@ -615,13 +651,13 @@ def coadd_tiles(fits_paths: list[str], out_wcs: WCS, ny: int, nx: int) -> np.nda cy, cx = data.shape[0] // 2, data.shape[1] // 2 margin = 200 # pixels from center inner = data[ - max(0, cy - margin): cy + margin, - max(0, cx - margin): cx + margin, + max(0, cy - margin) : cy + margin, + max(0, cx - margin) : cx + margin, ] noise = np.nanstd(inner[np.isfinite(inner)]) if noise <= 0 or not np.isfinite(noise): noise = 1.0 # fallback unit weight - weight = 1.0 / (noise ** 2) + weight = 1.0 / (noise**2) # Reproject onto common grid try: @@ -642,8 +678,9 @@ def coadd_tiles(fits_paths: list[str], out_wcs: WCS, ny: int, nx: int) -> np.nda return mosaic -def write_mosaic(mosaic: np.ndarray, out_wcs: WCS, fits_paths: list[str], - output_path: str, date: str = "") -> str: +def write_mosaic( + mosaic: np.ndarray, out_wcs: WCS, fits_paths: list[str], output_path: str, date: str = "" +) -> str: """Write mosaic to FITS using a representative header from the first tile.""" out_path = output_path os.makedirs(os.path.dirname(out_path), exist_ok=True) @@ -680,7 +717,7 @@ def check_mosaic_quality(mosaic_path: str) -> bool: strip_height = ny // n_strips noises = [] for i in range(n_strips): - strip = data[i * strip_height:(i + 1) * strip_height, :] + strip = data[i * strip_height : (i + 1) * strip_height, :] valid = strip[np.isfinite(strip)] if len(valid) > 100: # Robust noise estimate via MAD @@ -704,6 +741,7 @@ def check_mosaic_quality(mosaic_path: str) -> bool: # ── Main ────────────────────────────────────────────────────────────────────── + def main(): """Parse CLI arguments and run the mosaic pipeline.""" parser = argparse.ArgumentParser( @@ -794,8 +832,10 @@ def main(): "To symlink from 2026-01-25, run:\n" " ln -s %s/2026-01-25T22:26:05_0~23.b %s\n" " ln -s %s/2026-01-25T22:26:05_0~23.g %s", - cfg.ms_dir, cfg.bp_table, - cfg.ms_dir, cfg.g_table, + cfg.ms_dir, + cfg.bp_table, + cfg.ms_dir, + cfg.g_table, ) sys.exit(1) @@ -817,8 +857,12 @@ def main(): if result.ok: tile_images.append(result.fits_path) else: - log.warning("Skipping failed MS (%s): %s — %s", - result.failed_stage, ms_path, result.error or "unknown") + log.warning( + "Skipping failed MS (%s): %s — %s", + result.failed_stage, + ms_path, + result.error or "unknown", + ) log.info("\n=== Processed %d/%d tiles successfully ===\n", len(tile_images), len(ms_list)) @@ -846,9 +890,10 @@ def main(): else: tile_ras = [_get_tile_center_ra(p) for p in strip_tiles] ra_rad = np.deg2rad(tile_ras) - strip_ra = float(np.rad2deg( - np.arctan2(np.mean(np.sin(ra_rad)), np.mean(np.cos(ra_rad))) - )) % 360.0 + strip_ra = ( + float(np.rad2deg(np.arctan2(np.mean(np.sin(ra_rad)), np.mean(np.cos(ra_rad))))) + % 360.0 + ) strip_out = cfg.mosaic_out.replace( ".fits", f"_ra{int(round(strip_ra)) % 360:03d}.fits" ) @@ -858,12 +903,17 @@ def main(): continue log.info( "\n=== [full-day] Building mosaic for strip %d (%d tiles) ===\n", - strip_idx, len(strip_tiles), + strip_idx, + len(strip_tiles), ) out_wcs, ny, nx = build_common_wcs(strip_tiles) mosaic_data = coadd_tiles(strip_tiles, out_wcs, ny, nx) strip_path = write_mosaic( - mosaic_data, out_wcs, strip_tiles, output_path=strip_out, date=cfg.date, + mosaic_data, + out_wcs, + strip_tiles, + output_path=strip_out, + date=cfg.date, ) mosaic_paths.append(strip_path) else: @@ -873,11 +923,14 @@ def main(): log.warning( "Only %d tiles available (< --window-tiles %d); " "producing single short-window mosaic", - len(tile_images), args.window_tiles, + len(tile_images), + args.window_tiles, ) log.info( "Science mode: %d window(s) of <=%d tiles, stride %d", - len(windows), args.window_tiles, args.stride_tiles, + len(windows), + args.window_tiles, + args.stride_tiles, ) for win_idx, win_tiles in enumerate(windows): strips = group_tiles_by_ra(win_tiles) @@ -885,16 +938,21 @@ def main(): if len(strip_tiles) < 2: log.warning( "Window %d strip %d has only %d tile — skipping", - win_idx, strip_idx, len(strip_tiles), + win_idx, + strip_idx, + len(strip_tiles), ) continue base = f"{cfg.date}_w{win_idx:02d}_mosaic" if len(strips) > 1: tile_ras = [_get_tile_center_ra(p) for p in strip_tiles] ra_rad = np.deg2rad(tile_ras) - strip_ra = float(np.rad2deg( - np.arctan2(np.mean(np.sin(ra_rad)), np.mean(np.cos(ra_rad))) - )) % 360.0 + strip_ra = ( + float( + np.rad2deg(np.arctan2(np.mean(np.sin(ra_rad)), np.mean(np.cos(ra_rad)))) + ) + % 360.0 + ) base += f"_ra{int(round(strip_ra)) % 360:03d}" win_out = os.path.join(cfg.image_dir, base + ".fits") if os.path.exists(win_out): @@ -903,12 +961,18 @@ def main(): continue log.info( "\n=== Building mosaic: window %d, strip %d (%d tiles) ===\n", - win_idx, strip_idx, len(strip_tiles), + win_idx, + strip_idx, + len(strip_tiles), ) out_wcs, ny, nx = build_common_wcs(strip_tiles) mosaic_data = coadd_tiles(strip_tiles, out_wcs, ny, nx) win_path = write_mosaic( - mosaic_data, out_wcs, strip_tiles, output_path=win_out, date=cfg.date, + mosaic_data, + out_wcs, + strip_tiles, + output_path=win_out, + date=cfg.date, ) mosaic_paths.append(win_path) @@ -962,12 +1026,11 @@ def _move_mosaic_to_products(cfg: TileConfig, *, full_day: bool) -> None: Skips with a warning if *products_dir* already exists. """ import glob as _g + if full_day: mosaic_files = sorted(_g.glob(os.path.join(cfg.image_dir, "full_mosaic*.fits"))) else: - mosaic_files = sorted( - _g.glob(os.path.join(cfg.image_dir, f"{cfg.date}_w*_mosaic*.fits")) - ) + mosaic_files = sorted(_g.glob(os.path.join(cfg.image_dir, f"{cfg.date}_w*_mosaic*.fits"))) if not mosaic_files: log.warning("Move skipped: no mosaic files found in %s", cfg.image_dir) return @@ -984,7 +1047,10 @@ def _move_mosaic_to_products(cfg: TileConfig, *, full_day: bool) -> None: mode_label = "full-day" if full_day else "science-window" log.info( "Archiving %d %s mosaic(s): %s → %s", - len(mosaic_files), mode_label, cfg.image_dir, cfg.products_dir, + len(mosaic_files), + mode_label, + cfg.image_dir, + cfg.products_dir, ) try: for src in mosaic_files: diff --git a/tests/test_mosaic_day_ms_discovery.py b/tests/test_mosaic_day_ms_discovery.py new file mode 100644 index 0000000..ddb6b00 --- /dev/null +++ b/tests/test_mosaic_day_ms_discovery.py @@ -0,0 +1,104 @@ +"""Tests for Measurement Set discovery diagnostics in mosaic_day.py.""" + +from __future__ import annotations + +import logging +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts")) +import mosaic_day +from mosaic_day import TileConfig + + +class _OpenableTable: + def __init__(self, tablename: str, readonly: bool = True, ack: bool = False): + self.tablename = tablename + self.readonly = readonly + self.ack = ack + + def __enter__(self): + return self + + def __exit__(self, *exc: object) -> None: + return None + + +class _FailingTable: + def __init__(self, *args: object, **kwargs: object): + raise RuntimeError("adapter unavailable") + + +def _cfg(tmp_path: Path) -> TileConfig: + return TileConfig( + date="2026-01-25", + ms_dir=str(tmp_path), + image_dir=str(tmp_path / "images"), + mosaic_out=str(tmp_path / "mosaic.fits"), + products_dir=str(tmp_path / "products"), + bp_table=str(tmp_path / "cal.b"), + g_table=str(tmp_path / "cal.g"), + ) + + +def _ms(tmp_path: Path, name: str, *, field: str = "dir") -> Path: + path = tmp_path / name + path.mkdir() + if field == "dir": + (path / "FIELD").mkdir() + elif field == "file": + (path / "FIELD").write_text("not a table") + elif field != "missing": + raise ValueError(f"unknown field mode: {field}") + return path + + +def test_find_valid_ms_returns_openable_field_table(tmp_path, monkeypatch): + ms_path = _ms(tmp_path, "2026-01-25T22:00:18.ms") + monkeypatch.setattr("dsa110_continuum.adapters.casa_tables.table", _OpenableTable) + + assert mosaic_day.find_valid_ms(_cfg(tmp_path)) == [str(ms_path)] + + +def test_find_valid_ms_reports_missing_field_table(tmp_path, monkeypatch, caplog): + _ms(tmp_path, "2026-01-25T22:00:18.ms", field="missing") + monkeypatch.setattr("dsa110_continuum.adapters.casa_tables.table", _OpenableTable) + + with caplog.at_level(logging.WARNING, logger="mosaic_day"): + assert mosaic_day.find_valid_ms(_cfg(tmp_path)) == [] + + assert "missing FIELD table" in caplog.text + + +def test_find_valid_ms_reports_field_table_open_failure(tmp_path, monkeypatch, caplog): + _ms(tmp_path, "2026-01-25T22:00:18.ms") + monkeypatch.setattr("dsa110_continuum.adapters.casa_tables.table", _FailingTable) + + with caplog.at_level(logging.WARNING, logger="mosaic_day"): + assert mosaic_day.find_valid_ms(_cfg(tmp_path)) == [] + + assert "unreadable FIELD table" in caplog.text + assert "RuntimeError: adapter unavailable" in caplog.text + + +def test_find_valid_ms_filters_meridian_and_flagversion_paths(tmp_path, monkeypatch): + valid_ms = _ms(tmp_path, "2026-01-25T22:00:18.ms") + _ms(tmp_path, "2026-01-25T22:05:28_meridian.ms") + _ms(tmp_path, "2026-01-25T22:10:37_flagversion.ms") + monkeypatch.setattr("dsa110_continuum.adapters.casa_tables.table", _OpenableTable) + + assert mosaic_day.find_valid_ms(_cfg(tmp_path)) == [str(valid_ms)] + + +def test_find_valid_ms_reports_field_path_that_is_not_directory( + tmp_path, + monkeypatch, + caplog, +): + _ms(tmp_path, "2026-01-25T22:00:18.ms", field="file") + monkeypatch.setattr("dsa110_continuum.adapters.casa_tables.table", _OpenableTable) + + with caplog.at_level(logging.WARNING, logger="mosaic_day"): + assert mosaic_day.find_valid_ms(_cfg(tmp_path)) == [] + + assert "FIELD table is not a directory" in caplog.text