Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions src/anthias_server/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,21 @@
# kill-versus-catch rationale as the on-demand probe limits above.
ASSET_REVALIDATION_SOFT_TIME_LIMIT_S = ASSET_REVALIDATION_TIME_LIMIT_S - 60

# Time budget for the lightweight periodic pokes — the display-power
# CEC query and the telemetry POST. Both ran under a bare
# ``time_limit=30`` and share the asset probe's failure mode: the CEC
# query can wedge inside libcec, and the telemetry POST can stall in
# getaddrinfo against a broken resolver (requests' ``timeout`` covers
# connect/read but not DNS). Tripping the *hard* limit SIGKILLs the
# pool child, which Sentry groups by the kill signature regardless of
# task — so these two kept the ANTHIAS-A / ANTHIAS-9 / ANTHIAS-B trio
# alive after #3017 fixed only the asset probe. The soft limit raises
# ``SoftTimeLimitExceeded`` *inside* the task so it logs and skips the
# tick cleanly; the hard limit stays as the backstop for a call stuck
# in C code where the soft signal can't be delivered.
PERIODIC_POKE_SOFT_TIME_LIMIT_S = 30
PERIODIC_POKE_TIME_LIMIT_S = 60

# Redis key for the sweep singleton lock. Whoever sets it first runs
# the sweep; later beat ticks observe the key and exit. The TTL matches
# the time_limit so a worker that crashes mid-sweep doesn't lock the
Expand Down Expand Up @@ -288,7 +303,10 @@ def setup_periodic_tasks(sender: Any, **kwargs: Any) -> None:
)


@celery.task(time_limit=30)
@celery.task(
soft_time_limit=PERIODIC_POKE_SOFT_TIME_LIMIT_S,
time_limit=PERIODIC_POKE_TIME_LIMIT_S,
)
def get_display_power() -> None:
# diagnostics.get_display_power() returns ``str | bool`` (bool for
# a clean CEC True/False, str for the error fallbacks). redis-py
Expand All @@ -299,13 +317,42 @@ def get_display_power() -> None:
# passes the value through, so 'True'/'False'/'CEC error' all fit
# — and the on/off state now actually populates instead of only
# the error cases ever landing.
r.set('display_power', str(diagnostics.get_display_power()))
r.expire('display_power', 3600)
try:
# Single SET with ex= so the value and its TTL are written
# atomically — a soft-limit signal landing between a separate
# SET and EXPIRE would otherwise leave the key without a TTL
# (a stale display_power that never expires).
r.set('display_power', str(diagnostics.get_display_power()), ex=3600)
except SoftTimeLimitExceeded:
# The CEC query is meant to be bounded by its own
# subprocess timeout, but a child wedged in libcec can keep
# the pipe open past it. Skip this tick rather than let the
# hard limit SIGKILL the worker (ANTHIAS-A / 9 / B); the next
# beat tick re-queries.
logging.warning(
'get_display_power: CEC query exceeded %ss; skipping this tick',
PERIODIC_POKE_SOFT_TIME_LIMIT_S,
)


@celery.task(time_limit=30)
@celery.task(
soft_time_limit=PERIODIC_POKE_SOFT_TIME_LIMIT_S,
time_limit=PERIODIC_POKE_TIME_LIMIT_S,
)
def send_telemetry_task() -> None:
send_telemetry()
try:
send_telemetry()
except SoftTimeLimitExceeded:
# requests' timeout doesn't cover a getaddrinfo stall against
# a broken resolver, so the POST can outlive the soft budget.
# Skip this tick instead of being SIGKILLed by the hard limit
# (ANTHIAS-A / 9 / B); send_telemetry didn't set its cooldown,
# so the next beat tick retries.
logging.warning(
'send_telemetry_task: telemetry POST exceeded %ss; '
'skipping this tick',
PERIODIC_POKE_SOFT_TIME_LIMIT_S,
)
Comment thread
vpetersson marked this conversation as resolved.


@celery.task
Expand Down
8 changes: 6 additions & 2 deletions src/anthias_server/lib/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def send_telemetry() -> bool:
logging.debug('Telemetry POST failed: %s', exc)
return False

