diff --git a/src/anthias_server/celery_tasks.py b/src/anthias_server/celery_tasks.py index bedfcc939..9a77fd395 100755 --- a/src/anthias_server/celery_tasks.py +++ b/src/anthias_server/celery_tasks.py @@ -143,6 +143,21 @@ # kill-versus-catch rationale as the on-demand probe limits above. ASSET_REVALIDATION_SOFT_TIME_LIMIT_S = ASSET_REVALIDATION_TIME_LIMIT_S - 60 +# Time budget for the lightweight periodic pokes — the display-power +# CEC query and the telemetry POST. Both ran under a bare +# ``time_limit=30`` and share the asset probe's failure mode: the CEC +# query can wedge inside libcec, and the telemetry POST can stall in +# getaddrinfo against a broken resolver (requests' ``timeout`` covers +# connect/read but not DNS). Tripping the *hard* limit SIGKILLs the +# pool child, which Sentry groups by the kill signature regardless of +# task — so these two kept the ANTHIAS-A / ANTHIAS-9 / ANTHIAS-B trio +# alive after #3017 fixed only the asset probe. The soft limit raises +# ``SoftTimeLimitExceeded`` *inside* the task so it logs and skips the +# tick cleanly; the hard limit stays as the backstop for a call stuck +# in C code where the soft signal can't be delivered. +PERIODIC_POKE_SOFT_TIME_LIMIT_S = 30 +PERIODIC_POKE_TIME_LIMIT_S = 60 + # Redis key for the sweep singleton lock. Whoever sets it first runs # the sweep; later beat ticks observe the key and exit. The TTL matches # the time_limit so a worker that crashes mid-sweep doesn't lock the @@ -288,7 +303,10 @@ def setup_periodic_tasks(sender: Any, **kwargs: Any) -> None: ) -@celery.task(time_limit=30) +@celery.task( + soft_time_limit=PERIODIC_POKE_SOFT_TIME_LIMIT_S, + time_limit=PERIODIC_POKE_TIME_LIMIT_S, +) def get_display_power() -> None: # diagnostics.get_display_power() returns ``str | bool`` (bool for # a clean CEC True/False, str for the error fallbacks). redis-py @@ -299,13 +317,42 @@ def get_display_power() -> None: # passes the value through, so 'True'/'False'/'CEC error' all fit # — and the on/off state now actually populates instead of only # the error cases ever landing. - r.set('display_power', str(diagnostics.get_display_power())) - r.expire('display_power', 3600) + try: + # Single SET with ex= so the value and its TTL are written + # atomically — a soft-limit signal landing between a separate + # SET and EXPIRE would otherwise leave the key without a TTL + # (a stale display_power that never expires). + r.set('display_power', str(diagnostics.get_display_power()), ex=3600) + except SoftTimeLimitExceeded: + # The CEC query is meant to be bounded by its own + # subprocess timeout, but a child wedged in libcec can keep + # the pipe open past it. Skip this tick rather than let the + # hard limit SIGKILL the worker (ANTHIAS-A / 9 / B); the next + # beat tick re-queries. + logging.warning( + 'get_display_power: CEC query exceeded %ss; skipping this tick', + PERIODIC_POKE_SOFT_TIME_LIMIT_S, + ) -@celery.task(time_limit=30) +@celery.task( + soft_time_limit=PERIODIC_POKE_SOFT_TIME_LIMIT_S, + time_limit=PERIODIC_POKE_TIME_LIMIT_S, +) def send_telemetry_task() -> None: - send_telemetry() + try: + send_telemetry() + except SoftTimeLimitExceeded: + # requests' timeout doesn't cover a getaddrinfo stall against + # a broken resolver, so the POST can outlive the soft budget. + # Skip this tick instead of being SIGKILLed by the hard limit + # (ANTHIAS-A / 9 / B); send_telemetry didn't set its cooldown, + # so the next beat tick retries. + logging.warning( + 'send_telemetry_task: telemetry POST exceeded %ss; ' + 'skipping this tick', + PERIODIC_POKE_SOFT_TIME_LIMIT_S, + ) @celery.task diff --git a/src/anthias_server/lib/telemetry.py b/src/anthias_server/lib/telemetry.py index b3d475d3f..bd83dd076 100644 --- a/src/anthias_server/lib/telemetry.py +++ b/src/anthias_server/lib/telemetry.py @@ -119,6 +119,10 @@ def send_telemetry() -> bool: logging.debug('Telemetry POST failed: %s', exc) return False - r.set(TELEMETRY_COOLDOWN_KEY, '1') - r.expire(TELEMETRY_COOLDOWN_KEY, TELEMETRY_COOLDOWN_TTL) + # Single SET with ex= so the value and its TTL are written + # atomically — send_telemetry_task now runs under a soft time + # limit, and a SoftTimeLimitExceeded landing between a separate + # SET and EXPIRE would leave the cooldown key without a TTL, + # silencing telemetry permanently. + r.set(TELEMETRY_COOLDOWN_KEY, '1', ex=TELEMETRY_COOLDOWN_TTL) return True diff --git a/tests/test_celery_tasks.py b/tests/test_celery_tasks.py index b4732f675..fc1656feb 100644 --- a/tests/test_celery_tasks.py +++ b/tests/test_celery_tasks.py @@ -7,6 +7,7 @@ from unittest import mock import pytest +from celery.exceptions import SoftTimeLimitExceeded import anthias_server.celery_tasks as celery_tasks_module from anthias_server.app.models import Asset @@ -193,11 +194,13 @@ def test_get_display_power_writes_redis_as_str( ): get_display_power.apply() - fake_redis.set.assert_called_once_with('display_power', expected) + # Value and TTL are written atomically in a single SET (ex=) so a + # soft-limit signal can't leave the key without a TTL. + fake_redis.set.assert_called_once_with('display_power', expected, ex=3600) + fake_redis.expire.assert_not_called() # No bool ever reaches redis — guards against the DataError. written = fake_redis.set.call_args.args[1] assert isinstance(written, str) - fake_redis.expire.assert_called_once_with('display_power', 3600) def test_send_telemetry_task_dispatches() -> None: @@ -1767,6 +1770,47 @@ def test_sweep_soft_limit_aborts_and_releases_lock( assert celery_tasks_module.r.get(ASSET_REVALIDATION_LOCK_KEY) is None +class TestPeriodicPokeTimeLimits: + """The lightweight periodic pokes (display-power CEC query, + telemetry POST) share the asset probe's SIGKILL failure mode and so + fed the same ANTHIAS-A / ANTHIAS-9 / ANTHIAS-B group. They must + catch the soft limit and skip the tick instead of tripping the hard + limit, which SIGKILLs the pool child.""" + + def test_tasks_have_soft_limit_headroom(self) -> None: + for task in (get_display_power, send_telemetry_task): + soft = task.soft_time_limit + hard = task.time_limit + assert soft == celery_tasks_module.PERIODIC_POKE_SOFT_TIME_LIMIT_S + assert hard == celery_tasks_module.PERIODIC_POKE_TIME_LIMIT_S + assert soft is not None + assert hard is not None + assert soft < hard + + def test_display_power_soft_limit_skips_tick(self) -> None: + fake_redis = mock.MagicMock() + with ( + mock.patch.object(celery_tasks_module, 'r', fake_redis), + mock.patch( + 'anthias_server.celery_tasks.diagnostics.get_display_power', + side_effect=SoftTimeLimitExceeded, + ), + ): + result = get_display_power.apply() + # Caught inside the task — no failure propagates to Sentry. + assert result.successful() + fake_redis.set.assert_not_called() + + def test_telemetry_soft_limit_skips_tick(self) -> None: + with mock.patch.object( + celery_tasks_module, + 'send_telemetry', + side_effect=SoftTimeLimitExceeded, + ): + result = send_telemetry_task.apply() + assert result.successful() + + class TestWaitForMigrations: """Startup gate for Sentry ANTHIAS-1 — the worker must not consume tasks while the server's migrate/dbrestore pass is still rewriting diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index a4cf67208..a7ab01ac9 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -29,7 +29,7 @@ def settings_data() -> dict[str, Any]: def fake_redis(redis_data: dict[str, str]) -> MagicMock: fake = MagicMock() fake.get.side_effect = redis_data.get - fake.set.side_effect = lambda key, value: redis_data.__setitem__( + fake.set.side_effect = lambda key, value, ex=None: redis_data.__setitem__( key, value ) fake.expire.side_effect = lambda key, _ttl: None @@ -165,9 +165,12 @@ def test_sets_cooldown_after_success( ) -> None: telemetry.send_telemetry() assert telemetry.TELEMETRY_COOLDOWN_KEY in redis_data - fake_redis.expire.assert_any_call( + # Value and TTL are written in a single atomic SET so a soft time + # limit can't strand the cooldown key without an expiry. + fake_redis.set.assert_any_call( telemetry.TELEMETRY_COOLDOWN_KEY, - telemetry.TELEMETRY_COOLDOWN_TTL, + '1', + ex=telemetry.TELEMETRY_COOLDOWN_TTL, )