Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/backends.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,42 @@ function mapCommonBackendEvent(
timestamp: nowIso(),
}
}
if (type === 'artifact') {
const artifactId =
stringValue(data.artifactId) ?? stringValue(data.id) ?? stringValue(record.artifactId)
if (!artifactId) return undefined
return {
type: 'artifact',
task: context.task,
session: context.session,
artifactId,
name: stringValue(data.name) ?? stringValue(record.name),
mimeType: stringValue(data.mimeType) ?? stringValue(record.mimeType),
uri: stringValue(data.uri) ?? stringValue(record.uri),
content: stringValue(data.content) ?? stringValue(data.body) ?? stringValue(record.content),
metadata:
data.metadata && typeof data.metadata === 'object'
? (data.metadata as Record<string, unknown>)
: undefined,
timestamp: nowIso(),
}
}
if (type === 'proposal_created' || type === 'proposal' || type === 'filing') {
const proposalId =
stringValue(data.proposalId) ?? stringValue(data.id) ?? stringValue(record.proposalId)
if (!proposalId) return undefined
const status = stringValue(data.status) ?? stringValue(record.status)
return {
type: 'proposal_created',
task: context.task,
session: context.session,
proposalId,
title: stringValue(data.title) ?? stringValue(record.title) ?? proposalId,
status:
status === 'pending' || status === 'approved' || status === 'rejected' ? status : undefined,
timestamp: nowIso(),
}
}
if (type === 'result' || type === 'final') {
const text = stringValue(data.finalText) ?? stringValue(data.text) ?? stringValue(record.text)
return text
Expand Down
5 changes: 3 additions & 2 deletions src/durable/tests/chat-engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ async function drain(body: ReadableStream<Uint8Array>): Promise<ChatStreamEvent[
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let nl: number
while ((nl = buffer.indexOf('\n')) !== -1) {
for (;;) {
const nl = buffer.indexOf('\n')
if (nl === -1) break
const line = buffer.slice(0, nl).trim()
buffer = buffer.slice(nl + 1)
if (line) events.push(JSON.parse(line) as ChatStreamEvent)
Expand Down
12 changes: 12 additions & 0 deletions src/sanitize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,21 @@ export function sanitizeRuntimeStreamEvent(
name: event.name,
mimeType: event.mimeType,
uri: options.includeEvidenceIds ? event.uri : undefined,
content: options.includeControlPayloads ? event.content : undefined,
metadata: options.includeMetadata ? event.metadata : undefined,
}
}
if (event.type === 'proposal_created') {
return {
type: event.type,
...withTask,
...withSession,
timestamp: event.timestamp,
proposalId: event.proposalId,
title: options.includeControlPayloads ? event.title : undefined,
status: event.status,
}
}
if (event.type === 'final') {
return {
type: event.type,
Expand Down
10 changes: 10 additions & 0 deletions src/trace-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ function projectToTraceEvent(event: RuntimeStreamEvent): TraceProjection | undef
mimeType: event.mimeType,
},
}
case 'proposal_created':
return {
kind: 'state_mutation',
payload: {
phase: 'proposal_created',
proposalId: event.proposalId,
title: event.title,
status: event.status,
},
}
case 'task_end':
return {
kind: event.status === 'failed' || event.status === 'aborted' ? 'error' : 'log',
Expand Down
10 changes: 10 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,19 @@ export type RuntimeStreamEvent =
name?: string
mimeType?: string
uri?: string
content?: string
metadata?: Record<string, unknown>
timestamp?: string
}
| {
type: 'proposal_created'
task?: AgentTaskSpec
session?: RuntimeSession
proposalId: string
title: string
status?: 'pending' | 'approved' | 'rejected'
timestamp?: string
}
| {
type: 'backend_error'
task: AgentTaskSpec
Expand Down
68 changes: 68 additions & 0 deletions tests/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,43 @@ describe('runAgentTask', () => {
expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'hello' })
})

it('maps wire artifact and proposal events from a streamed backend', async () => {
const backend = createOpenAICompatibleBackend({
apiKey: 'sk-test',
baseUrl: 'https://router.example/v1',
model: 'model-a',
fetchImpl: async () =>
new Response(
'data: {"type":"artifact","artifactId":"art-9","name":"brief.md","mimeType":"text/markdown","content":"# Brief body"}\n\n' +
'data: {"type":"proposal_created","proposalId":"prop-9","title":"File amendment","status":"pending"}\n\n' +
'data: [DONE]\n\n',
{ status: 200 },
),
})
const events = await collect(
runAgentTaskStream({
task: { id: 'wire-task', intent: 'produce a brief', requiredKnowledge: [readyReq] },
backend,
input: { message: 'go' },
}),
)
const artifact = events.find((event) => event.type === 'artifact')
expect(artifact).toMatchObject({
type: 'artifact',
artifactId: 'art-9',
name: 'brief.md',
mimeType: 'text/markdown',
content: '# Brief body',
})
const proposal = events.find((event) => event.type === 'proposal_created')
expect(proposal).toMatchObject({
type: 'proposal_created',
proposalId: 'prop-9',
title: 'File amendment',
status: 'pending',
})
})

it('retries a thrown fetch error and succeeds on a later attempt', async () => {
let calls = 0
const backend = createOpenAICompatibleBackend({
Expand Down Expand Up @@ -679,9 +716,19 @@ describe('runAgentTask', () => {
name: 'report.json',
mimeType: 'application/json',
uri: 's3://internal/secret-bucket/key',
content: 'confidential-deliverable-body',
metadata: { customerId: 'cust-99' },
timestamp: '2026-05-10T00:00:00.000Z',
}
yield {
type: 'proposal_created',
task: ctx.task,
session: ctx.session,
proposalId: 'p1',
title: 'confidential-proposal-title',
status: 'pending',
timestamp: '2026-05-10T00:00:00.000Z',
}
yield {
type: 'text_delta',
task: ctx.task,
Expand Down Expand Up @@ -711,10 +758,14 @@ describe('runAgentTask', () => {
expect(serialized).not.toContain('secret-bucket')
expect(serialized).not.toContain('cust-99')
expect(serialized).not.toContain('redact@example.com')
// Produced artifact body and proposal title are payloads — redacted by default.
expect(serialized).not.toContain('confidential-deliverable-body')
expect(serialized).not.toContain('confidential-proposal-title')
// The collector still records each event by `type` so consumers can act on the stream.
expect(collector.summary().eventCountsByType.tool_call).toBe(1)
expect(collector.summary().eventCountsByType.tool_result).toBe(1)
expect(collector.summary().eventCountsByType.artifact).toBe(1)
expect(collector.summary().eventCountsByType.proposal_created).toBe(1)
expect(collector.summary().finalStatus).toBe('completed')
expect(collector.summary().finalText).toBe('hi from agent')
})
Expand Down Expand Up @@ -744,9 +795,19 @@ describe('runAgentTask', () => {
artifactId: 'a1',
name: 'r.json',
uri: 's3://bucket/key',
content: 'the produced deliverable body',
metadata: { customerId: 'cust-1' },
timestamp: '2026-05-10T00:00:00.000Z',
}
yield {
type: 'proposal_created',
task: ctx.task,
session: ctx.session,
proposalId: 'prop-1',
title: 'Amend filing X for cust-1',
status: 'pending',
timestamp: '2026-05-10T00:00:00.000Z',
}
},
})
const task: AgentTaskSpec = {
Expand All @@ -763,6 +824,13 @@ describe('runAgentTask', () => {
expect(serialized).toContain('pnpm test')
expect(serialized).toContain('s3://bucket/key')
expect(serialized).toContain('cust-1')
// includeControlPayloads exposes the produced artifact body and proposal title.
expect(serialized).toContain('the produced deliverable body')
expect(serialized).toContain('Amend filing X for cust-1')
expect(collector.summary().eventCountsByType.artifact).toBe(1)
expect(collector.summary().eventCountsByType.proposal_created).toBe(1)
const proposal = collector.events.find((e) => e.type === 'proposal_created')
expect(proposal).toMatchObject({ proposalId: 'prop-1', status: 'pending' })
})

it('createRuntimeStreamEventCollector summary tracks session ids and final reason', async () => {
Expand Down
20 changes: 20 additions & 0 deletions tests/trace-bridge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,26 @@ describe('createTraceBridge', () => {
expect(traces.map((trace) => trace.eventId)).toEqual(['evt-1', 'evt-2', 'evt-3'])
})

it('maps proposal_created into a state_mutation trace event carrying id, title, status', () => {
const bridge = createTraceBridge({ runId: 'run-prop' })
const trace = bridge.toTraceEvent({
type: 'proposal_created',
task,
session,
proposalId: 'prop-1',
title: 'Amend filing X',
status: 'pending',
timestamp: '2026-05-10T00:00:00.000Z',
})
expect(trace?.kind).toBe('state_mutation')
expect(trace?.payload).toMatchObject({
phase: 'proposal_created',
proposalId: 'prop-1',
title: 'Amend filing X',
status: 'pending',
})
})

it('toAgentEvalTrace() one-shot matches createTraceBridge.toTraceEvent()', () => {
const event: RuntimeStreamEvent = {
type: 'task_start',
Expand Down
Loading