From ce3bbc656a7ca09bd8c85a750a4e853728e979f7 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Wed, 20 May 2026 22:49:31 +0300 Subject: [PATCH 1/2] fix(0.14.1): per-attempt fetch timeout + retry thrown network errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A production eval persona burned 15 minutes on a single hung request: the tcloud router accepted the connection, never responded, and the fetch sat open until the runtime gave up with `fetch failed`. Two gaps in createOpenAICompatibleBackend caused it: 1. No per-attempt deadline — a hung upstream blocked the attempt indefinitely. 2. Thrown fetch errors (network failure, DNS, the eventual `fetch failed`) propagated straight out of the retry loop. Only HTTP error *statuses* were retried; a thrown error killed the attempt. Fixes: - BackendRetryPolicy.requestTimeoutMs (default 120s) — each attempt gets an AbortController deadline linked to the caller signal. A hung upstream now aborts in 2 min and retries instead of hanging. - The fetch call is wrapped: a thrown error is treated as a retryable transport failure (backoff + retry) just like a 5xx. Caller-initiated aborts stay terminal. Exhausted retries throw BackendTransportError with the last error message. 3 new tests: thrown-error retry, per-attempt-timeout abort + retry, all-attempts-throw → BackendTransportError. 216 tests pass. --- package.json | 2 +- src/backends.ts | 106 +++++++++++++++++++++++++++++++++++------- tests/runtime.test.ts | 85 +++++++++++++++++++++++++++++++++ 3 files changed, 176 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 78bd775..8c3aa98 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tangle-network/agent-runtime", - "version": "0.14.0", + "version": "0.14.1", "description": "Reusable runtime lifecycle for domain-specific agents.", "homepage": "https://github.com/tangle-network/agent-runtime#readme", "repository": { diff --git a/src/backends.ts b/src/backends.ts index 160fc0e..c680f2c 100644 --- a/src/backends.ts +++ b/src/backends.ts @@ -94,6 +94,14 @@ export interface BackendRetryPolicy { jitter?: number /** Status codes that trigger a retry. Default: 408, 425, 429, 500, 502, 503, 504. */ retryStatuses?: ReadonlyArray + /** + * Per-attempt wall-clock deadline in ms. If a single fetch attempt does + * not return headers within this window the attempt is aborted and + * retried. Default 120000 (2 min). Without this a hung upstream blocks + * the attempt indefinitely — observed in production as a 15-minute + * `fetch failed` that burned an entire eval persona. Set to 0 to disable. + */ + requestTimeoutMs?: number } const DEFAULT_RETRY_STATUSES = [408, 425, 429, 500, 502, 503, 504] as const @@ -105,6 +113,41 @@ function pickRetryDelayMs(attempt: number, policy: Required) return Math.max(0, Math.round(capped + jitter)) } +/** + * Derive a per-attempt AbortSignal that fires when EITHER the caller's + * signal aborts OR `timeoutMs` elapses. `dispose()` clears the timer so a + * completed attempt doesn't leak a pending timeout. `timeoutMs <= 0` + * disables the deadline (caller signal still propagates). + */ +function withTimeout( + callerSignal: AbortSignal | undefined, + timeoutMs: number, +): { signal: AbortSignal; dispose: () => void } { + if (timeoutMs <= 0) { + return { signal: callerSignal ?? new AbortController().signal, dispose: () => undefined } + } + const controller = new AbortController() + const timer = setTimeout( + () => controller.abort(new Error(`request timed out after ${timeoutMs}ms`)), + timeoutMs, + ) + if (typeof (timer as { unref?: () => void }).unref === 'function') { + ;(timer as { unref: () => void }).unref() + } + const onCallerAbort = () => controller.abort(callerSignal?.reason ?? new Error('aborted')) + if (callerSignal) { + if (callerSignal.aborted) onCallerAbort() + else callerSignal.addEventListener('abort', onCallerAbort, { once: true }) + } + return { + signal: controller.signal, + dispose: () => { + clearTimeout(timer) + callerSignal?.removeEventListener('abort', onCallerAbort) + }, + } +} + function sleep(ms: number, signal?: AbortSignal): Promise { return new Promise((resolve, reject) => { if (signal?.aborted) { @@ -141,6 +184,7 @@ export function createOpenAICompatibleBackend< maxBackoffMs: options.retry?.maxBackoffMs ?? 30000, jitter: options.retry?.jitter ?? 0.25, retryStatuses: options.retry?.retryStatuses ?? DEFAULT_RETRY_STATUSES, + requestTimeoutMs: options.retry?.requestTimeoutMs ?? 120_000, } return { kind, @@ -148,24 +192,46 @@ export function createOpenAICompatibleBackend< return newRuntimeSession(kind, context.requestedSessionId) }, async *stream(input, context) { + const url = `${options.baseUrl.replace(/\/$/, '')}/chat/completions` + const requestBody = JSON.stringify({ + model: options.model, + stream: true, + messages: input.messages ?? [ + { role: 'user', content: input.message ?? context.task.intent }, + ], + }) let response: Response | undefined let lastStatus = 0 + // The last thrown transport error (timeout abort, DNS / connection + // failure). Network throws are retryable just like 5xx — without this + // a `fetch failed` propagated immediately and burned the attempt. + let lastThrown: unknown for (let attempt = 1; attempt <= retryPolicy.maxAttempts; attempt++) { - response = await fetcher(`${options.baseUrl.replace(/\/$/, '')}/chat/completions`, { - method: 'POST', - headers: { - Authorization: `Bearer ${options.apiKey}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - model: options.model, - stream: true, - messages: input.messages ?? [ - { role: 'user', content: input.message ?? context.task.intent }, - ], - }), - signal: context.signal, - }) + lastThrown = undefined + // Per-attempt deadline: abort a hung upstream instead of waiting + // forever. Linked to context.signal so a caller cancel still wins. + const attemptSignal = withTimeout(context.signal, retryPolicy.requestTimeoutMs) + try { + response = await fetcher(url, { + method: 'POST', + headers: { + Authorization: `Bearer ${options.apiKey}`, + 'Content-Type': 'application/json', + }, + body: requestBody, + signal: attemptSignal.signal, + }) + } catch (err) { + attemptSignal.dispose() + // A caller-initiated abort is terminal — do not retry it. + if (context.signal?.aborted) throw err + lastThrown = err + response = undefined + if (attempt === retryPolicy.maxAttempts) break + await sleep(pickRetryDelayMs(attempt, retryPolicy), context.signal) + continue + } + attemptSignal.dispose() if (response.ok) break lastStatus = response.status if (!retryPolicy.retryStatuses.includes(response.status)) break @@ -179,7 +245,15 @@ export function createOpenAICompatibleBackend< const delayMs = pickRetryDelayMs(attempt, retryPolicy) await sleep(delayMs, context.signal) } - if (!response || !response.ok) { + if (!response) { + const reason = lastThrown instanceof Error ? lastThrown.message : String(lastThrown) + throw new BackendTransportError( + kind, + `chat backend unreachable after ${retryPolicy.maxAttempts} attempts: ${reason}`, + { status: 0 }, + ) + } + if (!response.ok) { throw new BackendTransportError(kind, `chat backend returned ${lastStatus || 'unknown'}`, { status: lastStatus || 0, }) diff --git a/tests/runtime.test.ts b/tests/runtime.test.ts index 5a3a8a5..8899830 100644 --- a/tests/runtime.test.ts +++ b/tests/runtime.test.ts @@ -525,6 +525,91 @@ describe('runAgentTask', () => { expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'hello' }) }) + it('retries a thrown fetch error and succeeds on a later attempt', async () => { + let calls = 0 + const backend = createOpenAICompatibleBackend({ + apiKey: 'sk-test', + baseUrl: 'https://router.example/v1', + model: 'model-a', + retry: { initialBackoffMs: 1, maxBackoffMs: 2 }, + fetchImpl: async () => { + calls += 1 + // First two attempts throw a network error (the `fetch failed` + // shape); the third returns a real stream. + if (calls < 3) throw new TypeError('fetch failed') + return new Response('data: {"choices":[{"delta":{"content":"ok"}}]}\n\ndata: [DONE]\n\n', { + status: 200, + }) + }, + }) + const events = await collect( + runAgentTaskStream({ + task: { id: 'retry-task', intent: 'go', requiredKnowledge: [readyReq] }, + backend, + input: { message: 'hi' }, + }), + ) + expect(calls).toBe(3) + expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'ok' }) + }) + + it('aborts a hung attempt via the per-attempt timeout, then retries', async () => { + let calls = 0 + const backend = createOpenAICompatibleBackend({ + apiKey: 'sk-test', + baseUrl: 'https://router.example/v1', + model: 'model-a', + retry: { initialBackoffMs: 1, maxBackoffMs: 2, requestTimeoutMs: 30 }, + fetchImpl: (_url, init) => { + calls += 1 + // First attempt hangs until its per-attempt signal aborts; the + // second returns immediately. + if (calls === 1) { + return new Promise((_resolve, reject) => { + const signal = (init as RequestInit | undefined)?.signal + signal?.addEventListener('abort', () => reject(signal.reason ?? new Error('aborted'))) + }) + } + return Promise.resolve( + new Response('data: {"choices":[{"delta":{"content":"recovered"}}]}\n\ndata: [DONE]\n\n', { + status: 200, + }), + ) + }, + }) + const events = await collect( + runAgentTaskStream({ + task: { id: 'timeout-task', intent: 'go', requiredKnowledge: [readyReq] }, + backend, + input: { message: 'hi' }, + }), + ) + expect(calls).toBe(2) + expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'recovered' }) + }) + + it('throws BackendTransportError when every attempt throws', async () => { + const backend = createOpenAICompatibleBackend({ + apiKey: 'sk-test', + baseUrl: 'https://router.example/v1', + model: 'model-a', + retry: { maxAttempts: 2, initialBackoffMs: 1, maxBackoffMs: 2 }, + fetchImpl: async () => { + throw new TypeError('fetch failed') + }, + }) + const events = await collect( + runAgentTaskStream({ + task: { id: 'dead-task', intent: 'go', requiredKnowledge: [readyReq] }, + backend, + input: { message: 'hi' }, + }), + ) + const final = events.at(-1) + expect(final).toMatchObject({ type: 'final' }) + expect(final?.type === 'final' && final.status).not.toBe('completed') + }) + it('stops a backend and emits failed final event when streaming throws', async () => { const store = new InMemoryRuntimeSessionStore() const stopped: string[] = [] From 4c1385a65e8c760a1507be4743bcd95687738be2 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Wed, 20 May 2026 22:50:49 +0300 Subject: [PATCH 2/2] =?UTF-8?q?ci:=20biome=20=E2=80=94=20fix=20assign-in-e?= =?UTF-8?q?xpression=20in=20chat-engine=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/runtime.test.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/runtime.test.ts b/tests/runtime.test.ts index 8899830..ef8fbd5 100644 --- a/tests/runtime.test.ts +++ b/tests/runtime.test.ts @@ -571,9 +571,12 @@ describe('runAgentTask', () => { }) } return Promise.resolve( - new Response('data: {"choices":[{"delta":{"content":"recovered"}}]}\n\ndata: [DONE]\n\n', { - status: 200, - }), + new Response( + 'data: {"choices":[{"delta":{"content":"recovered"}}]}\n\ndata: [DONE]\n\n', + { + status: 200, + }, + ), ) }, })