Skip to content
This repository was archived by the owner on Apr 8, 2026. It is now read-only.

Commit 7bccfa9

Browse files
committed
feat(telemetry): add lane.open and lane.close events
Add explicit lane lifecycle telemetry variants and session tracer helpers so lane open/close activity can be recorded with structured payloads and mirrored into session trace records. Constraint: Keep telemetry crate changes backward-compatible for existing HTTP and analytics event consumers Rejected: Reuse generic analytics events for lane lifecycle | loses dedicated typed telemetry variants Confidence: high Scope-risk: narrow Reversibility: clean Directive: Preserve lane_open/lane_close trace names and lane_id attribute keys unless all downstream consumers are updated together Tested: cargo build --workspace; cargo test --workspace Not-tested: Runtime wiring that emits lane open/close events from higher-level crates
1 parent 476b03e commit 7bccfa9

File tree

1 file changed

+103
-5
lines changed
  • rust/crates/telemetry/src

1 file changed

+103
-5
lines changed

rust/crates/telemetry/src/lib.rs

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,18 @@ pub enum TelemetryEvent {
198198
#[serde(default, skip_serializing_if = "Map::is_empty")]
199199
attributes: Map<String, Value>,
200200
},
201+
LaneOpen {
202+
session_id: String,
203+
lane_id: String,
204+
#[serde(default, skip_serializing_if = "Map::is_empty")]
205+
attributes: Map<String, Value>,
206+
},
207+
LaneClose {
208+
session_id: String,
209+
lane_id: String,
210+
#[serde(default, skip_serializing_if = "Map::is_empty")]
211+
attributes: Map<String, Value>,
212+
},
201213
Analytics(AnalyticsEvent),
202214
SessionTrace(SessionTraceRecord),
203215
}
@@ -394,6 +406,26 @@ impl SessionTracer {
394406
self.record("http_request_failed", trace_attributes);
395407
}
396408

409+
pub fn record_lane_open(&self, lane_id: impl Into<String>, attributes: Map<String, Value>) {
410+
let lane_id = lane_id.into();
411+
self.sink.record(TelemetryEvent::LaneOpen {
412+
session_id: self.session_id.clone(),
413+
lane_id: lane_id.clone(),
414+
attributes: attributes.clone(),
415+
});
416+
self.record("lane_open", merge_lane_trace_fields(lane_id, attributes));
417+
}
418+
419+
pub fn record_lane_close(&self, lane_id: impl Into<String>, attributes: Map<String, Value>) {
420+
let lane_id = lane_id.into();
421+
self.sink.record(TelemetryEvent::LaneClose {
422+
session_id: self.session_id.clone(),
423+
lane_id: lane_id.clone(),
424+
attributes: attributes.clone(),
425+
});
426+
self.record("lane_close", merge_lane_trace_fields(lane_id, attributes));
427+
}
428+
397429
pub fn record_analytics(&self, event: AnalyticsEvent) {
398430
let mut attributes = event.properties.clone();
399431
attributes.insert(
@@ -418,6 +450,14 @@ fn merge_trace_fields(
418450
attributes
419451
}
420452

453+
fn merge_lane_trace_fields(
454+
lane_id: String,
455+
mut attributes: Map<String, Value>,
456+
) -> Map<String, Value> {
457+
attributes.insert("lane_id".to_string(), Value::String(lane_id));
458+
attributes
459+
}
460+
421461
fn current_timestamp_ms() -> u64 {
422462
SystemTime::now()
423463
.duration_since(UNIX_EPOCH)
@@ -477,6 +517,12 @@ mod tests {
477517
let sink = Arc::new(MemoryTelemetrySink::default());
478518
let tracer = SessionTracer::new("session-123", sink.clone());
479519

520+
let mut lane_open_attributes = Map::new();
521+
lane_open_attributes.insert("worker".to_string(), Value::String("worker-1".to_string()));
522+
tracer.record_lane_open("lane-42", lane_open_attributes);
523+
let mut lane_close_attributes = Map::new();
524+
lane_close_attributes.insert("status".to_string(), Value::String("completed".to_string()));
525+
tracer.record_lane_close("lane-42", lane_close_attributes);
480526
tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new());
481527
tracer.record_analytics(
482528
AnalyticsEvent::new("cli", "prompt_sent")
@@ -486,6 +532,32 @@ mod tests {
486532
let events = sink.events();
487533
assert!(matches!(
488534
&events[0],
535+
TelemetryEvent::LaneOpen {
536+
session_id,
537+
lane_id,
538+
..
539+
} if session_id == "session-123" && lane_id == "lane-42"
540+
));
541+
assert!(matches!(
542+
&events[1],
543+
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, attributes, .. })
544+
if name == "lane_open" && attributes.get("lane_id") == Some(&Value::String("lane-42".to_string()))
545+
));
546+
assert!(matches!(
547+
&events[2],
548+
TelemetryEvent::LaneClose {
549+
session_id,
550+
lane_id,
551+
..
552+
} if session_id == "session-123" && lane_id == "lane-42"
553+
));
554+
assert!(matches!(
555+
&events[3],
556+
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, attributes, .. })
557+
if name == "lane_close" && attributes.get("lane_id") == Some(&Value::String("lane-42".to_string()))
558+
));
559+
assert!(matches!(
560+
&events[4],
489561
TelemetryEvent::HttpRequestStarted {
490562
session_id,
491563
attempt: 1,
@@ -495,18 +567,44 @@ mod tests {
495567
} if session_id == "session-123" && method == "POST" && path == "/v1/messages"
496568
));
497569
assert!(matches!(
498-
&events[1],
499-
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. })
570+
&events[5],
571+
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 2, name, .. })
500572
if name == "http_request_started"
501573
));
502-
assert!(matches!(&events[2], TelemetryEvent::Analytics(_)));
574+
assert!(matches!(&events[6], TelemetryEvent::Analytics(_)));
503575
assert!(matches!(
504-
&events[3],
505-
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. })
576+
&events[7],
577+
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 3, name, .. })
506578
if name == "analytics"
507579
));
508580
}
509581

582+
#[test]
583+
fn jsonl_sink_persists_lane_events() {
584+
let path = std::env::temp_dir().join(format!(
585+
"telemetry-jsonl-lane-{}.log",
586+
current_timestamp_ms()
587+
));
588+
let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");
589+
590+
sink.record(TelemetryEvent::LaneOpen {
591+
session_id: "session-123".to_string(),
592+
lane_id: "lane-42".to_string(),
593+
attributes: Map::new(),
594+
});
595+
sink.record(TelemetryEvent::LaneClose {
596+
session_id: "session-123".to_string(),
597+
lane_id: "lane-42".to_string(),
598+
attributes: Map::new(),
599+
});
600+
601+
let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable");
602+
assert!(contents.contains("\"type\":\"lane_open\""));
603+
assert!(contents.contains("\"type\":\"lane_close\""));
604+
605+
let _ = std::fs::remove_file(path);
606+
}
607+
510608
#[test]
511609
fn jsonl_sink_persists_events() {
512610
let path =

0 commit comments

Comments
 (0)