r.set(TELEMETRY_COOLDOWN_KEY, '1')
r.expire(TELEMETRY_COOLDOWN_KEY, TELEMETRY_COOLDOWN_TTL)
# Single SET with ex= so the value and its TTL are written
# atomically — send_telemetry_task now runs under a soft time
# limit, and a SoftTimeLimitExceeded landing between a separate
# SET and EXPIRE would leave the cooldown key without a TTL,
# silencing telemetry permanently.
r.set(TELEMETRY_COOLDOWN_KEY, '1', ex=TELEMETRY_COOLDOWN_TTL)
return True
48 changes: 46 additions & 2 deletions tests/test_celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import mock

import pytest
from celery.exceptions import SoftTimeLimitExceeded

import anthias_server.celery_tasks as celery_tasks_module
from anthias_server.app.models import Asset
Expand Down Expand Up @@ -193,11 +194,13 @@ def test_get_display_power_writes_redis_as_str(
):
get_display_power.apply()

fake_redis.set.assert_called_once_with('display_power', expected)
# Value and TTL are written atomically in a single SET (ex=) so a
# soft-limit signal can't leave the key without a TTL.
fake_redis.set.assert_called_once_with('display_power', expected, ex=3600)
fake_redis.expire.assert_not_called()
# No bool ever reaches redis — guards against the DataError.
written = fake_redis.set.call_args.args[1]
assert isinstance(written, str)
fake_redis.expire.assert_called_once_with('display_power', 3600)


def test_send_telemetry_task_dispatches() -> None:
Expand Down Expand Up @@ -1767,6 +1770,47 @@ def test_sweep_soft_limit_aborts_and_releases_lock(
assert celery_tasks_module.r.get(ASSET_REVALIDATION_LOCK_KEY) is None


class TestPeriodicPokeTimeLimits:
"""The lightweight periodic pokes (display-power CEC query,
telemetry POST) share the asset probe's SIGKILL failure mode and so
fed the same ANTHIAS-A / ANTHIAS-9 / ANTHIAS-B group. They must
catch the soft limit and skip the tick instead of tripping the hard
limit, which SIGKILLs the pool child."""

def test_tasks_have_soft_limit_headroom(self) -> None:
for task in (get_display_power, send_telemetry_task):
soft = task.soft_time_limit
hard = task.time_limit
assert soft == celery_tasks_module.PERIODIC_POKE_SOFT_TIME_LIMIT_S
assert hard == celery_tasks_module.PERIODIC_POKE_TIME_LIMIT_S
assert soft is not None
assert hard is not None
assert soft < hard

def test_display_power_soft_limit_skips_tick(self) -> None:
fake_redis = mock.MagicMock()
with (
mock.patch.object(celery_tasks_module, 'r', fake_redis),
mock.patch(
'anthias_server.celery_tasks.diagnostics.get_display_power',
side_effect=SoftTimeLimitExceeded,
),
):
result = get_display_power.apply()
# Caught inside the task — no failure propagates to Sentry.
assert result.successful()
fake_redis.set.assert_not_called()

def test_telemetry_soft_limit_skips_tick(self) -> None:
with mock.patch.object(
celery_tasks_module,
'send_telemetry',
side_effect=SoftTimeLimitExceeded,
):
result = send_telemetry_task.apply()
assert result.successful()


class TestWaitForMigrations:
"""Startup gate for Sentry ANTHIAS-1 — the worker must not consume
tasks while the server's migrate/dbrestore pass is still rewriting
Expand Down
9 changes: 6 additions & 3 deletions tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def settings_data() -> dict[str, Any]:
def fake_redis(redis_data: dict[str, str]) -> MagicMock:
fake = MagicMock()
fake.get.side_effect = redis_data.get
fake.set.side_effect = lambda key, value: redis_data.__setitem__(
fake.set.side_effect = lambda key, value, ex=None: redis_data.__setitem__(
key, value
)
fake.expire.side_effect = lambda key, _ttl: None
Expand Down Expand Up @@ -165,9 +165,12 @@ def test_sets_cooldown_after_success(
) -> None:
telemetry.send_telemetry()
assert telemetry.TELEMETRY_COOLDOWN_KEY in redis_data
fake_redis.expire.assert_any_call(
# Value and TTL are written in a single atomic SET so a soft time
# limit can't strand the cooldown key without an expiry.
fake_redis.set.assert_any_call(
telemetry.TELEMETRY_COOLDOWN_KEY,
telemetry.TELEMETRY_COOLDOWN_TTL,
'1',
ex=telemetry.TELEMETRY_COOLDOWN_TTL,
)


Expand Down