This document summarizes the core data path through the ZMeta stack. The flow is the same whether packets arrive via HTTP, UDP, or one of the bundled simulators.
source -> ingest transport -> validation/adaptation -> dispatch -> rules/alerts -> WebSocket/UI & recorder
- Implemented in `backend/app/routes/ingest_http.py:13-47`.
- Optional shared-secret header/query enforced through `Settings.verify_shared_secret`.
- Payloads are forwarded to `ingest_payload(..., context="http")` for normalization.
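The optional shared-secret check described above can be sketched as follows. This is a minimal illustration, not the body of `Settings.verify_shared_secret`: the function name `secret_matches`, its parameters, and the rule that the header takes precedence over the query value are all assumptions.

```python
import hmac
from typing import Optional


def secret_matches(expected: Optional[str],
                   header_value: Optional[str],
                   query_value: Optional[str]) -> bool:
    """Return True when the request may proceed (hypothetical helper)."""
    if not expected:
        # No secret configured: enforcement is optional, so allow the request.
        return True
    # Assumption: the header wins when both header and query are supplied.
    supplied = header_value or query_value or ""
    # compare_digest avoids leaking the mismatch position through timing.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
```

A constant-time comparison is used here because a plain `==` on secrets can reveal how many leading characters matched.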
- `backend/app/udp.py:11-50` wraps `asyncio.DatagramProtocol`, pushes frames onto a bounded queue, and hands them to `ingest_payload(..., context="udp")`.
- Back-pressure telemetry is surfaced via the metrics provider (`metrics.note_received`, `metrics.note_dropped`).
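A rough sketch of the bounded-queue pattern described above. The `Counters` class is a hypothetical stand-in for the metrics provider, and dropping the newest frame when the queue is full is an assumption; the actual module may apply a different drop policy.

```python
import asyncio


class Counters:
    """Hypothetical stand-in for the metrics provider."""

    def __init__(self) -> None:
        self.received = 0
        self.dropped = 0

    def note_received(self) -> None:
        self.received += 1

    def note_dropped(self) -> None:
        self.dropped += 1


class QueueingProtocol(asyncio.DatagramProtocol):
    """Push each datagram onto a bounded queue; count frames that do not fit."""

    def __init__(self, queue: asyncio.Queue, metrics: Counters) -> None:
        self.queue = queue
        self.metrics = metrics

    def datagram_received(self, data: bytes, addr) -> None:
        self.metrics.note_received()
        try:
            # put_nowait never blocks the event loop; a full queue raises.
            self.queue.put_nowait(data)
        except asyncio.QueueFull:
            # Assumption: the incoming frame is discarded when the queue is full.
            self.metrics.note_dropped()
```

A separate consumer task would then drain the queue and pass each frame on for ingestion, which keeps slow downstream work from stalling the socket callback.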
- CLI simulators under `tools/simulators/` speak the same HTTP/UDP contracts.
- The NDJSON recorder feeds `scripts/replay.*`, which POST the archived events back through the HTTP ingest route.
- `backend/app/ingest.py:24-44` calls `validate_or_adapt`:
  - First tries `schemas.zmeta.ZMeta.model_validate`.
  - Falls back to `tools.ingest_adapters.adapt_to_zmeta` when needed.
  - Manages auto-incrementing `sequence` values via `metrics.next_sequence()`.
- Adapter usage is tracked with `metrics.note_adapter`, while success/failure go into the metrics snapshot for `/api/v1/healthz`.
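The validate-then-adapt fallback can be illustrated with a standard-library sketch. The `Packet` dataclass stands in for the Pydantic model, and the field names, the `adapt` mapping, and the rule that a sequence number is assigned only when missing are all assumptions made for illustration.

```python
from dataclasses import dataclass
from itertools import count
from typing import Optional, Tuple


@dataclass
class Packet:
    """Hypothetical stand-in for the ZMeta model."""
    sensor_id: str
    sequence: int


class ValidationFailed(ValueError):
    """Raised when a payload is not in the native shape."""


_sequence = count(1)  # stand-in for metrics.next_sequence()


def validate(payload: dict) -> Packet:
    """Strict path: accept only payloads already in the native shape."""
    if not isinstance(payload.get("sensor_id"), str):
        raise ValidationFailed("sensor_id missing or not a string")
    seq = payload.get("sequence")
    if seq is None:
        # Assumption: a sequence number is assigned only when absent.
        seq = next(_sequence)
    return Packet(payload["sensor_id"], seq)


def adapt(payload: dict) -> Optional[dict]:
    """Hypothetical adapter: map a legacy 'id' field to the native shape."""
    if "id" in payload:
        return {"sensor_id": str(payload["id"]), "sequence": payload.get("seq")}
    return None


def validate_or_adapt(payload: dict) -> Tuple[Packet, bool]:
    """Return (packet, adapter_used); re-raise if no adapter recognizes it."""
    try:
        return validate(payload), False
    except ValidationFailed:
        adapted = adapt(payload)
        if adapted is None:
            raise
        return validate(adapted), True
```

Returning the `adapter_used` flag alongside the packet gives the caller one place to record adapter usage in metrics.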
- `dispatch_zmeta` broadcasts the validated packet to:
  - `backend/app/ws.hub.broadcast_text` – feeds live WebSocket subscribers (`/ws`).
  - `tools.recorder.NDJSONRecorder.enqueue` – appends to `data/records/`.
- Every accepted packet calls `metrics.note_validated()` so health checks and log consumers can monitor throughput and last-packet age.
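The fan-out step might look roughly like the sketch below. `ListHub` and `ListRecorder` are hypothetical in-memory stand-ins, the calls are synchronous for brevity (the real hub is likely asynchronous), and the ordering of the three steps is an assumption.

```python
import json
from typing import Callable, List


class ListHub:
    """Hypothetical hub that remembers each broadcast text frame."""

    def __init__(self) -> None:
        self.sent: List[str] = []

    def broadcast_text(self, text: str) -> None:
        self.sent.append(text)


class ListRecorder:
    """Hypothetical recorder that keeps NDJSON lines in memory."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    def enqueue(self, record: dict) -> None:
        # One JSON document per line is the NDJSON convention.
        self.lines.append(json.dumps(record, sort_keys=True))


def dispatch(packet: dict, hub: ListHub, recorder: ListRecorder,
             note_validated: Callable[[], None]) -> None:
    """Send one validated packet to every sink, then count it."""
    hub.broadcast_text(json.dumps(packet, sort_keys=True))
    recorder.enqueue(packet)
    note_validated()
```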
- Normalized payloads run through `tools.rules.rules.apply(...)` (`backend/app/ingest.py:52-66`).
- Each emitted alert passes through `backend/app/state.AlertDeduper` before broadcast, preventing duplicate storms (3s TTL by default).
- When an alert survives dedupe, it is:
  - JSON-serialized with `backend/app/json_utils.dumps`.
  - Broadcast over the same WebSocket hub.
  - Counted via `metrics.note_alert()`.
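TTL-based dedupe can be sketched as below. This is not the `AlertDeduper` implementation: the class name `TTLDeduper`, the `should_emit` method, the choice of key, and the decision not to refresh the timestamp on suppressed duplicates are assumptions.

```python
import time
from typing import Callable, Dict, Hashable


class TTLDeduper:
    """Suppress repeats of the same key seen within ttl_seconds."""

    def __init__(self, ttl_seconds: float = 3.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so tests need not sleep
        self._seen: Dict[Hashable, float] = {}

    def should_emit(self, key: Hashable) -> bool:
        now = self.clock()
        # Forget entries whose TTL has elapsed so the table stays small.
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl}
        if key in self._seen:
            return False  # duplicate inside the window; timestamp not refreshed
        self._seen[key] = now
        return True
```

Because suppressed duplicates do not extend the window, a continuously firing alert still gets through once per TTL period instead of being silenced indefinitely.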
- The FastAPI WebSocket endpoint (`backend/app/routes/ws_routes.py:17-48`) accepts clients, enforces the optional shared-secret, and then defers to the hub.
- `WSHub` now uses timed `queue.put` calls with structured log warnings when clients fall behind (`backend/app/ws.py:54-111`).
  - Persistent slow consumers are disconnected once `max_backpressure_retries` is exceeded.
  - Drops and sends are reflected in `metrics.snapshot()` and surfaced through `/api/v1/healthz`.
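A timed `queue.put` with a retry budget could be written along these lines. The function name `offer`, the timeout value, and the exact meaning of "exceeded" (here: one initial attempt plus `max_backpressure_retries` retries) are assumptions, not taken from `WSHub`.

```python
import asyncio
import logging

log = logging.getLogger("ws.sketch")


async def offer(queue: asyncio.Queue, message: str,
                timeout: float = 0.05,
                max_backpressure_retries: int = 2) -> bool:
    """Return True if queued, False if the client should be disconnected."""
    for attempt in range(max_backpressure_retries + 1):
        try:
            # wait_for cancels the pending put cleanly when the timeout fires.
            await asyncio.wait_for(queue.put(message), timeout)
            return True
        except asyncio.TimeoutError:
            log.warning("client queue full (attempt %d of %d)",
                        attempt + 1, max_backpressure_retries + 1)
    return False
```

The caller would close the client's connection when `offer` returns `False`, so one stalled subscriber cannot hold up broadcasts to the rest.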
- New sources: call `ingest_payload` directly (after populating a `Services` bundle) or reuse the HTTP/UDP adapters.
- New adapters: register in `tools/ingest_adapters` so `validate_or_adapt` can discover them automatically.
- New rules: add YAML under `config/rules/` and reload via the control scripts; alerts instantly inherit dedupe and broadcast behaviour.
- Alternate storage/analytics: hook additional sinks inside `dispatch_zmeta` (e.g., forward to Kafka) by extending the `Services` dependency or by registering FastAPI dependencies that decorate the dispatch step.
- `backend/app/config.py` – typed settings, shared-secret helpers, and URLs.
- `backend/app/services.py` – provides `MetricsProvider`, recorder, hub, and rules to the ingest path.
- `backend/app/metrics.py` – thread-safe counters/snapshots reported via `/status` and `/healthz`.
- `docs/` (this file) – keep architecture notes here as the pipeline evolves.