Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ and this project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.ht

## [Unreleased]

### Added — observability (layer 1)

- `Timer.exception_count`, `Timer.last_exception`, `Timer.last_exception_at`
— cumulative-across-restarts target-exception telemetry. Atomic-read
under the GIL, polling-friendly for Prometheus exporters or sync
`/metrics` endpoints.
- Every library-emitted log call now carries `extra={"event":
"async_timer.<kind>", ...}` structured fields. JSON-log handlers
(Datadog, Splunk, journald, structlog) read them as queryable
attributes without parsing the message string. Events:
`async_timer.target_exception`, `async_timer.fixed_rate_skip`,
`async_timer.cancel_aws_raised`, `async_timer.subscription_drop`,
`async_timer.group_cancel_failure`. Human-readable messages are
unchanged — console UX is the same.
- The default `exc_cb` now uses the per-timer logger
(`async_timer.timer.<name>`) instead of the module logger, so named
timers can be filtered separately at the handler.

## [1.3.0] - 2026-06-07

### Added — exception hierarchy
Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,34 @@ async def test_periodic_refresh():
Same surface as `Timer` — `join`, `wait`, `trigger`, `subscribe`,
`TimerGroup`, decorator wrapping all work the same way.

## Observability

Every Timer exposes read-only telemetry attributes (atomic-read under the
GIL — safe to poll from any thread, including a Prometheus exporter):

| Attribute | Meaning |
| --- | --- |
| `timer.hit_count` | Successful ticks completed |
| `timer.last_result` / `last_tick_at` | Latest tick value + `time.monotonic()` |
| `timer.exception_count` | Target exceptions raised (cumulative across restarts) |
| `timer.last_exception` / `last_exception_at` | Most recent target error + `time.monotonic()` |
| `subscription.qsize` / `dropped_count` | Per-consumer buffer depth + cumulative drops |

Every library-emitted log record carries `extra={"event": "async_timer.<kind>", ...}`
structured fields — JSON-log handlers (Datadog, Splunk, journald, structlog)
get queryable attributes without parsing the message string:

| `event` | Fields |
| --- | --- |
| `async_timer.target_exception` | `timer_name`, `target`, `hit_count` |
| `async_timer.fixed_rate_skip` | `skipped_ticks`, `delay_s`, `behind_s` |
| `async_timer.cancel_aws_raised` | `exception_type` |
| `async_timer.subscription_drop` | `subscription_name`, `maxsize`, `dropped_count` |
| `async_timer.group_cancel_failure` | `group_name`, `timer_name`, `exception_type` |

Named timers (`Timer(..., name="cache")`) scope their logger to
`async_timer.timer.cache` — handy for per-timer log filtering.

## Exceptions

All library-raised errors derive from `async_timer.TimerError`, which
Expand Down
8 changes: 4 additions & 4 deletions docs/badges/tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions src/async_timer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ async def cancel_all(self):
"TimerGroup: cancelling %r raised %s",
timer,
type(result).__name__,
extra={
"event": "async_timer.group_cancel_failure",
"group_name": self.name,
"timer_name": timer.name,
"exception_type": type(result).__name__,
},
exc_info=result,
)

Expand Down
13 changes: 12 additions & 1 deletion src/async_timer/pacemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ def _on_cancel_fut_done(self, fut: asyncio.Future):
"cancel_aws awaitable %r raised %s; treating as stop signal",
fut,
exc,
extra={
"event": "async_timer.cancel_aws_raised",
"exception_type": type(exc).__name__,
},
exc_info=exc,
)
self.stop()
Expand Down Expand Up @@ -172,12 +176,19 @@ def _compute_fixed_rate_wait(self) -> float:
target_index += 1
next_tick_at = self._start_time + target_index * self.delay
if skipped:
behind_s = now - (next_tick_at - skipped * self.delay)
logger.warning(
"fixed_rate pacemaker fell behind: skipping %d tick(s) "
"(delay=%.3fs, behind by %.3fs)",
skipped,
self.delay,
now - (next_tick_at - skipped * self.delay),
behind_s,
extra={
"event": "async_timer.fixed_rate_skip",
"skipped_ticks": skipped,
"delay_s": self.delay,
"behind_s": behind_s,
},
)
self._tick_number = target_index
wait_for = next_tick_at - now
Expand Down
6 changes: 6 additions & 0 deletions src/async_timer/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def _push_value(self, value: T) -> None:
self._name or "<unnamed>",
self._maxsize,
self.dropped_count,
extra={
"event": "async_timer.subscription_drop",
"subscription_name": self._name,
"maxsize": self._maxsize,
"dropped_count": self.dropped_count,
},
)
self._queue.put_nowait(value)

