diff --git a/CHANGELOG.md b/CHANGELOG.md index bef908a..8d45b19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,24 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht ## [Unreleased] +### Added — observability (layer 1) + +- `Timer.exception_count`, `Timer.last_exception`, `Timer.last_exception_at` + — cumulative-across-restarts target-exception telemetry. Atomic-read + under the GIL, polling-friendly for Prometheus exporters or sync + `/metrics` endpoints. +- Every library-emitted log call now carries `extra={"event": + "async_timer.<event>", ...}` structured fields. JSON-log handlers + (Datadog, Splunk, journald, structlog) read them as queryable + attributes without parsing the message string. Events: + `async_timer.target_exception`, `async_timer.fixed_rate_skip`, + `async_timer.cancel_aws_raised`, `async_timer.subscription_drop`, + `async_timer.group_cancel_failure`. Human-readable messages are + unchanged — console UX is the same. +- The default `exc_cb` now uses the per-timer logger + (`async_timer.timer.<name>`) instead of the module logger, so named + timers can be filtered separately at the handler. + ## [1.3.0] - 2026-06-07 ### Added — exception hierarchy diff --git a/README.md b/README.md index 9708723..cc18be0 100644 --- a/README.md +++ b/README.md @@ -293,6 +293,34 @@ async def test_periodic_refresh(): Same surface as `Timer` — `join`, `wait`, `trigger`, `subscribe`, `TimerGroup`, decorator wrapping all work the same way. 
+## Observability + +Every Timer exposes read-only telemetry attributes (atomic-read under the +GIL — safe to poll from any thread, including a Prometheus exporter): + +| Attribute | Meaning | +| --- | --- | +| `timer.hit_count` | Successful ticks completed | +| `timer.last_result` / `last_tick_at` | Latest tick value + `time.monotonic()` | +| `timer.exception_count` | Target exceptions raised (cumulative across restarts) | +| `timer.last_exception` / `last_exception_at` | Most recent target error + `time.monotonic()` | +| `subscription.qsize` / `dropped_count` | Per-consumer buffer depth + cumulative drops | + +Every library-emitted log record carries `extra={"event": "async_timer.<event>", ...}` +structured fields — JSON-log handlers (Datadog, Splunk, journald, structlog) +get queryable attributes without parsing the message string: + +| `event` | Fields | +| --- | --- | +| `async_timer.target_exception` | `timer_name`, `target`, `hit_count` | +| `async_timer.fixed_rate_skip` | `skipped_ticks`, `delay_s`, `behind_s` | +| `async_timer.cancel_aws_raised` | `exception_type` | +| `async_timer.subscription_drop` | `subscription_name`, `maxsize`, `dropped_count` | +| `async_timer.group_cancel_failure` | `group_name`, `timer_name`, `exception_type` | + +Named timers (`Timer(..., name="cache")`) scope their logger to +`async_timer.timer.cache` — handy for per-timer log filtering. 
+ ## Exceptions All library-raised errors derive from `async_timer.TimerError`, which diff --git a/docs/badges/tests.svg b/docs/badges/tests.svg index 1f8973b..9a039ca 100644 --- a/docs/badges/tests.svg +++ b/docs/badges/tests.svg @@ -5,7 +5,7 @@ width="70.0" height="20" role="img" - aria-label="tests: 240" + aria-label="tests: 247" > - tests: 240 + tests: 247 @@ -42,8 +42,8 @@ - 240 - 240 + 247 + 247 diff --git a/src/async_timer/group.py b/src/async_timer/group.py index 0877273..856c325 100644 --- a/src/async_timer/group.py +++ b/src/async_timer/group.py @@ -156,6 +156,12 @@ async def cancel_all(self): "TimerGroup: cancelling %r raised %s", timer, type(result).__name__, + extra={ + "event": "async_timer.group_cancel_failure", + "group_name": self.name, + "timer_name": timer.name, + "exception_type": type(result).__name__, + }, exc_info=result, ) diff --git a/src/async_timer/pacemaker.py b/src/async_timer/pacemaker.py index f5d9723..2869830 100644 --- a/src/async_timer/pacemaker.py +++ b/src/async_timer/pacemaker.py @@ -84,6 +84,10 @@ def _on_cancel_fut_done(self, fut: asyncio.Future): "cancel_aws awaitable %r raised %s; treating as stop signal", fut, exc, + extra={ + "event": "async_timer.cancel_aws_raised", + "exception_type": type(exc).__name__, + }, exc_info=exc, ) self.stop() @@ -172,12 +176,19 @@ def _compute_fixed_rate_wait(self) -> float: target_index += 1 next_tick_at = self._start_time + target_index * self.delay if skipped: + behind_s = now - (next_tick_at - skipped * self.delay) logger.warning( "fixed_rate pacemaker fell behind: skipping %d tick(s) " "(delay=%.3fs, behind by %.3fs)", skipped, self.delay, - now - (next_tick_at - skipped * self.delay), + behind_s, + extra={ + "event": "async_timer.fixed_rate_skip", + "skipped_ticks": skipped, + "delay_s": self.delay, + "behind_s": behind_s, + }, ) self._tick_number = target_index wait_for = next_tick_at - now diff --git a/src/async_timer/subscription.py b/src/async_timer/subscription.py index 
c64c102..e412b98 100644 --- a/src/async_timer/subscription.py +++ b/src/async_timer/subscription.py @@ -92,6 +92,12 @@ def _push_value(self, value: T) -> None: self._name or "", self._maxsize, self.dropped_count, + extra={ + "event": "async_timer.subscription_drop", + "subscription_name": self._name, + "maxsize": self._maxsize, + "dropped_count": self.dropped_count, + }, ) self._queue.put_nowait(value) diff --git a/src/async_timer/timer.py b/src/async_timer/timer.py index 36a12db..b69848c 100644 --- a/src/async_timer/timer.py +++ b/src/async_timer/timer.py @@ -98,9 +98,17 @@ def _noop_cb(*_, **__): pass -def _default_main_loop_exception_callback(*_, **__): - """Default exc_cb: log the target exception. Does not re-raise.""" - logger.exception("An unexpected exception in the timer loop.") +def _default_main_loop_exception_callback(timer, target): + """Default exc_cb: log the target exception via the timer's logger.""" + timer._logger.exception( + "An unexpected exception in the timer loop.", + extra={ + "event": "async_timer.target_exception", + "timer_name": timer.name, + "target": repr(target), + "hit_count": timer.hit_count, + }, + ) class Timer(typing.Generic[T]): @@ -127,6 +135,12 @@ class Timer(typing.Generic[T]): cancel_callback: TimerCallbackT[T] last_result: typing.Optional[T] = None last_tick_at: typing.Optional[float] = None # time.monotonic() of last tick + # Target-exception telemetry. Cumulative across restarts (matches + # hit_count semantics). Updated before `exc_cb` fires, so the callback + # sees the post-increment values. Reads are atomic under the GIL. + exception_count: int = 0 + last_exception: typing.Optional[BaseException] = None + last_exception_at: typing.Optional[float] = None # time.monotonic() # WeakSet — dropped subscriptions get GC'd and auto-removed. 
_subscriptions: "weakref.WeakSet[Subscription[T]]" # Bound at start(); used by *_threadsafe methods to marshal calls @@ -340,6 +354,11 @@ async def _loop_callback_routine(self): except StopAsyncIteration: break except Exception as err: + # Record telemetry before exc_cb fires, so the callback + # observes the post-increment values. + self.exception_count += 1 + self.last_exception = err + self.last_exception_at = time.monotonic() self.result_fanout.send_exception(err) # Snapshot: WeakSet may shrink mid-iter if a sub is GC'd. for sub in list(self._subscriptions): diff --git a/tests/async_timer/test_observability.py b/tests/async_timer/test_observability.py new file mode 100644 index 0000000..c04a3b2 --- /dev/null +++ b/tests/async_timer/test_observability.py @@ -0,0 +1,170 @@ +"""Tests for layer-1 observability additions. + +Covers: + * `Timer.exception_count` / `last_exception` / `last_exception_at` + counters wired into the loop's exception path. + * `extra={"event": "async_timer.*", ...}` payload on every WARNING / + EXCEPTION log call — JSON-log handlers read these as structured + fields without parsing message strings. 
+""" + +import asyncio +import logging +import time + +import pytest + +import async_timer +from async_timer.subscription import Subscription + + +@pytest.mark.asyncio +async def test_exception_telemetry_is_recorded(): + """One target exception → counter=1, last_exception set, timestamp set.""" + err = RuntimeError("boom") + + def explode(): + raise err + + t0 = time.monotonic() + timer: async_timer.Timer = async_timer.Timer(0.001, explode, start=True) + with pytest.raises(RuntimeError, match="boom"): + await timer.wait() # FanoutRv re-raises the sticky target exception + await timer.cancel() + assert timer.exception_count == 1 + assert timer.last_exception is err + assert timer.last_exception_at is not None + assert timer.last_exception_at >= t0 + + +@pytest.mark.asyncio +async def test_exception_telemetry_cumulative_across_restarts(): + """Counters accumulate across cancel/restart — same semantics as hit_count.""" + err1 = RuntimeError("first") + err2 = RuntimeError("second") + errors = iter([err1, err2]) + + def explode(): + raise next(errors) + + timer: async_timer.Timer = async_timer.Timer(0.001, explode) + timer.start() + with pytest.raises(RuntimeError, match="first"): + await timer.wait() + await timer.cancel() + assert timer.exception_count == 1 + assert timer.last_exception is err1 + + timer.start() # restart + with pytest.raises(RuntimeError, match="second"): + await timer.wait() + await timer.cancel() + assert timer.exception_count == 2 + assert timer.last_exception is err2 + + +def test_exception_telemetry_defaults_zero_before_start(): + """A freshly-constructed Timer has the default zero/None values.""" + timer: async_timer.Timer = async_timer.Timer(1.0, lambda: 42) + assert timer.exception_count == 0 + assert timer.last_exception is None + assert timer.last_exception_at is None + + +@pytest.mark.asyncio +async def test_fixed_rate_skip_has_structured_extras(caplog): + """The fall-behind warning carries `event`, `skipped_ticks`, `delay_s`, + `behind_s` 
as structured fields so JSON-log handlers don't have to + regex the message string.""" + import async_timer.pacemaker as pacemaker + + pm = pacemaker.TimerPacemaker(delay=0.001, mode="fixed_rate") + pm._start_time = time.monotonic() - 5.0 # five-second backlog + pm._tick_number = 0 + + caplog.set_level(logging.WARNING, logger="async_timer.pacemaker") + pm._compute_fixed_rate_wait() + + skip_records = [ + r + for r in caplog.records + if getattr(r, "event", None) == "async_timer.fixed_rate_skip" + ] + assert len(skip_records) == 1 + rec = skip_records[0] + assert rec.skipped_ticks >= 1 + assert rec.delay_s == pytest.approx(0.001) + assert rec.behind_s > 0 + + +@pytest.mark.asyncio +async def test_cancel_aws_raise_has_structured_extras(caplog): + """`cancel_aws` exception warning carries `event` + `exception_type`.""" + import async_timer.pacemaker as pacemaker + + cancel_fut: asyncio.Future = asyncio.Future() + pm = pacemaker.TimerPacemaker(delay=10e-5) + pm.stop_on([cancel_fut]) + + caplog.set_level(logging.WARNING, logger="async_timer.pacemaker") + iter_count = 0 + async for _ in pm: + iter_count += 1 + if iter_count == 1: + cancel_fut.set_exception(RuntimeError("from-test")) + + raise_records = [ + r + for r in caplog.records + if getattr(r, "event", None) == "async_timer.cancel_aws_raised" + ] + assert len(raise_records) == 1 + assert raise_records[0].exception_type == "RuntimeError" + + +def test_subscription_drop_has_structured_extras(caplog): + """The bounded-queue drop warning carries `event` + `subscription_name` + + `dropped_count`.""" + caplog.set_level(logging.WARNING, logger="async_timer.subscription") + sub: Subscription[int] = Subscription(maxsize=2, name="metrics") + for v in range(5): + sub._push_value(v) + + drop_records = [ + r + for r in caplog.records + if getattr(r, "event", None) == "async_timer.subscription_drop" + ] + assert drop_records, "expected at least one drop warning" + last = drop_records[-1] + assert last.subscription_name == 
"metrics" + assert last.maxsize == 2 + assert last.dropped_count >= 1 + + +@pytest.mark.asyncio +async def test_default_exc_cb_uses_named_logger_and_extras(caplog): + """Naming a Timer scopes the exception log to `async_timer.timer.` + and the record carries `event`, `timer_name`, `hit_count` extras.""" + + def explode(): + raise ValueError("named-boom") + + caplog.set_level(logging.ERROR) + t: async_timer.Timer = async_timer.Timer( + 0.001, explode, name="cache-warmup", start=True + ) + with pytest.raises(ValueError, match="named-boom"): + await t.wait() + await t.cancel() + + matching = [ + r + for r in caplog.records + if getattr(r, "event", None) == "async_timer.target_exception" + ] + assert matching, "default exc_cb should emit a structured-extras record" + rec = matching[0] + assert rec.timer_name == "cache-warmup" + assert rec.name == "async_timer.timer.cache-warmup" + assert rec.hit_count == t.hit_count