From 1c0d88f37edbc4d0045e5a185c464dcaa0a1ded3 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Sat, 23 May 2026 15:23:56 -0600 Subject: [PATCH] =?UTF-8?q?feat(0.18.0):=20yank=20production-trace-sink=20?= =?UTF-8?q?=E2=80=94=20no=20live=20Langfuse,=20no=20production=20consumer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createProductionTraceSink shipped sanitized OTLP to Langfuse and composed a canonical ProductionRunRecord. The four production consumers (tax-agent, legal-agent, creative-agent, gtm-agent) have all removed the runtime chat backend in favor of router/sandbox direct paths. Langfuse is not deployed. The sink has no caller — production or otherwise. - src/agent/production-trace-sink.ts: removed (306 LOC) - src/agent/index.ts: drop ProductionTraceSink, ProductionTraceSinkOpts, ProductionRunRecord, ProductionRunRecordStore, RecordFeedbackInput types + createProductionTraceSink export - tests/production-trace-sink.test.ts: removed (322 LOC) - examples/production-trace-sink/: removed - README + examples/README: dropped trace-sink entries Net: 741 lines deleted, 1 added. Typecheck + 129 tests pass. Build clean. Consumers pinned to ^0.17.2 stay on 0.17.x; the type/export removal in 0.18 is breaking but no current consumer references it. --- README.md | 1 - examples/README.md | 2 - examples/production-trace-sink/README.md | 28 -- .../production-trace-sink.ts | 72 ---- package.json | 2 +- src/agent/index.ts | 9 - src/agent/production-trace-sink.ts | 306 ----------------- tests/production-trace-sink.test.ts | 322 ------------------ 8 files changed, 1 insertion(+), 741 deletions(-) delete mode 100644 examples/production-trace-sink/README.md delete mode 100644 examples/production-trace-sink/production-trace-sink.ts delete mode 100644 src/agent/production-trace-sink.ts delete mode 100644 tests/production-trace-sink.test.ts diff --git a/README.md b/README.md index b4b19cf..8f6bd66 100644 --- a/README.md +++ b/README.md @@ -232,7 +232,6 @@ Runnable in [`examples/`](./examples/). Every example imports from - [`model-resolution/`](./examples/model-resolution/) — router catalog + fail-closed admission - [`agent-into-reviewer/`](./examples/agent-into-reviewer/) — pipe one runtime's stream into a reviewer agent - [`chat-handler/`](./examples/chat-handler/) — `handleChatTurn` (the centerpiece production pattern) -- [`production-trace-sink/`](./examples/production-trace-sink/) — `createProductionTraceSink` data capture ## Tests diff --git a/examples/README.md b/examples/README.md index ea65c15..58ccc15 100644 --- a/examples/README.md +++ b/examples/README.md @@ -16,7 +16,6 @@ which needs an `OPENAI_API_KEY`. | [`runtime-run/`](./runtime-run/) | `startRuntimeRun` + cost ledger + persistence adapter | | [`agent-into-reviewer/`](./agent-into-reviewer/) | Pipe one runtime's stream into a reviewer agent (the "2-runtime" pattern) | | [`chat-handler/`](./chat-handler/) | `handleChatTurn` — the centerpiece production chat handler | -| [`production-trace-sink/`](./production-trace-sink/) | `createProductionTraceSink` — production data capture (RunRecord + OTLP + feedback) | ## Conventions @@ -44,7 +43,6 @@ pnpm tsx examples/sandbox-stream-backend/sandbox-stream-backend.ts pnpm tsx examples/runtime-run/runtime-run.ts pnpm tsx examples/agent-into-reviewer/agent-into-reviewer.ts pnpm tsx examples/chat-handler/chat-handler.ts -pnpm tsx examples/production-trace-sink/production-trace-sink.ts # requires creds OPENAI_API_KEY=... pnpm tsx examples/openai-stream-backend/openai-stream-backend.ts diff --git a/examples/production-trace-sink/README.md b/examples/production-trace-sink/README.md deleted file mode 100644 index faa2e3f..0000000 --- a/examples/production-trace-sink/README.md +++ /dev/null @@ -1,28 +0,0 @@ -# Production trace sink - -The data-capture primitive every vertical agent's chat handler wires in -once. Until this existed, eval runs captured everything and production -captured nothing — RL training corpora, the analyst loop, and research -all ran on synthetic personas. - -What `createProductionTraceSink` does: - -- Gives the chat handler a `TraceStore` to write spans to during the - request. -- On `endRun`, composes a canonical `ProductionRunRecord` (`projectId`, - `scenarioId`, `pass`, `score`, `spanCount`, …), persists it via your - `ProductionRunRecordStore` (Drizzle / D1 / Postgres). -- Optionally ships the run as OTLP to Langfuse (`otlp.endpoint`). -- `sink.recordFeedback({ runId, label })` writes the user's thumbs-up / - thumbs-down into a `FeedbackTrajectory` — the corpus DPO/KTO trainers - consume. - -Errors are logged, never thrown — the chat handler is unaffected by a -failing OTLP collector. - -The example uses an in-memory `runRecordStore` so it runs offline. Swap -in a real DB adapter and the wiring is unchanged. - -```bash -pnpm tsx examples/production-trace-sink/production-trace-sink.ts -``` diff --git a/examples/production-trace-sink/production-trace-sink.ts b/examples/production-trace-sink/production-trace-sink.ts deleted file mode 100644 index a6091e4..0000000 --- a/examples/production-trace-sink/production-trace-sink.ts +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Production trace sink — the capture primitive every vertical agent's - * chat handler wires in once. - * - * Until this primitive existed, eval runs captured everything and - * production captured *nothing*. The sink closes that gap: it gives the - * chat handler a `TraceStore` to write to during the request, then on - * `endRun` composes a canonical `ProductionRunRecord`, persists it - * (Drizzle / D1 / Postgres — your DB), and ships the run as OTLP to - * Langfuse (optional). Errors are logged, never thrown. - * - * Run with: - * pnpm tsx examples/production-trace-sink/production-trace-sink.ts - */ - -import { TraceEmitter } from '@tangle-network/agent-eval' -import { - createProductionTraceSink, - type ProductionRunRecord, - type ProductionRunRecordStore, -} from '@tangle-network/agent-runtime/agent' - -// ── 1. Your DB adapter. Real wiring: drizzleRunRecordStore(db) / -// D1 / Postgres. Here: an in-memory array. ──────────────────────── -const persisted: ProductionRunRecord[] = [] -const runRecordStore: ProductionRunRecordStore = { - async append(record) { - persisted.push(record) - }, -} - -// ── 2. The sink. Omit `otlp` to skip Langfuse forwarding; omit -// `feedbackStore` to skip user-feedback persistence. The sink still -// composes + persists the RunRecord. ─────────────────────────────── -const sink = createProductionTraceSink({ projectId: 'demo-agent', runRecordStore }) - -// ── 3. One TraceEmitter per chat session — the sink's `onRunComplete` -// hook fires once at `endRun` time. The sink's TraceStore is shared -// across emitters so spans from all sessions accumulate in one -// store for the request lifetime. ─────────────────────────────────── -async function runChatSession(sessionId: string, pass: boolean, score: number) { - const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) - await emitter.startRun({ - scenarioId: sessionId, - projectId: 'demo-agent', - layer: 'app-runtime', - }) - - // …in production the chat handler emits LLM/tool spans here as the - // turn runs. The sink doesn't care what spans were emitted; it - // counts them and composes the RunRecord on endRun. - - await emitter.endRun({ pass, score }) - // The sink's onRunComplete just fired — RunRecord is in the store. -} - -async function main() { - await runChatSession('session-a', true, 0.92) - await runChatSession('session-b', false, 0.41) - - console.log(`persisted ${persisted.length} RunRecord(s):`) - for (const r of persisted) { - console.log( - ` ${r.runId.slice(0, 8)} ${r.scenarioId} pass=${r.pass} score=${r.score} spans=${r.spanCount}`, - ) - } -} - -main().catch((err) => { - console.error(err) - process.exit(1) -}) diff --git a/package.json b/package.json index 3c15c60..28fbdde 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tangle-network/agent-runtime", - "version": "0.17.2", + "version": "0.18.0", "description": "Reusable runtime lifecycle for domain-specific agents.", "homepage": "https://github.com/tangle-network/agent-runtime#readme", "repository": { diff --git a/src/agent/index.ts b/src/agent/index.ts index 75ad0ff..96fd519 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -39,14 +39,5 @@ export type { export { createSurfaceKnowledgeAdapter } from './knowledge-adapter' export type { OutcomeMeasurement, OutcomeMeasurementOpts } from './outcome' export { measureOutcome } from './outcome' -export type { - ProductionRunRecord, - ProductionRunRecordStore, - ProductionTraceSink, - ProductionTraceSinkOpts, - RecordFeedbackInput, -} from './production-trace-sink' -export { createProductionTraceSink } from './production-trace-sink' - export type { AgentSurfaces, ResolvedSurface, SurfaceValidationIssue } from './surfaces' export { renderSurfaceIssues, resolveSubjectPath, validateSurfaces } from './surfaces' diff --git a/src/agent/production-trace-sink.ts b/src/agent/production-trace-sink.ts deleted file mode 100644 index 7c1e53a..0000000 --- a/src/agent/production-trace-sink.ts +++ /dev/null @@ -1,306 +0,0 @@ -/** - * `createProductionTraceSink` — the production-side capture primitive - * every vertical agent's chat handler wires in once. - * - * Closes the data-leak: until now, every production chat session emitted - * zero replayable trace data. Eval runs captured everything; production - * captured nothing. RL training corpora, research analyses, and the - * self-improvement loop all ran on synthetic personas. This primitive - * makes every real user conversation a piece of data the downstream - * channels (Prime Intellect, GEPA, research, canaries, analyst loop) - * can consume. - * - * Wiring (per agent, ~10 lines in the production chat handler): - * - * ```ts - * const sink = createProductionTraceSink({ - * projectId: 'tax-agent', - * otlp: { endpoint: env.LANGFUSE_OTEL_ENDPOINT, authHeader: env.LANGFUSE_OTEL_AUTH }, - * runRecordStore: drizzleRunRecordStore(db), - * feedbackStore: drizzleFeedbackStore(db), - * }) - * - * const emitter = new TraceEmitter(sink.traceStore, { - * onRunComplete: [sink.onRunComplete], - * }) - * await emitter.startRun({ - * scenarioId: sessionId, - * projectId: 'tax-agent', - * layer: 'app-runtime', - * }) - * // ... existing chat flow, with LLM/tool spans emitted ... - * await emitter.endRun({ pass, score }) - * // sink.onRunComplete fires automatically: - * // 1. composes RunRecord, persists to runRecordStore - * // 2. exports run as OTLP, POSTs to Langfuse - * // 3. logs failures (does NOT throw — never crashes the chat handler) - * ``` - * - * Separately, the agent's feedback endpoint calls `sink.recordFeedback` - * to write user thumbs-up/thumbs-down (or richer labels) into the - * FeedbackTrajectory store — the corpus DPO/KTO trainers consume. - * - * Cloudflare Worker semantics: the sink buffers spans in memory through - * the request lifetime (via agent-eval's `InMemoryTraceStore`). - * `onRunComplete` is awaited (typically inside `ctx.waitUntil`) so the - * worker stays alive long enough to flush. The OTLP POST is fire-and- - * forget — failures are logged but never surface to the chat user. - */ - -import { - exportRunAsOtlp, - type FeedbackLabel, - type FeedbackTrajectoryStore, - InMemoryTraceStore, - type RunCompleteHook, - type RunCompleteHookContext, - type TraceStore, -} from '@tangle-network/agent-eval' - -// ── public API ─────────────────────────────────────────────────────── - -export interface ProductionTraceSinkOpts { - /** - * Stable agent identifier — appears in OTLP `service.name`, every - * RunRecord row, every FeedbackTrajectory row. MUST match the - * agent's repo name to keep cross-repo telemetry joinable. - */ - projectId: string - - /** - * OTLP forwarding target. Typically Langfuse's HTTP receiver. Omit to - * disable OTLP export (RunRecord persistence still works). - * - * `authHeader` is the full header value (e.g. `Basic `); the - * sink does NOT base64-encode for you. - */ - otlp?: { - endpoint: string - authHeader?: string - /** Optional resource attributes merged into every span batch. */ - resourceAttributes?: Record - } - - /** - * Durable RunRecord persistence. Per-vertical agents implement this - * over their own DB (Drizzle / D1 / Postgres). Optional — when omitted, - * RunRecords stay in-memory and are lost when the worker isolate ends. - */ - runRecordStore?: ProductionRunRecordStore - - /** - * Durable feedback persistence. Used by `recordFeedback`; agents wire - * their thumbs-up/down + free-text feedback endpoints to call into the - * sink, which writes a `FeedbackLabel` into a `FeedbackTrajectory`. - * - * Optional — when omitted, `recordFeedback` is a no-op. - */ - feedbackStore?: FeedbackTrajectoryStore - - /** - * Pluggable fetch — defaults to globalThis.fetch. Tests inject a - * mocked fetch. - */ - fetch?: typeof fetch - - /** - * Pluggable structured logger — defaults to console.warn for failures. - * The sink NEVER throws on flush failure; it logs and returns. - */ - log?: (msg: string, fields?: Record) => void -} - -/** - * Durable per-agent RunRecord persistence. Each vertical implements over - * its own DB. The sink calls `append` once per `endRun`. - */ -export interface ProductionRunRecordStore { - append(record: ProductionRunRecord): Promise -} - -/** - * Minimal canonical row the sink composes on `endRun`. Per-agent DB - * adapters extend with their own fields; the sink only writes what - * the runtime canonically captures. - */ -export interface ProductionRunRecord { - runId: string - projectId: string - scenarioId: string - variantId?: string - startedAt: string - endedAt: string - status: 'completed' | 'failed' | 'aborted' - pass?: boolean - score?: number - failureClass?: string - notes?: string - /** Echoed back from `emitter.startRun({ tags })`. */ - tags?: Record - /** Span row count — useful for diagnostics. */ - spanCount: number -} - -export interface ProductionTraceSink { - /** - * The TraceStore the agent's `TraceEmitter` writes to. In-memory by - * design: spans accumulate through the chat session, flush at - * `onRunComplete`. The runtime never reads from this store directly — - * the sink reads from it during the flush step. - */ - traceStore: TraceStore - - /** - * Hook the agent passes into - * `new TraceEmitter(store, { onRunComplete: [sink.onRunComplete] })`. - * Fires once per chat session at `endRun` time. Composes the - * RunRecord, persists, and ships OTLP. Errors are logged, never thrown. - */ - onRunComplete: RunCompleteHook - - /** - * Append a user feedback label (thumbs-up/down, correction, severity) - * to the FeedbackTrajectory for a completed run. Creates a minimal - * trajectory anchored to the run if one doesn't exist; appends the - * label if it does. No-op when `feedbackStore` is undefined. - * - * Returns the trajectory id (existing or freshly created) for the - * agent's API to link back to the session, or `null` on no-op / - * error. - */ - recordFeedback(input: RecordFeedbackInput): Promise -} - -export interface RecordFeedbackInput { - /** Run id from the original `emitter.startRun`. */ - runId: string - /** The user-supplied feedback label. */ - label: FeedbackLabel - /** Optional scenario id (mirrors the run's). */ - scenarioId?: string - /** Optional pre-existing trajectory id if the agent tracks them separately. */ - trajectoryId?: string -} - -// ── factory ────────────────────────────────────────────────────────── - -export function createProductionTraceSink(opts: ProductionTraceSinkOpts): ProductionTraceSink { - const log = opts.log ?? defaultLog - const fetchImpl = opts.fetch ?? globalThis.fetch - const traceStore = new InMemoryTraceStore() - - const onRunComplete: RunCompleteHook = async (ctx: RunCompleteHookContext) => { - // 1. Compose canonical RunRecord and persist if a store is wired. - if (opts.runRecordStore) { - try { - const record = await composeRunRecord(traceStore, ctx, opts.projectId) - await opts.runRecordStore.append(record) - } catch (err) { - log('runRecordStore.append failed', { - runId: ctx.runId, - error: err instanceof Error ? err.message : String(err), - }) - } - } - - // 2. Export the run as OTLP and POST to the configured collector. - if (opts.otlp) { - try { - const resourceAttrs: Record = { - 'service.name': opts.projectId, - ...(opts.otlp.resourceAttributes ?? {}), - } - const otlpPayload = await exportRunAsOtlp(traceStore, ctx.runId, resourceAttrs) - const headers: Record = { 'content-type': 'application/json' } - if (opts.otlp.authHeader) headers.authorization = opts.otlp.authHeader - const res = await fetchImpl(opts.otlp.endpoint, { - method: 'POST', - headers, - body: JSON.stringify(otlpPayload), - }) - if (!res.ok) { - log('OTLP POST non-2xx', { - runId: ctx.runId, - status: res.status, - endpoint: opts.otlp.endpoint, - }) - } - } catch (err) { - log('OTLP POST threw', { - runId: ctx.runId, - error: err instanceof Error ? err.message : String(err), - endpoint: opts.otlp.endpoint, - }) - } - } - } - - const recordFeedback = async (input: RecordFeedbackInput): Promise => { - if (!opts.feedbackStore) return null - const trajectoryId = input.trajectoryId ?? `traj-${input.runId}` - try { - const existing = await opts.feedbackStore.get(trajectoryId) - if (existing) { - await opts.feedbackStore.appendLabel(trajectoryId, input.label) - return trajectoryId - } - // Create a minimal trajectory anchored to the run; the agent's eval-time - // pipeline can backfill richer task / attempts data when it replays. - await opts.feedbackStore.save({ - id: trajectoryId, - projectId: opts.projectId, - scenarioId: input.scenarioId ?? input.runId, - task: { intent: 'chat', context: { runId: input.runId } }, - attempts: [], - labels: [input.label], - createdAt: new Date().toISOString(), - }) - return trajectoryId - } catch (err) { - log('feedbackStore write failed', { - runId: input.runId, - error: err instanceof Error ? err.message : String(err), - }) - return null - } - } - - return { traceStore, onRunComplete, recordFeedback } -} - -// ── helpers ────────────────────────────────────────────────────────── - -async function composeRunRecord( - store: TraceStore, - ctx: RunCompleteHookContext, - projectId: string, -): Promise { - const run = await store.getRun(ctx.runId) - const spans = await store.spans({ runId: ctx.runId }) - // Run timestamps are epoch ms in the trace schema; consumers downstream - // expect ISO strings (DB columns, dashboards, dataset joins) so we convert - // at the sink boundary. - const now = Date.now() - const startedAtMs = run?.startedAt ?? now - const endedAtMs = run?.endedAt ?? now - return { - runId: ctx.runId, - projectId, - scenarioId: run?.scenarioId ?? ctx.runId, - variantId: run?.variantId, - startedAt: new Date(startedAtMs).toISOString(), - endedAt: new Date(endedAtMs).toISOString(), - status: ctx.status, - pass: ctx.outcome?.pass, - score: ctx.outcome?.score, - failureClass: ctx.outcome?.failureClass, - notes: ctx.outcome?.notes, - tags: run?.tags, - spanCount: spans.length, - } -} - -function defaultLog(msg: string, fields?: Record): void { - if (fields) console.warn(`[production-trace-sink] ${msg}`, fields) - else console.warn(`[production-trace-sink] ${msg}`) -} diff --git a/tests/production-trace-sink.test.ts b/tests/production-trace-sink.test.ts deleted file mode 100644 index 5465f2d..0000000 --- a/tests/production-trace-sink.test.ts +++ /dev/null @@ -1,322 +0,0 @@ -import { - type FeedbackLabel, - type FeedbackTrajectory, - type FeedbackTrajectoryStore, - TraceEmitter, -} from '@tangle-network/agent-eval' -import { describe, expect, it, vi } from 'vitest' -import { - createProductionTraceSink, - type ProductionRunRecord, - type ProductionRunRecordStore, -} from '../src/agent/production-trace-sink' - -function memoryRunRecordStore(): ProductionRunRecordStore & { - rows: ProductionRunRecord[] -} { - const rows: ProductionRunRecord[] = [] - return { - rows, - async append(record) { - rows.push(record) - }, - } -} - -function memoryFeedbackStore(): FeedbackTrajectoryStore & { - data: Map -} { - const data = new Map() - return { - data, - async save(t) { - data.set(t.id, t) - }, - async get(id) { - return data.get(id) ?? null - }, - async list() { - return [...data.values()] - }, - async appendAttempt(id, attempt) { - const t = data.get(id) - if (!t) throw new Error(`no trajectory ${id}`) - const next = { ...t, attempts: [...t.attempts, attempt] } - data.set(id, next) - return next - }, - async appendLabel(id, label) { - const t = data.get(id) - if (!t) throw new Error(`no trajectory ${id}`) - const next = { ...t, labels: [...t.labels, label] } - data.set(id, next) - return next - }, - } -} - -describe('createProductionTraceSink — RunRecord persistence', () => { - it('writes a RunRecord on endRun with status / pass / score / scenarioId', async () => { - const runRecordStore = memoryRunRecordStore() - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - runRecordStore, - }) - const emitter = new TraceEmitter(sink.traceStore, { - onRunComplete: [sink.onRunComplete], - }) - await emitter.startRun({ scenarioId: 'sess-1', projectId: 'tax-agent', layer: 'app-runtime' }) - await emitter.endRun({ pass: true, score: 0.92 }) - - expect(runRecordStore.rows).toHaveLength(1) - const row = runRecordStore.rows[0]! - expect(row.projectId).toBe('tax-agent') - expect(row.scenarioId).toBe('sess-1') - expect(row.status).toBe('completed') - expect(row.pass).toBe(true) - expect(row.score).toBe(0.92) - }) - - it('captures abortRun as status=aborted with no pass/score', async () => { - const runRecordStore = memoryRunRecordStore() - const sink = createProductionTraceSink({ projectId: 'tax-agent', runRecordStore }) - const emitter = new TraceEmitter(sink.traceStore, { - onRunComplete: [sink.onRunComplete], - }) - await emitter.startRun({ - scenarioId: 'sess-abort', - projectId: 'tax-agent', - layer: 'app-runtime', - }) - await emitter.abortRun('user-cancelled') - - expect(runRecordStore.rows).toHaveLength(1) - expect(runRecordStore.rows[0]!.status).toBe('aborted') - }) - - it('captures failureClass + notes when endRun supplies them', async () => { - const runRecordStore = memoryRunRecordStore() - const sink = createProductionTraceSink({ projectId: 'tax-agent', runRecordStore }) - const emitter = new TraceEmitter(sink.traceStore, { - onRunComplete: [sink.onRunComplete], - }) - await emitter.startRun({ - scenarioId: 'sess-fail', - projectId: 'tax-agent', - layer: 'app-runtime', - }) - await emitter.endRun({ - pass: false, - score: 0.1, - failureClass: 'TOOL_ERROR', - notes: 'sandbox timed out', - }) - - const row = runRecordStore.rows[0]! - expect(row.status).toBe('failed') // emitter maps pass:false → status:'failed' - expect(row.pass).toBe(false) - expect(row.failureClass).toBe('TOOL_ERROR') - expect(row.notes).toBe('sandbox timed out') - }) - - it('survives runRecordStore throwing (logs, does not crash chat handler)', async () => { - const log = vi.fn() - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - runRecordStore: { - async append() { - throw new Error('db down') - }, - }, - log, - }) - const emitter = new TraceEmitter(sink.traceStore, { - onRunComplete: [sink.onRunComplete], - hookErrors: 'throw', // assert the hook itself doesn't throw - }) - await emitter.startRun({ scenarioId: 'sess', projectId: 'tax-agent', layer: 'app-runtime' }) - await emitter.endRun({ pass: true, score: 1 }) - expect(log).toHaveBeenCalledWith( - 'runRecordStore.append failed', - expect.objectContaining({ error: 'db down' }), - ) - }) -}) - -describe('createProductionTraceSink — OTLP forwarding', () => { - it('POSTs the run to the configured collector with service.name resource attr', async () => { - const fetchMock = vi.fn( - async () => new Response('', { status: 200 }), - ) as unknown as typeof fetch - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - otlp: { - endpoint: 'https://langfuse.example/api/otel', - authHeader: 'Basic Zm9vOmJhcg==', - }, - fetch: fetchMock, - }) - const emitter = new TraceEmitter(sink.traceStore, { - onRunComplete: [sink.onRunComplete], - }) - await emitter.startRun({ - scenarioId: 'sess-otlp', - projectId: 'tax-agent', - layer: 'app-runtime', - }) - await emitter.endRun({ pass: true, score: 0.8 }) - - expect(fetchMock).toHaveBeenCalledOnce() - const [url, init] = (fetchMock as unknown as ReturnType).mock.calls[0]! - expect(url).toBe('https://langfuse.example/api/otel') - expect((init as RequestInit).method).toBe('POST') - const headers = (init as RequestInit).headers as Record - expect(headers.authorization).toBe('Basic Zm9vOmJhcg==') - expect(headers['content-type']).toBe('application/json') - const body = JSON.parse(String((init as RequestInit).body)) - // OtlpExport has `resourceSpans[]` with resource attributes — we just confirm - // our service.name landed somewhere recognisable. - expect(JSON.stringify(body)).toContain('tax-agent') - }) - - it('logs (does not throw) on OTLP fetch failure', async () => { - const log = vi.fn() - const fetchMock = vi.fn(async () => { - throw new Error('network down') - }) as unknown as typeof fetch - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - otlp: { endpoint: 'https://x', authHeader: 'auth' }, - fetch: fetchMock, - log, - }) - const emitter = new TraceEmitter(sink.traceStore, { - onRunComplete: [sink.onRunComplete], - hookErrors: 'throw', - }) - await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) - await emitter.endRun({ pass: true }) - expect(log).toHaveBeenCalledWith( - 'OTLP POST threw', - expect.objectContaining({ error: 'network down' }), - ) - }) - - it('logs (does not throw) on OTLP non-2xx', async () => { - const log = vi.fn() - const fetchMock = vi.fn( - async () => new Response('', { status: 500 }), - ) as unknown as typeof fetch - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - otlp: { endpoint: 'https://x', authHeader: 'auth' }, - fetch: fetchMock, - log, - }) - const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) - await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) - await emitter.endRun({ pass: false, score: 0 }) - expect(log).toHaveBeenCalledWith('OTLP POST non-2xx', expect.objectContaining({ status: 500 })) - }) - - it('omits the auth header when authHeader is undefined', async () => { - const fetchMock = vi.fn( - async () => new Response('', { status: 200 }), - ) as unknown as typeof fetch - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - otlp: { endpoint: 'https://x' }, - fetch: fetchMock, - }) - const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) - await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) - await emitter.endRun({ pass: true }) - const [, init] = (fetchMock as unknown as ReturnType).mock.calls[0]! - const headers = (init as RequestInit).headers as Record - expect(headers.authorization).toBeUndefined() - }) - - it('does NOT attempt OTLP when opts.otlp is undefined', async () => { - const fetchMock = vi.fn() as unknown as typeof fetch - const sink = createProductionTraceSink({ projectId: 'tax-agent', fetch: fetchMock }) - const emitter = new TraceEmitter(sink.traceStore, { onRunComplete: [sink.onRunComplete] }) - await emitter.startRun({ scenarioId: 's', projectId: 'tax-agent', layer: 'app-runtime' }) - await emitter.endRun({ pass: true }) - expect(fetchMock).not.toHaveBeenCalled() - }) -}) - -describe('createProductionTraceSink — recordFeedback', () => { - const label: FeedbackLabel = { - source: { kind: 'user' }, - kind: 'thumbs', - value: 1, - } - - it('returns null when no feedbackStore is wired', async () => { - const sink = createProductionTraceSink({ projectId: 'tax-agent' }) - const id = await sink.recordFeedback({ runId: 'r1', label }) - expect(id).toBeNull() - }) - - it('creates a new trajectory on first feedback for a run', async () => { - const feedbackStore = memoryFeedbackStore() - const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore }) - const id = await sink.recordFeedback({ runId: 'r1', label }) - expect(id).toBe('traj-r1') - expect(feedbackStore.data.get('traj-r1')?.labels).toEqual([label]) - expect(feedbackStore.data.get('traj-r1')?.projectId).toBe('tax-agent') - }) - - it('appends to an existing trajectory on subsequent feedback', async () => { - const feedbackStore = memoryFeedbackStore() - const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore }) - await sink.recordFeedback({ runId: 'r1', label }) - const secondLabel: FeedbackLabel = { source: { kind: 'user' }, kind: 'comment', value: 'great' } - await sink.recordFeedback({ runId: 'r1', label: secondLabel }) - expect(feedbackStore.data.get('traj-r1')?.labels).toEqual([label, secondLabel]) - }) - - it('returns null + logs on feedbackStore failure (does not throw)', async () => { - const log = vi.fn() - const sink = createProductionTraceSink({ - projectId: 'tax-agent', - feedbackStore: { - async save() { - throw new Error('write fail') - }, - async get() { - return null - }, - async list() { - return [] - }, - async appendAttempt() { - throw new Error('na') - }, - async appendLabel() { - throw new Error('na') - }, - }, - log, - }) - const result = await sink.recordFeedback({ runId: 'r1', label }) - expect(result).toBeNull() - expect(log).toHaveBeenCalledWith( - 'feedbackStore write failed', - expect.objectContaining({ error: 'write fail' }), - ) - }) - - it('honours an explicit trajectoryId when the agent provides one', async () => { - const feedbackStore = memoryFeedbackStore() - const sink = createProductionTraceSink({ projectId: 'tax-agent', feedbackStore }) - const id = await sink.recordFeedback({ - runId: 'r1', - trajectoryId: 'agent-supplied-id', - label, - }) - expect(id).toBe('agent-supplied-id') - expect(feedbackStore.data.get('agent-supplied-id')).toBeDefined() - }) -})