Expand Down
25 changes: 22 additions & 3 deletions src/async_timer/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,17 @@ def _noop_cb(*_, **__):
pass


def _default_main_loop_exception_callback(*_, **__):
"""Default exc_cb: log the target exception. Does not re-raise."""
logger.exception("An unexpected exception in the timer loop.")
def _default_main_loop_exception_callback(timer, target):
"""Default exc_cb: log the target exception via the timer's logger."""
timer._logger.exception(
"An unexpected exception in the timer loop.",
extra={
"event": "async_timer.target_exception",
"timer_name": timer.name,
"target": repr(target),
"hit_count": timer.hit_count,
},
)


class Timer(typing.Generic[T]):
Expand All @@ -127,6 +135,12 @@ class Timer(typing.Generic[T]):
cancel_callback: TimerCallbackT[T]
last_result: typing.Optional[T] = None
last_tick_at: typing.Optional[float] = None # time.monotonic() of last tick
# Target-exception telemetry. Cumulative across restarts (matches
# hit_count semantics). Updated before `exc_cb` fires, so the callback
# sees the post-increment values. Reads are atomic under the GIL.
exception_count: int = 0
last_exception: typing.Optional[BaseException] = None
last_exception_at: typing.Optional[float] = None # time.monotonic()
# WeakSet — dropped subscriptions get GC'd and auto-removed.
_subscriptions: "weakref.WeakSet[Subscription[T]]"
# Bound at start(); used by *_threadsafe methods to marshal calls
Expand Down Expand Up @@ -340,6 +354,11 @@ async def _loop_callback_routine(self):
except StopAsyncIteration:
break
except Exception as err:
# Record telemetry before exc_cb fires, so the callback
# observes the post-increment values.
self.exception_count += 1
self.last_exception = err
self.last_exception_at = time.monotonic()
self.result_fanout.send_exception(err)
# Snapshot: WeakSet may shrink mid-iter if a sub is GC'd.
for sub in list(self._subscriptions):
Expand Down
170 changes: 170 additions & 0 deletions tests/async_timer/test_observability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""Tests for layer-1 observability additions.

