diff --git a/src/domain/models/models-dev.ts b/src/domain/models/models-dev.ts index 5be26f4..6833011 100644 --- a/src/domain/models/models-dev.ts +++ b/src/domain/models/models-dev.ts @@ -37,6 +37,7 @@ const CODEX_ALLOWED_OPENAI_MODEL_IDS = new Set([ "gpt-5.2", "gpt-5.2-codex", "gpt-5.3-codex", + "gpt-5.3-codex-spark", "gpt-5.4", "gpt-5.4-mini", "gpt-5.5", diff --git a/src/http/routes/proxy.ts b/src/http/routes/proxy.ts index c5d5ffe..1483aa1 100644 --- a/src/http/routes/proxy.ts +++ b/src/http/routes/proxy.ts @@ -20,7 +20,7 @@ import { isTokenUsagePopulated, type TokenUsage, } from "../../usage/token-usage"; -import { isObjectRecord } from "../../utils/object"; +import { isObjectRecord, readBooleanField } from "../../utils/object"; import { parseModelForProxyRoute, proxyRouteTable, @@ -35,6 +35,31 @@ const proxyErrorResponse = (message: string, type = "proxy_error") => ({ }, }); +const CODEX_SSE_HEADER_TIMEOUT_MS = 10_000; + +const createCodexSseHeaderTimeout = (): { + signal: AbortSignal; + clear(): void; + error(): Error | undefined; +} => { + const controller = new AbortController(); + let error: Error | undefined; + const timeout = setTimeout(() => { + error = new Error( + `Codex SSE response headers timed out after ${CODEX_SSE_HEADER_TIMEOUT_MS}ms` + ); + controller.abort(error); + }, CODEX_SSE_HEADER_TIMEOUT_MS); + + return { + signal: controller.signal, + clear(): void { + clearTimeout(timeout); + }, + error: () => error, + }; +}; + const removeProxyAuthHeaders = (headers: Headers): void => { headers.delete("authorization"); headers.delete("x-api-key"); @@ -222,6 +247,7 @@ const proxyRequest = async ( let upstreamUrl = ""; let responseTransformer: ((response: Response) => Promise) | null = null; + let useCodexSseHeaderTimeout = false; switch (route.provider) { case "codex": { @@ -246,6 +272,8 @@ const proxyRequest = async ( upstreamUrl = codexProxy.upstreamUrl; requestBody = codexProxy.bodyText; responseTransformer = codexProxy.transformResponse; + useCodexSseHeaderTimeout = + readBooleanField(codexProxy.bodyJson, "stream") === true; const webSocketResponse = await tryProxyCodexWebSocket({ headers, @@ -312,6 +340,9 @@ const proxyRequest = async ( let upstreamResponse: Response; try { + const headerTimeout = useCodexSseHeaderTimeout + ? createCodexSseHeaderTimeout() + : null; const upstreamRequestInit: BunFetchRequestInit = { method: context.req.method, headers, @@ -319,7 +350,22 @@ const proxyRequest = async ( // Provider streams can pause for minutes while a model is thinking. timeout: false, }; - upstreamResponse = await fetch(upstreamUrl, upstreamRequestInit); + if (headerTimeout) { + upstreamRequestInit.signal = AbortSignal.any([ + context.req.raw.signal, + headerTimeout.signal, + ]); + } + try { + upstreamResponse = await fetch(upstreamUrl, upstreamRequestInit); + } catch (error) { + const timeoutError = headerTimeout?.error(); + throw timeoutError && !context.req.raw.signal.aborted + ? timeoutError + : error; + } finally { + headerTimeout?.clear(); + } } catch (error) { usageRecorder.recordImmediate(500); throw error; diff --git a/src/providers/codex.ts b/src/providers/codex.ts index 1901842..5687a61 100644 --- a/src/providers/codex.ts +++ b/src/providers/codex.ts @@ -35,6 +35,7 @@ const CODEX_DEVICE_USER_CODE_URL = `${CODEX_ISSUER}/api/accounts/deviceauth/user const CODEX_DEVICE_TOKEN_URL = `${CODEX_ISSUER}/api/accounts/deviceauth/token`; const CODEX_OAUTH_STATE_TTL_MS = 15 * 60 * 1000; const CODEX_POLLING_SAFETY_MARGIN_MS = 3000; +const CODEX_SLOW_DOWN_INCREMENT_MS = 5000; const codexStartOptionsSchema = z.object({ mode: z.enum(["browser", "headless"]).optional(), @@ -84,6 +85,10 @@ type CodexDeviceTokenResponse = { code_verifier?: string; }; +type CodexDeviceTokenErrorResponse = { + error?: string | { code?: string }; +}; + const resolveCodexOAuthMode = ( options: Record | undefined ): "browser" | "headless" => { @@ -172,6 +177,42 @@ const parseTokenResponse = async ( return body; }; +const readCodexDeviceTokenErrorCode = async ( + response: Response +): Promise => { + const text = await response + .clone() + .text() + .catch(() => ""); + if (!text) { + return null; + } + + try { + const body = JSON.parse(text) as CodexDeviceTokenErrorResponse; + const error = body.error; + if (typeof error === "string") { + return error; + } + return typeof error?.code === "string" ? error.code : null; + } catch { + return null; + } +}; + +const waitForCodexDevicePoll = async (input: { + intervalMs: number; + expiresAt: number; +}): Promise => { + const remainingMs = input.expiresAt - Date.now(); + if (remainingMs <= 0) { + return; + } + await sleep( + Math.min(input.intervalMs + CODEX_POLLING_SAFETY_MARGIN_MS, remainingMs) + ); +}; + const exchangeAuthorizationCodeForTokens = async (input: { code: string; redirectUri: string; @@ -238,6 +279,7 @@ const pollDeviceAuthorizationCode = async (input: { intervalMs: number; expiresAt: number; }): Promise<{ authorizationCode: string; codeVerifier: string }> => { + let intervalMs = input.intervalMs; while (Date.now() < input.expiresAt) { const response = await fetch(CODEX_DEVICE_TOKEN_URL, { method: "POST", @@ -262,8 +304,17 @@ const pollDeviceAuthorizationCode = async (input: { }; } - if (response.status === 403 || response.status === 404) { - await sleep(input.intervalMs + CODEX_POLLING_SAFETY_MARGIN_MS); + const errorCode = await readCodexDeviceTokenErrorCode(response); + if (errorCode === "slow_down") { + intervalMs += CODEX_SLOW_DOWN_INCREMENT_MS; + } + if ( + errorCode === "slow_down" || + errorCode === "deviceauth_authorization_pending" || + response.status === 403 || + response.status === 404 + ) { + await waitForCodexDevicePoll({ intervalMs, expiresAt: input.expiresAt }); continue; } diff --git a/src/providers/constants.ts b/src/providers/constants.ts index e135907..f3bbddf 100644 --- a/src/providers/constants.ts +++ b/src/providers/constants.ts @@ -6,6 +6,7 @@ export const CODEX_RESPONSE_ENDPOINT = export const CODEX_WEBSOCKET_BETA_HEADER = "responses_websockets=2026-02-06"; // https://github.com/anomalyco/opencode/blob/d848c9b6a32f408e8b9bf6448b83af05629454d0/packages/opencode/src/plugin/codex.ts#L619 export const CODEX_ORIGINATOR = "opencode"; +export const CODEX_USER_AGENT = "opencode"; // https://github.com/anomalyco/opencode/blob/d848c9b6a32f408e8b9bf6448b83af05629454d0/packages/opencode/src/plugin/copilot.ts#L121-L131 // https://github.com/badlogic/pi-mono/blob/5c0ec26c28c918c5301f218e8c13fcc540d8e3a4/packages/ai/src/providers/github-copilot-headers.ts#L27-L34 diff --git a/src/providers/proxies/codex-proxy.ts b/src/providers/proxies/codex-proxy.ts index 6550c13..214973f 100644 --- a/src/providers/proxies/codex-proxy.ts +++ b/src/providers/proxies/codex-proxy.ts @@ -16,6 +16,7 @@ import { CODEX_ACCOUNT_ID_HEADER, CODEX_ORIGINATOR, CODEX_RESPONSE_ENDPOINT, + CODEX_USER_AGENT, } from "../constants"; const trimString = (value: unknown): string => @@ -78,7 +79,8 @@ export const transformCodexBodyJson = ( // https://github.com/anomalyco/opencode/blob/d848c9b6a32f408e8b9bf6448b83af05629454d0/packages/opencode/src/session/llm.ts#L65-L112 // - Codex-native clients include `instructions` explicitly in the request body. // https://github.com/badlogic/pi-mono/blob/5c0ec26c28c918c5301f218e8c13fcc540d8e3a4/packages/ai/src/providers/openai-codex-responses.ts#L286-L291 - // - Codex-native clients also omit `max_output_tokens` / `max_completion_tokens`. + // - Codex-native clients also omit `max_output_tokens` / `max_completion_tokens` + // and force `store: false`. // https://github.com/badlogic/pi-mono/blob/5c0ec26c28c918c5301f218e8c13fcc540d8e3a4/packages/ai/src/providers/openai-codex-responses.ts#L286-L315 const { max_output_tokens: _maxOutputTokens, @@ -96,6 +98,7 @@ export const transformCodexBodyJson = ( return { ...nextBody, instructions, + store: false, ...(sessionId ? { prompt_cache_key: sessionId } : {}), }; }; @@ -139,6 +142,11 @@ export const prepareCodexProxyRequest = ( const bodyJson = transformCodexBodyJson(input.bodyJson, input.sessionId); input.headers.set("authorization", `Bearer ${input.accessToken}`); + input.headers.set("content-type", "application/json"); + input.headers.set("User-Agent", CODEX_USER_AGENT); + if (isStreamingRequest) { + input.headers.set("accept", "text/event-stream"); + } if (!input.headers.get("originator")) { input.headers.set("originator", CODEX_ORIGINATOR); } diff --git a/src/providers/proxies/codex-websocket.ts b/src/providers/proxies/codex-websocket.ts index 2cbe7de..25d8c83 100644 --- a/src/providers/proxies/codex-websocket.ts +++ b/src/providers/proxies/codex-websocket.ts @@ -13,6 +13,12 @@ import { isObjectRecord, readBooleanField } from "../../utils/object"; const SESSION_SOCKET_TTL_MS = 5 * 60 * 1000; const CONNECT_TIMEOUT_MS = 15_000; +const RESPONSE_IDLE_TIMEOUT_MS = 5 * 60 * 1000; +const MAX_SOCKET_AGE_MS = 55 * 60 * 1000; +const CONNECTION_LIMIT_RETRIES = 5; +const STREAM_FAILURE_RETRIES = 5; +const CONNECTION_LIMIT_REACHED_CODE = "websocket_connection_limit_reached"; +const WEBSOCKET_MESSAGE_TOO_BIG_CLOSE_CODE = 1009; type WebSocketEventType = "open" | "message" | "error" | "close"; type WebSocketListener = (event: unknown) => void; @@ -51,6 +57,7 @@ type ContinuationState = { type CachedSocket = { socket: WebSocketLike; + connectedAt: number; busy: boolean; idleTimer: ReturnType | null; continuation: ContinuationState | null; @@ -58,6 +65,8 @@ type CachedSocket = { }; const socketCache = new Map(); +const fallbackSocketKeys = new Map>(); +const streamFailureCounts = new Map(); const pendingSocketKeys = new Set(); const suppressContinuationStoreKeys = new Set(); @@ -94,6 +103,9 @@ const readString = (value: unknown): string | null => { const isSocketOpen = (socket: WebSocketLike): boolean => socket.readyState === undefined || socket.readyState === 1; +const isSocketFresh = (cached: CachedSocket): boolean => + Date.now() - cached.connectedAt < MAX_SOCKET_AGE_MS; + const closeSocket = (socket: WebSocketLike): void => { try { socket.close(1000, "done"); @@ -102,6 +114,84 @@ const closeSocket = (socket: WebSocketLike): void => { } }; +const extractWebSocketError = (event: unknown): Error => { + if (isObjectRecord(event)) { + const message = readString(event.message); + if (message) { + return new Error(message); + } + + const nestedError = event.error; + if (nestedError instanceof Error && nestedError.message) { + return nestedError; + } + if (isObjectRecord(nestedError)) { + const nestedMessage = readString(nestedError.message); + if (nestedMessage) { + return new Error(nestedMessage); + } + } + } + + return new Error("WebSocket error"); +}; + +const extractWebSocketCloseError = (event: unknown): Error => { + if (!isObjectRecord(event)) { + return new Error("WebSocket closed"); + } + + const code = typeof event.code === "number" ? event.code : null; + const reason = readString(event.reason); + const codeText = code === null ? "" : ` ${code}`; + const reasonText = + reason ?? + (code === WEBSOCKET_MESSAGE_TOO_BIG_CLOSE_CODE ? "message too big" : null); + return new Error( + `WebSocket closed${codeText}${reasonText ? ` ${reasonText}` : ""}`.trim() + ); +}; + +const clearSessionFallback = (key: string): void => { + const timer = fallbackSocketKeys.get(key); + if (timer) { + clearTimeout(timer); + fallbackSocketKeys.delete(key); + } +}; + +const markSessionFallback = (key: string | null): void => { + if (!key) { + return; + } + clearSessionFallback(key); + const timer = setTimeout(() => { + fallbackSocketKeys.delete(key); + streamFailureCounts.delete(key); + }, SESSION_SOCKET_TTL_MS); + fallbackSocketKeys.set(key, timer); +}; + +const clearSessionStreamFailures = (key: string | null): void => { + if (!key) { + return; + } + streamFailureCounts.delete(key); +}; + +const recordSessionStreamFailure = (key: string | null): number => { + if (!key) { + return 0; + } + + const failures = (streamFailureCounts.get(key) ?? 0) + 1; + streamFailureCounts.set(key, failures); + if (failures > STREAM_FAILURE_RETRIES) { + markSessionFallback(key); + } + return failures; +}; + const scheduleExpiry = (key: string, cached: CachedSocket): void => { if (cached.idleTimer) { clearTimeout(cached.idleTimer); @@ -114,6 +204,8 @@ const scheduleExpiry = (key: string, cached: CachedSocket): void => { closeSocket(cached.socket); socketCache.delete(key); + clearSessionFallback(key); + clearSessionStreamFailures(key); }, SESSION_SOCKET_TTL_MS); }; @@ -167,15 +259,19 @@ const connectWebSocket = ( cleanup(); resolve(socket); }; - const onError = (): void => fail(new Error("WebSocket error")); - const onClose = (): void => fail(new Error("WebSocket closed")); + const onError = (event: unknown): void => + fail(extractWebSocketError(event)); + const onClose = (event: unknown): void => + fail(extractWebSocketCloseError(event)); const onAbort = (): void => { closeSocket(socket); fail(new Error("Request was aborted")); }; const onTimeout = (): void => { closeSocket(socket); - fail(new Error("WebSocket connect timed out")); + fail( + new Error(`WebSocket connect timeout after ${CONNECT_TIMEOUT_MS}ms`) + ); }; try { @@ -206,11 +302,12 @@ const acquireSocket = async ( }> => { if (!cacheKey) { const socket = await connectWebSocket(headers, signal); - return { + const acquired = { socket, cached: null, - release: () => closeSocket(socket), + release: (): void => closeSocket(acquired.socket), }; + return acquired; } const existing = socketCache.get(cacheKey); @@ -226,7 +323,7 @@ const acquireSocket = async ( existing.idleTimer = null; } - if (isSocketOpen(existing.socket)) { + if (isSocketOpen(existing.socket) && isSocketFresh(existing)) { existing.busy = true; return { socket: existing.socket, @@ -264,6 +361,7 @@ const acquireSocket = async ( } const cached: CachedSocket = { socket, + connectedAt: Date.now(), busy: true, idleTimer: null, continuation: null, @@ -274,8 +372,8 @@ const acquireSocket = async ( socket, cached, release(keep: boolean): void { - if (!(keep && isSocketOpen(socket))) { - closeSocket(socket); + if (!(keep && isSocketOpen(cached.socket))) { + closeSocket(cached.socket); socketCache.delete(cacheKey); return; } @@ -293,6 +391,11 @@ export const closeCodexWebSocketSessions = (): void => { closeSocket(cached.socket); } socketCache.clear(); + for (const timer of fallbackSocketKeys.values()) { + clearTimeout(timer); + } + fallbackSocketKeys.clear(); + streamFailureCounts.clear(); }; const withoutContinuationFields = ( @@ -513,12 +616,24 @@ const buildRequestBody = ( const encodeSse = (payload: unknown): Uint8Array => new TextEncoder().encode(`data: ${JSON.stringify(payload)}\n\n`); -const decodeMessageData = (data: unknown): string | null => { +const encodeDoneSse = (): Uint8Array => + new TextEncoder().encode("data: [DONE]\n\n"); + +const decodeMessageData = async (data: unknown): Promise => { if (typeof data === "string") { return data; } - if (data instanceof ArrayBuffer || ArrayBuffer.isView(data)) { - return new TextDecoder().decode(data); + if (data instanceof ArrayBuffer) { + return new TextDecoder().decode(new Uint8Array(data)); + } + if (ArrayBuffer.isView(data)) { + return new TextDecoder().decode( + new Uint8Array(data.buffer, data.byteOffset, data.byteLength) + ); + } + if (isObjectRecord(data) && typeof data.arrayBuffer === "function") { + const arrayBuffer = (await data.arrayBuffer()) as ArrayBuffer; + return new TextDecoder().decode(new Uint8Array(arrayBuffer)); } return null; }; @@ -533,6 +648,25 @@ const readPayloadStatus = (payload: Record): number => { const isErrorPayload = (payload: Record): boolean => payload.type === "error" || payload.type === "response.failed"; +const isTerminalPayload = (payload: Record): boolean => + payload.type === "response.completed" || + payload.type === "response.done" || + payload.type === "response.incomplete" || + isErrorPayload(payload); + +const isConnectionLimitPayload = (payload: Record): boolean => + payload.type === "error" && + isObjectRecord(payload.error) && + payload.error.code === CONNECTION_LIMIT_REACHED_CODE; + +const isSessionConcurrencyError = (error: unknown): boolean => + error instanceof Error && + (error.message === "Codex WebSocket session is busy" || + error.message === "Codex WebSocket session is connecting"); + +const isUserCancelledStage = (stage: string): boolean => + stage === "request_aborted" || stage === "downstream_cancel"; + const isCacheableFinalEvent = ( eventType: unknown, responseStatus: string | null @@ -546,8 +680,13 @@ const buildWebSocketHeaders = ( ): Headers => { const nextHeaders = new Headers(headers); nextHeaders.delete("accept"); + nextHeaders.delete("connection"); + nextHeaders.delete("content-length"); nextHeaders.delete("content-type"); + nextHeaders.delete("host"); nextHeaders.delete("openai-beta"); + nextHeaders.delete("transfer-encoding"); + nextHeaders.delete("upgrade"); nextHeaders.set("OpenAI-Beta", CODEX_WEBSOCKET_BETA_HEADER); applyCodexSessionHeaders(nextHeaders, requestId); return nextHeaders; @@ -574,6 +713,10 @@ export const tryProxyCodexWebSocket = async ( const cacheKey = sessionId ? `${input.accountKey}:${sessionId}` : null; const headers = buildWebSocketHeaders(input.headers, requestId); + if (cacheKey && fallbackSocketKeys.has(cacheKey)) { + return null; + } + let acquired: Awaited> | null = null; const fullBody = withoutTransportFields( sessionId ? { ...body, prompt_cache_key: requestId } : body @@ -582,8 +725,11 @@ export const tryProxyCodexWebSocket = async ( try { acquired = await acquireSocket(headers, cacheKey, input.signal); requestBody = buildRequestBody(fullBody, acquired.cached); - } catch { + } catch (error) { acquired?.release(false); + if (!input.signal?.aborted && !isSessionConcurrencyError(error)) { + recordSessionStreamFailure(cacheKey); + } return null; } @@ -597,6 +743,11 @@ export const tryProxyCodexWebSocket = async ( let settled = false; let terminal = false; let failure: unknown = null; + let emittedPayload = false; + let connectionLimitAttempts = 0; + let retryingConnectionLimit = false; + let responseIdleTimer: ReturnType | null = null; + let messageChain = Promise.resolve(); let wake: (() => void) | null = null; const queue: Record[] = []; @@ -609,6 +760,24 @@ export const tryProxyCodexWebSocket = async ( resolve(); }; + const clearResponseIdleTimer = (): void => { + if (!responseIdleTimer) { + return; + } + clearTimeout(responseIdleTimer); + responseIdleTimer = null; + }; + + const resetResponseIdleTimer = (stage: string): void => { + if (settled) { + return; + } + clearResponseIdleTimer(); + responseIdleTimer = setTimeout(() => { + fail(stage, new Error(stage)); + }, RESPONSE_IDLE_TIMEOUT_MS); + }; + let firstPayloadResolve: ((payload: Record) => void) | null = null; let firstPayloadReject: ((error: unknown) => void) | null = null; @@ -620,13 +789,74 @@ export const tryProxyCodexWebSocket = async ( ); const cleanup = (): void => { + clearResponseIdleTimer(); active.socket.removeEventListener("message", onMessage); active.socket.removeEventListener("error", onError); active.socket.removeEventListener("close", onClose); input.signal?.removeEventListener("abort", onAbort); }; - const fail = (error: unknown): void => { + const attachSocketListeners = (): void => { + active.socket.addEventListener("message", onMessage); + active.socket.addEventListener("error", onError); + active.socket.addEventListener("close", onClose); + }; + + const sendRequest = (): void => { + if (settled) { + return; + } + if (input.signal?.aborted) { + fail("request_aborted", new Error("Request was aborted")); + return; + } + try { + active.socket.send( + JSON.stringify({ ...requestBody, type: "response.create" }) + ); + resetResponseIdleTimer("idle_timeout_waiting_for_websocket"); + } catch (error) { + fail("send_failed", error); + } + }; + + const retryConnectionLimit = async (): Promise => { + if (settled || retryingConnectionLimit) { + return; + } + if (connectionLimitAttempts >= CONNECTION_LIMIT_RETRIES) { + fail( + "connection_limit_retries_exhausted", + new Error(CONNECTION_LIMIT_REACHED_CODE) + ); + return; + } + + connectionLimitAttempts++; + retryingConnectionLimit = true; + clearResponseIdleTimer(); + active.socket.removeEventListener("message", onMessage); + active.socket.removeEventListener("error", onError); + active.socket.removeEventListener("close", onClose); + closeSocket(active.socket); + + try { + const nextSocket = await connectWebSocket(headers, input.signal); + active.socket = nextSocket; + if (active.cached) { + active.cached.socket = nextSocket; + active.cached.connectedAt = Date.now(); + } + attachSocketListeners(); + sendRequest(); + } catch (error) { + fail("connection_limit_retry_failed", error); + } finally { + retryingConnectionLimit = false; + } + }; + + const fail = (stage: string, error: unknown): void => { if (settled) { return; } @@ -635,6 +865,9 @@ export const tryProxyCodexWebSocket = async ( failure = error; cleanup(); active.release(false); + if (!isUserCancelledStage(stage)) { + recordSessionStreamFailure(cacheKey); + } firstPayloadReject?.(error); wakePull(); }; @@ -645,13 +878,15 @@ export const tryProxyCodexWebSocket = async ( } settled = true; cleanup(); - if (active.cached) { - if ( + const canStoreContinuation = Boolean( + active.cached && keepSocket && responseId && isCacheableFinalEvent(finalEventType, finalResponseStatus) && !active.cached.skipNextContinuationStore - ) { + ); + if (active.cached) { + if (canStoreContinuation && responseId) { active.cached.continuation = { lastRequestBody: fullBody, lastResponseId: responseId, @@ -662,68 +897,88 @@ export const tryProxyCodexWebSocket = async ( } active.cached.skipNextContinuationStore = false; } + if (isCacheableFinalEvent(finalEventType, finalResponseStatus)) { + clearSessionStreamFailures(cacheKey); + } active.release(keepSocket); wakePull(); }; - const onAbort = (): void => fail(new Error("Request was aborted")); + const onAbort = (): void => + fail("request_aborted", new Error("Request was aborted")); + + const onError = (event: unknown): void => + fail("socket_error", extractWebSocketError(event)); - const onError = (): void => fail(new Error("WebSocket error")); + const onClose = (event: unknown): void => { + messageChain = messageChain + .then(() => { + if (terminal) { + wakePull(); + return; + } + fail( + "socket_closed_before_terminal", + extractWebSocketCloseError(event) + ); + }) + .catch((error: unknown) => { + fail("message_parse_failed", error); + }); + }; - const onClose = (): void => { - if (terminal) { - wakePull(); + const handleMessage = async (event: unknown): Promise => { + if (settled) { + return; + } + const text = await decodeMessageData( + isObjectRecord(event) ? event.data : null + ); + if (settled || !text) { return; } - fail(new Error("WebSocket closed")); - }; - const onMessage = (event: unknown): void => { - try { - const text = decodeMessageData(isObjectRecord(event) ? event.data : null); - if (!text) { - return; - } + const payload = JSON.parse(text) as unknown; + if (!isObjectRecord(payload)) { + return; + } + resetResponseIdleTimer("idle_timeout_waiting_for_websocket"); - const payload = JSON.parse(text) as unknown; - if (!isObjectRecord(payload)) { - return; - } + if (!emittedPayload && isConnectionLimitPayload(payload)) { + retryConnectionLimit().catch((error: unknown) => { + fail("connection_limit_retry_failed", error); + }); + return; + } - if ( - payload.type === "response.completed" || - payload.type === "response.done" || - payload.type === "response.incomplete" || - isErrorPayload(payload) - ) { - terminal = true; - finalEventType = payload.type; - finalResponseStatus = isObjectRecord(payload.response) - ? readString(payload.response.status) - : null; - } - queue.push(payload); - if (queue.length === 1) { - firstPayloadResolve?.(payload); - } - wakePull(); - } catch (error) { - fail(error); + if (isTerminalPayload(payload)) { + clearResponseIdleTimer(); + terminal = true; + finalEventType = payload.type; + finalResponseStatus = isObjectRecord(payload.response) + ? readString(payload.response.status) + : null; + } + emittedPayload = true; + queue.push(payload); + if (queue.length === 1) { + firstPayloadResolve?.(payload); } + wakePull(); }; - active.socket.addEventListener("message", onMessage); - active.socket.addEventListener("error", onError); - active.socket.addEventListener("close", onClose); + const onMessage = (event: unknown): void => { + messageChain = messageChain + .then(() => handleMessage(event)) + .catch((error: unknown) => { + fail("message_parse_failed", error); + }); + }; + + attachSocketListeners(); input.signal?.addEventListener("abort", onAbort); - try { - active.socket.send( - JSON.stringify({ ...requestBody, type: "response.create" }) - ); - } catch (error) { - fail(error); - } + sendRequest(); let first: Record; try { @@ -755,24 +1010,19 @@ export const tryProxyCodexWebSocket = async ( } controller.enqueue(encodeSse(payload)); - if ( - payload.type === "response.completed" || - payload.type === "response.done" || - payload.type === "response.incomplete" - ) { - terminal = true; - finalEventType = payload.type; - finalResponseStatus = isObjectRecord(payload.response) - ? readString(payload.response.status) - : null; - finish(); - } else if (isErrorPayload(payload)) { - keepSocket = false; + if (isTerminalPayload(payload)) { + if (payload.type === "response.incomplete") { + keepSocket = false; + } + if (isErrorPayload(payload)) { + keepSocket = false; + } terminal = true; finalEventType = payload.type; finalResponseStatus = isObjectRecord(payload.response) ? readString(payload.response.status) : null; + controller.enqueue(encodeDoneSse()); finish(); } }; @@ -788,7 +1038,12 @@ export const tryProxyCodexWebSocket = async ( if (queue.length) { const payload = queue.shift(); if (payload) { - processPayload(payload, controller); + try { + processPayload(payload, controller); + } catch (error) { + fail("downstream_enqueue_failed", error); + controller.error(error); + } } return; } @@ -804,7 +1059,7 @@ export const tryProxyCodexWebSocket = async ( if (settled) { return; } - fail(new Error("Response stream was cancelled")); + fail("downstream_cancel", new Error("Response stream was cancelled")); }, }); diff --git a/tests/domain/models-dev-contract.test.ts b/tests/domain/models-dev-contract.test.ts index 61bbf7d..8ace65f 100644 --- a/tests/domain/models-dev-contract.test.ts +++ b/tests/domain/models-dev-contract.test.ts @@ -16,6 +16,14 @@ const upstreamRegistry = { npm: "@ai-sdk/openai", }, }, + "gpt-5.3-codex-spark": { + id: "gpt-5.3-codex-spark", + name: "GPT-5.3 Codex Spark", + provider: { + api: "https://api.openai.com/v1", + npm: "@ai-sdk/openai", + }, + }, "gpt-5": { id: "gpt-5", name: "GPT-5", @@ -130,6 +138,9 @@ describe("models registry contract", () => { }; expect(kleis.env).toEqual(["KLEIS_API_KEY"]); expect(kleis.models?.["gpt-5.3-codex"]?.id).toBe("gpt-5.3-codex"); + expect(kleis.models?.["gpt-5.3-codex-spark"]?.id).toBe( + "gpt-5.3-codex-spark" + ); expect(kleis.models?.["openai/gpt-5.3-codex"]).toBeUndefined(); expect(kleis.models?.["github-copilot/gpt-5"]?.id).toBe( "github-copilot/gpt-5" @@ -212,7 +223,7 @@ describe("models registry contract", () => { models?: Record; }; expect(openai.env).toEqual(["OPENAI_API_KEY"]); - expect(Object.keys(openai.models ?? {})).toHaveLength(3); + expect(Object.keys(openai.models ?? {})).toHaveLength(4); const kleis = registry.kleis as { models?: Record; @@ -234,7 +245,7 @@ describe("models registry contract", () => { models?: Record; }; expect(openai.env).toEqual(["OPENAI_API_KEY"]); - expect(Object.keys(openai.models ?? {})).toHaveLength(3); + expect(Object.keys(openai.models ?? {})).toHaveLength(4); expect(registry.anthropic).toBeDefined(); expect(registry["github-copilot"]).toBeDefined(); @@ -273,6 +284,7 @@ describe("models registry contract", () => { expect(openai.env).toEqual(["OPENAI_API_KEY"]); expect(Object.keys(openai.models ?? {})).toEqual([ "gpt-5.3-codex", + "gpt-5.3-codex-spark", "gpt-5", "text-embedding-3-large", ]); @@ -325,7 +337,10 @@ describe("models registry contract", () => { const kleis = registry.kleis as { models?: Record; }; - expect(Object.keys(kleis.models ?? {})).toEqual(["gpt-5.3-codex"]); + expect(Object.keys(kleis.models ?? {})).toEqual([ + "gpt-5.3-codex", + "gpt-5.3-codex-spark", + ]); }); test("account-scoped mode only narrows the kleis aggregate provider", () => { diff --git a/tests/providers/proxy-contract.test.ts b/tests/providers/proxy-contract.test.ts index 7090d86..cca3bc7 100644 --- a/tests/providers/proxy-contract.test.ts +++ b/tests/providers/proxy-contract.test.ts @@ -6,6 +6,7 @@ import { CODEX_ACCOUNT_ID_HEADER, CODEX_ORIGINATOR, CODEX_RESPONSE_ENDPOINT, + CODEX_USER_AGENT, CODEX_WEBSOCKET_BETA_HEADER, COPILOT_INITIATOR_HEADER, COPILOT_VISION_HEADER, @@ -291,9 +292,11 @@ describe("proxy contract: codex", () => { expect(headers.get("authorization")).toBe("Bearer codex-access"); expect(headers.get(CODEX_ACCOUNT_ID_HEADER)).toBe("acct-meta"); + expect(headers.get("content-type")).toBe("application/json"); expect(headers.get("originator")).toBe(CODEX_ORIGINATOR); + expect(headers.get("User-Agent")).toBe(CODEX_USER_AGENT); expect(result.upstreamUrl).toBe(CODEX_RESPONSE_ENDPOINT); - expect(result.bodyText).toBe(bodyText); + expect(JSON.parse(result.bodyText)).toEqual({ ...bodyJson, store: false }); }); test("uses account id when metadata is absent", () => { @@ -401,6 +404,7 @@ describe("proxy contract: codex", () => { instructions: "Keep responses concise", max_output_tokens: 4096, max_completion_tokens: 4096, + store: true, input: [ { role: "user", @@ -421,9 +425,11 @@ describe("proxy contract: codex", () => { const transformed = JSON.parse(result.bodyText) as { max_output_tokens?: number; max_completion_tokens?: number; + store?: boolean; }; expect(transformed.max_output_tokens).toBeUndefined(); expect(transformed.max_completion_tokens).toBeUndefined(); + expect(transformed.store).toBe(false); }); test("extracts normalized usage from non-streaming responses", async () => { @@ -614,6 +620,7 @@ describe("proxy contract: codex", () => { const headers = new Headers({ authorization: "Bearer codex-access", [CODEX_ACCOUNT_ID_HEADER]: "acct_1", + "content-length": "123", "session-id": "raw-session-id", "x-session-affinity": "session-1", }); @@ -695,6 +702,7 @@ describe("proxy contract: codex", () => { ); expect(lowerHeaderEntries["openai-beta"]).toBe(CODEX_WEBSOCKET_BETA_HEADER); expect(lowerHeaderEntries.authorization).toBe("Bearer codex-access"); + expect(lowerHeaderEntries["content-length"]).toBeUndefined(); expect(lowerHeaderEntries.session_id).toBeUndefined(); expect(lowerHeaderEntries["x-session-affinity"]).toBeUndefined(); expect(lowerHeaderEntries["session-id"]).toMatch(/^kleis_/); @@ -1295,6 +1303,206 @@ describe("proxy contract: codex", () => { expect(thirdBody.input).toEqual(thirdInput); }); + test("retries websocket connection limit errors before streaming output", async () => { + const sentBodies: unknown[] = []; + const sockets = installManualCodexWebSocketMock(sentBodies); + const headers = new Headers({ + authorization: "Bearer codex-access", + [CODEX_ACCOUNT_ID_HEADER]: "acct_1", + "x-session-affinity": "retry-session", + }); + + const responsePromise = tryProxyCodexWebSocket({ + headers, + bodyJson: { + model: "gpt-5-codex", + stream: true, + input: [ + { role: "user", content: [{ type: "input_text", text: "Retry" }] }, + ], + }, + accountKey: "key-1:account-1", + }); + + await waitFor(() => sentBodies.length === 1); + sockets[0]?.dispatch("message", { + data: JSON.stringify({ + type: "error", + error: { code: "websocket_connection_limit_reached" }, + }), + }); + + await waitFor(() => sentBodies.length === 2 && sockets.length === 2); + sockets[1]?.dispatch("message", { + data: JSON.stringify({ + type: "response.created", + response: { id: "resp_1" }, + }), + }); + sockets[1]?.dispatch("message", { + data: JSON.stringify({ + type: "response.completed", + response: { + id: "resp_1", + usage: { + input_tokens: 10, + output_tokens: 2, + input_tokens_details: { cached_tokens: 3 }, + }, + status: "completed", + }, + }), + }); + + const response = await responsePromise; + expect(response).not.toBeNull(); + const text = await response?.text(); + expect(text).toContain("response.completed"); + expect(sentBodies).toHaveLength(2); + }); + + test("decodes binary websocket messages", async () => { + const sentBodies: unknown[] = []; + const sockets = installManualCodexWebSocketMock(sentBodies); + const headers = new Headers({ + authorization: "Bearer codex-access", + [CODEX_ACCOUNT_ID_HEADER]: "acct_1", + }); + const encoder = new TextEncoder(); + + const responsePromise = tryProxyCodexWebSocket({ + headers, + bodyJson: { + model: "gpt-5-codex", + stream: true, + input: [ + { role: "user", content: [{ type: "input_text", text: "Binary" }] }, + ], + }, + accountKey: "key-1:account-1", + }); + + await waitFor(() => sentBodies.length === 1); + for (const event of [ + { type: "response.created", response: { id: "resp_1" } }, + { + type: "response.completed", + response: { id: "resp_1", status: "completed" }, + }, + ]) { + sockets[0]?.dispatch("message", { + data: encoder.encode(JSON.stringify(event)).buffer, + }); + } + + const response = await responsePromise; + expect(response).not.toBeNull(); + const text = await response?.text(); + expect(text).toContain("response.completed"); + expect(text).toContain("data: [DONE]"); + }); + + test("preserves websocket message order for async binary messages", async () => { + const sentBodies: unknown[] = []; + const sockets = installManualCodexWebSocketMock(sentBodies); + const headers = new Headers({ + authorization: "Bearer codex-access", + [CODEX_ACCOUNT_ID_HEADER]: "acct_1", + }); + const encoder = new TextEncoder(); + let resolveBuffer: ((buffer: ArrayBuffer) => void) | null = null; + const delayedBuffer = new Promise((resolve) => { + resolveBuffer = resolve; + }); + + const responsePromise = tryProxyCodexWebSocket({ + headers, + bodyJson: { + model: "gpt-5-codex", + stream: true, + input: [ + { role: "user", content: [{ type: "input_text", text: "Order" }] }, + ], + }, + accountKey: "key-1:account-1", + }); + + await waitFor(() => sentBodies.length === 1); + sockets[0]?.dispatch("message", { + data: { arrayBuffer: () => delayedBuffer }, + }); + sockets[0]?.dispatch("message", { + data: JSON.stringify({ + type: "response.completed", + response: { id: "resp_1", status: "completed" }, + }), + }); + sockets[0]?.dispatch("close", { code: 1000 }); + + let response: Response | null | undefined; + responsePromise.then((value) => { + response = value; + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(response).toBeUndefined(); + + const createdBytes = encoder.encode( + JSON.stringify({ type: "response.created", response: { id: "resp_1" } }) + ); + resolveBuffer?.( + createdBytes.buffer.slice( + createdBytes.byteOffset, + createdBytes.byteOffset + createdBytes.byteLength + ) + ); + await waitFor(() => response !== undefined); + if (!response) { + throw new Error("Expected websocket response"); + } + const text = await response.text(); + expect(text.indexOf("response.created")).toBeLessThan( + text.indexOf("response.completed") + ); + }); + + test("keeps retrying websocket setup failures before session fallback", async () => { + const sentBodies: unknown[] = []; + const sockets = installManualCodexWebSocketMock(sentBodies, { + autoOpen: false, + }); + const headers = new Headers({ + authorization: "Bearer codex-access", + [CODEX_ACCOUNT_ID_HEADER]: "acct_1", + "x-session-affinity": "retry-budget-session", + }); + const bodyJson = { + model: "gpt-5-codex", + stream: true, + input: [ + { role: "user", content: [{ type: "input_text", text: "Retry" }] }, + ], + }; + + for (let attempt = 0; attempt < 6; attempt++) { + const responsePromise = tryProxyCodexWebSocket({ + headers, + bodyJson, + accountKey: "key-1:account-1", + }); + await waitFor(() => sockets.length === attempt + 1); + sockets[attempt]?.dispatch("close", { code: 1006 }); + expect(await responsePromise).toBeNull(); + } + + const fallback = await tryProxyCodexWebSocket({ + headers, + bodyJson, + accountKey: "key-1:account-1", + }); + expect(fallback).toBeNull(); + expect(sockets).toHaveLength(6); + }); + test("treats same-session websocket connect races as busy", async () => { const sentBodies: unknown[] = []; const sockets = installManualCodexWebSocketMock(sentBodies, {