Covers:
* `Timer.exception_count` / `last_exception` / `last_exception_at`
counters wired into the loop's exception path.
* `extra={"event": "async_timer.*", ...}` payload on every WARNING /
EXCEPTION log call — JSON-log handlers read these as structured
fields without parsing message strings.
"""

import asyncio
import logging
import time

import pytest

import async_timer
from async_timer.subscription import Subscription


@pytest.mark.asyncio
async def test_exception_telemetry_is_recorded():
"""One target exception → counter=1, last_exception set, timestamp set."""
err = RuntimeError("boom")

def explode():
raise err

t0 = time.monotonic()
timer: async_timer.Timer = async_timer.Timer(0.001, explode, start=True)
with pytest.raises(RuntimeError, match="boom"):
await timer.wait() # FanoutRv re-raises the sticky target exception
await timer.cancel()
assert timer.exception_count == 1
assert timer.last_exception is err
assert timer.last_exception_at is not None
assert timer.last_exception_at >= t0


@pytest.mark.asyncio
async def test_exception_telemetry_cumulative_across_restarts():
"""Counters accumulate across cancel/restart — same semantics as hit_count."""
err1 = RuntimeError("first")
err2 = RuntimeError("second")
errors = iter([err1, err2])

def explode():
raise next(errors)

timer: async_timer.Timer = async_timer.Timer(0.001, explode)
timer.start()
with pytest.raises(RuntimeError, match="first"):
await timer.wait()
await timer.cancel()
assert timer.exception_count == 1
assert timer.last_exception is err1

timer.start() # restart
with pytest.raises(RuntimeError, match="second"):
await timer.wait()
await timer.cancel()
assert timer.exception_count == 2
assert timer.last_exception is err2


def test_exception_telemetry_defaults_zero_before_start():
"""A freshly-constructed Timer has the default zero/None values."""
timer: async_timer.Timer = async_timer.Timer(1.0, lambda: 42)
assert timer.exception_count == 0
assert timer.last_exception is None
assert timer.last_exception_at is None


@pytest.mark.asyncio
async def test_fixed_rate_skip_has_structured_extras(caplog):
"""The fall-behind warning carries `event`, `skipped_ticks`, `delay_s`,
`behind_s` as structured fields so JSON-log handlers don't have to
regex the message string."""
import async_timer.pacemaker as pacemaker

pm = pacemaker.TimerPacemaker(delay=0.001, mode="fixed_rate")
pm._start_time = time.monotonic() - 5.0 # five-second backlog
pm._tick_number = 0

caplog.set_level(logging.WARNING, logger="async_timer.pacemaker")
pm._compute_fixed_rate_wait()

skip_records = [
r
for r in caplog.records
if getattr(r, "event", None) == "async_timer.fixed_rate_skip"
]
assert len(skip_records) == 1
rec = skip_records[0]
assert rec.skipped_ticks >= 1
assert rec.delay_s == pytest.approx(0.001)
assert rec.behind_s > 0


@pytest.mark.asyncio
async def test_cancel_aws_raise_has_structured_extras(caplog):
"""`cancel_aws` exception warning carries `event` + `exception_type`."""
import async_timer.pacemaker as pacemaker

cancel_fut: asyncio.Future = asyncio.Future()
pm = pacemaker.TimerPacemaker(delay=10e-5)
pm.stop_on([cancel_fut])

caplog.set_level(logging.WARNING, logger="async_timer.pacemaker")
iter_count = 0
async for _ in pm:
iter_count += 1
if iter_count == 1:
cancel_fut.set_exception(RuntimeError("from-test"))

raise_records = [
r
for r in caplog.records
if getattr(r, "event", None) == "async_timer.cancel_aws_raised"
]
assert len(raise_records) == 1
assert raise_records[0].exception_type == "RuntimeError"


def test_subscription_drop_has_structured_extras(caplog):
"""The bounded-queue drop warning carries `event` + `subscription_name`
+ `dropped_count`."""
caplog.set_level(logging.WARNING, logger="async_timer.subscription")
sub: Subscription[int] = Subscription(maxsize=2, name="metrics")
for v in range(5):
sub._push_value(v)

drop_records = [
r
for r in caplog.records
if getattr(r, "event", None) == "async_timer.subscription_drop"
]
assert drop_records, "expected at least one drop warning"
last = drop_records[-1]
assert last.subscription_name == "metrics"
assert last.maxsize == 2
assert last.dropped_count >= 1


@pytest.mark.asyncio
async def test_default_exc_cb_uses_named_logger_and_extras(caplog):
"""Naming a Timer scopes the exception log to `async_timer.timer.<name>`
and the record carries `event`, `timer_name`, `hit_count` extras."""

def explode():
raise ValueError("named-boom")

caplog.set_level(logging.ERROR)
t: async_timer.Timer = async_timer.Timer(
0.001, explode, name="cache-warmup", start=True
)
with pytest.raises(ValueError, match="named-boom"):
await t.wait()
await t.cancel()

matching = [
r
for r in caplog.records
if getattr(r, "event", None) == "async_timer.target_exception"
]
assert matching, "default exc_cb should emit a structured-extras record"
rec = matching[0]
assert rec.timer_name == "cache-warmup"
assert rec.name == "async_timer.timer.cache-warmup"
assert rec.hit_count == t.hit_count
Loading