diff --git a/scripts/backfill_manifest_file_sizes.mjs b/scripts/backfill_manifest_file_sizes.mjs index 988fefd0f0..7dd8dd4f1b 100644 --- a/scripts/backfill_manifest_file_sizes.mjs +++ b/scripts/backfill_manifest_file_sizes.mjs @@ -11,34 +11,16 @@ * bun scripts/backfill_manifest_file_sizes.mjs --all --apply */ -import { mkdirSync, writeFileSync } from 'node:fs' +import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs' import { dirname, resolve } from 'node:path' import process from 'node:process' import { fileURLToPath } from 'node:url' import { S3Client } from '@bradenmacdonald/s3-lite-client' -import { config } from 'dotenv' +import { parse } from 'dotenv' import pg from 'pg' const __dirname = dirname(fileURLToPath(import.meta.url)) -for (const envPath of [ - '../.env', - '../.env.local', - '../internal/cloudflare/.env.prod', - '../internal/cloudflare/.env.local', -]) { - config({ path: resolve(__dirname, envPath), override: true, quiet: true }) -} - -const DB_URL_ENV_KEYS = [ - 'MAIN_SUPABASE_DB_URL', - 'DATABASE_URL', - 'POSTGRES_URL', - 'SUPABASE_DB_URL', - 'SUPABASE_DB_DIRECT_URL', - 'DIRECT_URL', -] - function hasFlag(name) { return process.argv.includes(name) } @@ -64,6 +46,55 @@ function getArgValue(name) { return undefined } +function getTarget() { + if (hasFlag('--help') || hasFlag('-h')) + return 'prod' + const target = getArgValue('--target') ?? (hasFlag('--local') ? 'local' : 'prod') + if (target !== 'prod' && target !== 'local') + throw new Error('--target must be "prod" or "local"') + return target +} + +const target = getTarget() +const sharedEnvPaths = [ + '../.env', +] +const targetEnvPaths = target === 'prod' + ? [ + '../internal/cloudflare/.env.prod', + ] + : [ + '../.env.local', + '../internal/cloudflare/.env.local', + ] + +function loadEnvFiles(envPaths) { + const loaded = {} + for (const envPath of envPaths) { + const resolvedPath = resolve(__dirname, envPath) + if (!existsSync(resolvedPath)) + continue + Object.assign(loaded, parse(readFileSync(resolvedPath))) + } + return loaded +} + +const targetEnv = loadEnvFiles(targetEnvPaths) +const runtimeEnv = loadEnvFiles([...sharedEnvPaths, ...targetEnvPaths]) + +for (const [key, value] of Object.entries(runtimeEnv)) { + process.env[key] = value +} + +const DB_URL_ENV_KEYS = [ + 'MAIN_SUPABASE_DB_URL', + 'DATABASE_URL', + 'POSTGRES_URL', + 'SUPABASE_DB_URL', + 'SUPABASE_DB_DIRECT_URL', + 'DIRECT_URL', +] + function getNumberArg(name, fallback) { const value = getArgValue(name) if (value === undefined) @@ -74,13 +105,46 @@ function getNumberArg(name, fallback) { return parsed } -function getDatabaseUrl() { +function getDatabaseUrl(databaseEnv) { for (const key of DB_URL_ENV_KEYS) { - const value = process.env[key] + const value = databaseEnv[key] if (value) return value } - throw new Error(`Missing Postgres URL. Set one of: ${DB_URL_ENV_KEYS.join(', ')}`) + throw new Error(`Missing Postgres URL in ${targetEnvPaths.join(', ')}. Set one of: ${DB_URL_ENV_KEYS.join(', ')}`) +} + +function isLocalDatabaseUrl(databaseUrl) { + try { + let { hostname } = new URL(databaseUrl) + if (hostname.startsWith('[') && hostname.endsWith(']')) + hostname = hostname.slice(1, -1) + return ['127.0.0.1', 'localhost', '::1'].includes(hostname) + } + catch { + return databaseUrl.includes('127.0.0.1') + || databaseUrl.includes('localhost') + || databaseUrl.includes('::1') + || databaseUrl.includes('[::1]') + } +} + +function getSafeDatabaseUrl() { + const databaseUrl = getDatabaseUrl(targetEnv) + if (target === 'prod' && isLocalDatabaseUrl(databaseUrl)) { + throw new Error('Refusing to use a local Postgres URL for the default prod target. Pass --target=local only when you intentionally want local.') + } + return databaseUrl +} + +function describeDatabaseUrl(databaseUrl) { + try { + const { host } = new URL(databaseUrl) + return host + } + catch { + return 'unknown host' + } } function getRequiredEnv(name) { @@ -257,6 +321,8 @@ Options: --batch-size DB page size. Default: 500. --concurrency Storage HEAD/RANGE concurrency. Default: 20. --include-deleted Include deleted bundles. + --target prod|local Env target. Default: prod. + --local Alias for --target=local. --verbose Print every checked row. `) return @@ -278,9 +344,12 @@ Options: if (appVersionIdRaw && (!Number.isFinite(appVersionId) || appVersionId <= 0)) throw new Error('--app-version-id must be a positive integer') + const databaseUrl = getSafeDatabaseUrl() + console.log(`Using ${target} database target: ${describeDatabaseUrl(databaseUrl)}`) + const pool = new pg.Pool({ - connectionString: getDatabaseUrl(), - ssl: { rejectUnauthorized: false }, + connectionString: databaseUrl, + ssl: target === 'prod' ? { rejectUnauthorized: false } : false, }) const s3 = new S3Client({ accessKey: getRequiredEnv('S3_ACCESS_KEY_ID'), @@ -301,6 +370,7 @@ Options: includeDeleted, missingSize: 0, scannedAt: new Date().toISOString(), + target, unchanged: 0, } diff --git a/supabase/functions/_backend/triggers/on_manifest_create.ts b/supabase/functions/_backend/triggers/on_manifest_create.ts index fae8526f2c..dfe316e94e 100644 --- a/supabase/functions/_backend/triggers/on_manifest_create.ts +++ b/supabase/functions/_backend/triggers/on_manifest_create.ts @@ -14,7 +14,14 @@ const SIZE_RETRY_DELAY_MS = 500 const MANIFEST_UPDATE_RETRY_ATTEMPTS = 3 const MANIFEST_UPDATE_RETRY_DELAY_MS = 300 -function getQueueLogMetadata(c: Context) { +interface QueueLogMetadata { + queueName: string | null + queueMsgId: string | null + queueReadCount: string | null + cfId: string | null +} + +function getQueueLogMetadata(c: Context): QueueLogMetadata { return { queueName: c.req.header('x-capgo-queue-name') ?? null, queueMsgId: c.req.header('x-capgo-queue-msg-id') ?? null, @@ -79,8 +86,7 @@ async function runManifestUpdateWithRetry( } } -async function updateManifestSize(c: Context, record: Database['public']['Tables']['manifest']['Row']) { - const queue = getQueueLogMetadata(c) +export async function updateManifestSize(c: Context, record: Database['public']['Tables']['manifest']['Row'], queue = getQueueLogMetadata(c)) { if (!record.s3_path) { cloudlog({ requestId: c.get('requestId'), message: 'No s3 path', id: record.id, app_version_id: record.app_version_id, file_name: record.file_name, queue }) throw simpleError('no_s3_path', 'No s3 path', { record }) diff --git a/supabase/functions/_backend/triggers/queue_consumer.ts b/supabase/functions/_backend/triggers/queue_consumer.ts index 743394de0c..0adaca355d 100644 --- a/supabase/functions/_backend/triggers/queue_consumer.ts +++ b/supabase/functions/_backend/triggers/queue_consumer.ts @@ -1,20 +1,23 @@ import type { Context } from 'hono' import type { MiddlewareKeyVariables } from '../utils/hono.ts' +import type { Database } from '../utils/supabase.types.ts' import { type } from 'arktype' import { Hono } from 'hono/tiny' // --- Worker logic imports --- import { safeParseSchema } from '../utils/ark_validation.ts' import { sendDiscordAlert } from '../utils/discord.ts' import { BRES, middlewareAPISecret, parseBody, simpleError } from '../utils/hono.ts' -import { cloudlog, cloudlogErr } from '../utils/logging.ts' +import { cloudlog, cloudlogErr, serializeError } from '../utils/logging.ts' import { closeClient, getPgClient } from '../utils/pg.ts' import { backgroundTask, getEnv } from '../utils/utils.ts' +import { updateManifestSize } from './on_manifest_create.ts' // Define constants const DEFAULT_BATCH_SIZE = 950 // Default batch size for queue reads limit of CF is 1000 fetches so we take a safe margin const MANIFEST_QUEUE_BATCH_SIZE = 100 const DEFAULT_QUEUE_HTTP_CONCURRENCY = 25 const MANIFEST_QUEUE_HTTP_CONCURRENCY = 10 +const QUEUE_HTTP_TIMEOUT_MS = 15_000 export const MAX_QUEUE_READS = 5 const DISCORD_IGNORED_ERROR_CODES = new Set(['version_not_found', 'no_channel']) @@ -61,6 +64,17 @@ interface FailureDetail { response_body?: string payload_size: number cf_id: string + duration_ms?: number + target_url?: string +} + +interface ProcessedQueueMessage extends Message { + httpResponse: Response + errorDetails: Awaited> + cfId: string + payloadSize: number + durationMs: number + targetUrl: string | null } function extractMessageBody(message: Message): Record { @@ -172,6 +186,189 @@ function generateUUID(): string { return crypto.randomUUID() } +function resolveFunctionUrl(c: Context, function_name: string, function_type: string | null | undefined): string { + const cfPpUrl = getEnv(c, 'CLOUDFLARE_PP_FUNCTION_URL') + const cfUrl = getEnv(c, 'CLOUDFLARE_FUNCTION_URL') + const normalizedType = (function_type ?? '').trim() + + if (normalizedType === 'cloudflare_pp' && cfPpUrl) + return `${cfPpUrl}/triggers/${function_name}` + + if (normalizedType === 'cloudflare' && cfUrl) + return `${cfUrl}/triggers/${function_name}` + + if (normalizedType === '' && cfUrl) + return `${cfUrl}/triggers/${function_name}` + + return `${getEnv(c, 'SUPABASE_URL')}/functions/v1/triggers/${function_name}` +} + +function queueFailureResponse(errorCode: string, message: string, moreInfo: Record = {}, status = 599): Response { + return new Response(JSON.stringify({ + error: errorCode, + message, + moreInfo, + }), { + headers: { + 'content-type': 'application/json', + }, + status, + statusText: status === 599 ? 'Queue Transport Error' : undefined, + }) +} + +function httpExceptionToQueueResponse(error: unknown): Response | null { + if (!error || typeof error !== 'object') + return null + + const maybeException = error as { + cause?: unknown + message?: unknown + status?: unknown + } + if (typeof maybeException.status !== 'number') + return null + + const cause = maybeException.cause + if (cause && typeof cause === 'object' && 'error' in cause) { + const causeData = cause as { + error?: unknown + message?: unknown + moreInfo?: unknown + } + const resolvedMessage = typeof causeData.message === 'string' + ? causeData.message + : typeof maybeException.message === 'string' + ? maybeException.message + : 'Queue handler failed' + + return queueFailureResponse( + typeof causeData.error === 'string' ? causeData.error : 'http_exception', + resolvedMessage, + causeData.moreInfo && typeof causeData.moreInfo === 'object' + ? causeData.moreInfo as Record + : {}, + maybeException.status, + ) + } + + return queueFailureResponse( + 'http_exception', + typeof maybeException.message === 'string' ? maybeException.message : 'Queue handler failed', + {}, + maybeException.status, + ) +} + +function getManifestRecordFromQueueBody(body: Record): Database['public']['Tables']['manifest']['Row'] { + const record = body.record + if (body.type !== 'INSERT' || body.table !== 'manifest' || !record || typeof record !== 'object') { + throw simpleError('invalid_manifest_queue_payload', 'Invalid manifest queue payload', { body }) + } + + return record as Database['public']['Tables']['manifest']['Row'] +} + +async function dispatchQueueMessage( + c: Context, + function_name: string, + function_type: string | null | undefined, + body: Record, + cfId: string, + metadata: QueueMessageMetadata, + targetUrl: string, +): Promise<{ response: Response, targetUrl: string }> { + if (function_name === 'on_manifest_create') { + const record = getManifestRecordFromQueueBody(body) + const response = await updateManifestSize(c, record, { + cfId, + queueMsgId: String(metadata.msgId), + queueName: metadata.queueName, + queueReadCount: String(metadata.readCount), + }) + return { response, targetUrl } + } + + const response = await http_post_helper(c, function_name, function_type, body, cfId, metadata, targetUrl) + return { response, targetUrl } +} + +async function processQueueMessage(c: Context, queueName: string, message: Message): Promise { + const function_name = message.message?.function_name ?? 'unknown' + const function_type = message.message?.function_type ?? 'supabase' + const body = extractMessageBody(message) + if (message.message?.payload === undefined && Object.keys(body).length > 0) { + cloudlog({ + requestId: c.get('requestId'), + message: `[${queueName}] Using legacy queue message body shape for ${function_name}.`, + msgId: message.msg_id, + }) + } + + const cfId = generateUUID() + const payloadSize = JSON.stringify(body).length + const start = Date.now() + const targetUrl = function_name === 'on_manifest_create' ? 'direct:on_manifest_create' : resolveFunctionUrl(c, function_name, function_type) + + try { + const result = await dispatchQueueMessage(c, function_name, function_type, body, cfId, { + msgId: message.msg_id, + queueName, + readCount: message.read_ct, + }, targetUrl) + const errorDetails = await extractErrorDetails(result.response) + + return { + httpResponse: result.response, + errorDetails, + cfId, + payloadSize, + durationMs: Date.now() - start, + targetUrl, + ...message, + } + } + catch (error) { + const serializedError = serializeError(error) + const durationMs = Date.now() - start + const httpResponse = httpExceptionToQueueResponse(error) ?? queueFailureResponse('queue_message_failed', serializedError.message ?? 'Queue message failed before receiving a response', { + cfId, + durationMs, + error: serializedError, + function_name, + function_type, + msgId: message.msg_id, + queueName, + readCount: message.read_ct, + targetUrl, + }) + cloudlogErr({ + requestId: c.get('requestId'), + message: `[${queueName}] Queue message failed during processing.`, + cfId, + durationMs, + error: serializedError, + responseStatus: httpResponse.status, + function_name, + function_type, + msgId: message.msg_id, + readCount: message.read_ct, + targetUrl, + }) + const errorDetails = await extractErrorDetails(httpResponse) + + return { + httpResponse, + errorDetails, + cfId, + payloadSize, + durationMs, + targetUrl, + ...message, + } + } +} + function getQueueBatchSize(queueName: string, requestedBatchSize: number): number { if (queueName === 'on_manifest_create') return Math.min(requestedBatchSize, MANIFEST_QUEUE_BATCH_SIZE) @@ -241,33 +438,7 @@ async function processQueue(c: Context, db: ReturnType, queu } // Process messages that are still within the retry budget. - const results = await mapWithConcurrency(messagesToProcess, processConcurrency, async (message) => { - const function_name = message.message?.function_name ?? 'unknown' - const function_type = message.message?.function_type ?? 'supabase' - const body = extractMessageBody(message) - if (message.message?.payload === undefined && Object.keys(body).length > 0) { - cloudlog({ - requestId: c.get('requestId'), - message: `[${queueName}] Using legacy queue message body shape for ${function_name}.`, - msgId: message.msg_id, - }) - } - const cfId = generateUUID() - const httpResponse = await http_post_helper(c, function_name, function_type, body, cfId, { - msgId: message.msg_id, - queueName, - readCount: message.read_ct, - }) - const errorDetails = await extractErrorDetails(httpResponse) - - return { - httpResponse, - errorDetails, - cfId, - payloadSize: JSON.stringify(body).length, - ...message, - } - }) + const results = await mapWithConcurrency(messagesToProcess, processConcurrency, async message => processQueueMessage(c, queueName, message)) // Update all messages with their CF IDs const cfIdUpdates = results.map(result => ({ @@ -278,7 +449,18 @@ async function processQueue(c: Context, db: ReturnType, queu if (cfIdUpdates.length > 0) { cloudlog({ requestId: c.get('requestId'), message: `[${queueName}] Updating ${cfIdUpdates.length} messages with CF IDs.` }) - await mass_edit_queue_messages_cf_ids(c, db, cfIdUpdates) + try { + await mass_edit_queue_messages_cf_ids(c, db, cfIdUpdates) + } + catch (error) { + cloudlogErr({ + requestId: c.get('requestId'), + message: `[${queueName}] Failed to persist queue CF IDs. Continuing queue cleanup.`, + error: serializeError(error), + queueName, + updateCount: cfIdUpdates.length, + }) + } } // Batch remove all messages that have succeeded @@ -307,6 +489,8 @@ async function processQueue(c: Context, db: ReturnType, queu response_body: msg.errorDetails.bodyPreview ?? undefined, payload_size: msg.payloadSize, cf_id: msg.cfId, + duration_ms: msg.durationMs, + target_url: msg.targetUrl ?? undefined, })) const actionableFailures = getActionableQueueFailures(failureDetails) @@ -343,7 +527,9 @@ async function processQueue(c: Context, db: ReturnType, queu const cfLogUrl = `https://dash.cloudflare.com/${getEnv(c, 'CF_ACCOUNT_ANALYTICS_ID')}/workers/services/view/capgo_api-prod/production/observability/logs?workers-observability-view=%22invocations%22&filters=%5B%7B%22key%22%3A%22%24workers.event.request.headers.x-capgo-cf-id%22%2C%22type%22%3A%22string%22%2C%22value%22%3A%22${detail.cf_id}%22%2C%22operation%22%3A%22eq%22%7D%5D` const errorInfo = detail.error_code ? ` | Error: ${detail.error_code}` : '' const messageInfo = detail.error_message ? ` | ${truncateDiscordField(detail.error_message.replace(/\s+/g, ' ').trim(), 180)}` : '' - return `**${detail.function_name}** | Status: ${detail.status} | Read: ${detail.read_count}/${MAX_QUEUE_READS}${errorInfo}${messageInfo} | [CF Logs](${cfLogUrl})` + const durationInfo = typeof detail.duration_ms === 'number' ? ` | ${detail.duration_ms}ms` : '' + const targetInfo = detail.target_url ? ` | Target: ${truncateDiscordField(detail.target_url, 120)}` : '' + return `**${detail.function_name}** | Status: ${detail.status} | Read: ${detail.read_count}/${MAX_QUEUE_READS}${durationInfo}${errorInfo}${messageInfo}${targetInfo} | [CF Logs](${cfLogUrl})` }).join('\n')), inline: false, }, @@ -509,6 +695,7 @@ export async function http_post_helper( body: any, cfId: string, metadata?: QueueMessageMetadata, + targetUrl = resolveFunctionUrl(c, function_name, function_type), ): Promise { const headers: Record = { 'Content-Type': 'application/json', @@ -521,43 +708,30 @@ export async function http_post_helper( headers['x-capgo-queue-read-count'] = String(metadata.readCount) } - let url: string - const cfPpUrl = getEnv(c, 'CLOUDFLARE_PP_FUNCTION_URL') - const cfUrl = getEnv(c, 'CLOUDFLARE_FUNCTION_URL') - const normalizedType = (function_type ?? '').trim() - - if (normalizedType === 'cloudflare_pp' && cfPpUrl) { - url = `${cfPpUrl}/triggers/${function_name}` - } - else if (normalizedType === 'cloudflare' && cfUrl) { - url = `${cfUrl}/triggers/${function_name}` - } - else if (normalizedType === '' && cfUrl) { - // Backward compatibility: older queue messages may not have function_type set. - // If a Cloudflare URL is configured, prefer it. - url = `${cfUrl}/triggers/${function_name}` - } - else { - url = `${getEnv(c, 'SUPABASE_URL')}/functions/v1/triggers/${function_name}` - } - - // Create an AbortController for timeout const controller = new AbortController() - const timeoutId = setTimeout(() => controller.abort(), 15000) - // 15 second timeout, as the queue consumer is running every 10 seconds and the visibility timeout is 60 seconds + const timeoutId = setTimeout(() => controller.abort(), QUEUE_HTTP_TIMEOUT_MS) try { - cloudlog({ requestId: c.get('requestId'), message: `[${function_name}] Making HTTP POST request to "${url}" with body:`, body }) - const response = await fetch(url, { + cloudlog({ requestId: c.get('requestId'), message: `[${function_name}] Making HTTP POST request to "${targetUrl}" with body:`, body }) + const response = await fetch(targetUrl, { method: 'POST', headers, body: JSON.stringify(body), - // signal: controller.signal, + signal: controller.signal, }) return response } catch (error) { - throw simpleError('request_timeout', 'Request Timeout (Internal QUEUE handling error)', { function_name }, error) + const serializedError = serializeError(error) + throw simpleError('queue_http_post_failed', 'Queue HTTP POST failed', { + cfId, + error: serializedError, + function_name, + function_type: function_type ?? null, + metadata: metadata ?? null, + targetUrl, + timeoutMs: QUEUE_HTTP_TIMEOUT_MS, + }, error) } finally { clearTimeout(timeoutId) @@ -705,5 +879,8 @@ export const __queueConsumerTestUtils__ = { getActionableQueueFailures, getQueueBatchSize, getQueueHttpConcurrency, + httpExceptionToQueueResponse, + queueFailureResponse, + resolveFunctionUrl, sanitizeDiscordResponseBody, } diff --git a/tests/app-error-cases.test.ts b/tests/app-error-cases.test.ts index 3f4aa686e8..6d5b190ed4 100644 --- a/tests/app-error-cases.test.ts +++ b/tests/app-error-cases.test.ts @@ -1,6 +1,6 @@ import { randomUUID } from 'node:crypto' import { afterAll, beforeAll, describe, expect, it } from 'vitest' -import { BASE_URL, getAuthHeaders, getSupabaseClient, NON_ACCESS_APP_NAME, resetAndSeedAppData, resetAppData, USER_ID } from './test-utils.ts' +import { BASE_URL, fetchWithRetry, getAuthHeaders, getSupabaseClient, NON_ACCESS_APP_NAME, resetAndSeedAppData, resetAppData, USER_ID } from './test-utils.ts' const id = randomUUID().replace(/-/g, '').slice(0, 12) const APPNAME = `com.app.error.${id}` @@ -22,7 +22,7 @@ beforeAll(async () => { stripeCustomerId: testStripeCustomerId, }) - const createResponse = await fetch(`${BASE_URL}/apikey`, { + const createResponse = await fetchWithRetry(`${BASE_URL}/apikey`, { method: 'POST', headers: authHeaders, body: JSON.stringify({ @@ -50,7 +50,7 @@ afterAll(async () => { await getSupabaseClient().from('apps').delete().eq('app_id', PUT_APP_ID) await getSupabaseClient().from('apps').delete().eq('app_id', NOT_FOUND_APP_ID) if (testApiKeyId !== null) { - await fetch(`${BASE_URL}/apikey/${testApiKeyId}`, { + await fetchWithRetry(`${BASE_URL}/apikey/${testApiKeyId}`, { method: 'DELETE', headers: authHeaders, }) @@ -62,7 +62,7 @@ afterAll(async () => { describe('[POST] /app - Error Cases', () => { it('should return 400 when name is missing', async () => { - const response = await fetch(`${BASE_URL}/app`, { + const response = await fetchWithRetry(`${BASE_URL}/app`, { method: 'POST', headers: testHeaders, body: JSON.stringify({ @@ -78,7 +78,7 @@ describe('[POST] /app - Error Cases', () => { it('should return 403 when organization access is denied', async () => { const nonExistentOrgId = randomUUID() - const response = await fetch(`${BASE_URL}/app`, { + const response = await fetchWithRetry(`${BASE_URL}/app`, { method: 'POST', headers: testHeaders, body: JSON.stringify({ @@ -94,7 +94,7 @@ describe('[POST] /app - Error Cases', () => { it('should return 409 when app creation fails due to duplicate app id', async () => { // Try to create another app with the same app_id - const response2 = await fetch(`${BASE_URL}/app`, { + const response2 = await fetchWithRetry(`${BASE_URL}/app`, { method: 'POST', headers: testHeaders, body: JSON.stringify({ @@ -109,7 +109,7 @@ describe('[POST] /app - Error Cases', () => { }) it('should return 400 with invalid JSON body', async () => { - const response = await fetch(`${BASE_URL}/app`, { + const response = await fetchWithRetry(`${BASE_URL}/app`, { method: 'POST', headers: testHeaders, body: 'invalid json', @@ -120,7 +120,7 @@ describe('[POST] /app - Error Cases', () => { describe('[GET] /app - Error Cases', () => { it('should return 400 when user cannot access the app', async () => { - const response = await fetch(`${BASE_URL}/app/nonexistent.app`, { + const response = await fetchWithRetry(`${BASE_URL}/app/nonexistent.app`, { method: 'GET', headers: testHeaders, }) @@ -141,7 +141,7 @@ describe('[GET] /app - Error Cases', () => { await getSupabaseClient().from('apps').delete().eq('app_id', NOT_FOUND_APP_ID) // Try to get the deleted app - const response = await fetch(`${BASE_URL}/app/${NOT_FOUND_APP_ID}`, { + const response = await fetchWithRetry(`${BASE_URL}/app/${NOT_FOUND_APP_ID}`, { method: 'GET', headers: testHeaders, }) @@ -153,7 +153,7 @@ describe('[GET] /app - Error Cases', () => { it('should return 403 when user does not have access to organization', async () => { // This test would need a more complex setup with different users // For now, we test with a response structure check - const response = await fetch(`${BASE_URL}/app/${NON_ACCESS_APP_NAME}`, { + const response = await fetchWithRetry(`${BASE_URL}/app/${NON_ACCESS_APP_NAME}`, { method: 'GET', headers: testHeaders, }) @@ -174,7 +174,7 @@ describe('[PUT] /app - Error Cases', () => { }) it('should return 400 when user cannot access the app', async () => { - const response = await fetch(`${BASE_URL}/app/nonexistent.app`, { + const response = await fetchWithRetry(`${BASE_URL}/app/nonexistent.app`, { method: 'PUT', headers: testHeaders, body: JSON.stringify({ @@ -188,7 +188,7 @@ describe('[PUT] /app - Error Cases', () => { it('should return 400 when update fails', async () => { // Try to update with invalid data that would cause a database error - const response = await fetch(`${BASE_URL}/app/${PUT_APP_ID}`, { + const response = await fetchWithRetry(`${BASE_URL}/app/${PUT_APP_ID}`, { method: 'PUT', headers: testHeaders, body: JSON.stringify({ @@ -202,7 +202,7 @@ describe('[PUT] /app - Error Cases', () => { }) it('should handle invalid JSON body', async () => { - const response = await fetch(`${BASE_URL}/app/${PUT_APP_ID}`, { + const response = await fetchWithRetry(`${BASE_URL}/app/${PUT_APP_ID}`, { method: 'PUT', headers: testHeaders, body: 'invalid json', @@ -213,7 +213,7 @@ describe('[PUT] /app - Error Cases', () => { describe('[DELETE] /app - Error Cases', () => { it('should return 400 when user cannot access the app', async () => { - const response = await fetch(`${BASE_URL}/app/nonexistent.app`, { + const response = await fetchWithRetry(`${BASE_URL}/app/nonexistent.app`, { method: 'DELETE', headers: testHeaders, }) @@ -231,7 +231,7 @@ describe('[DELETE] /app - Error Cases', () => { }) // Try to delete the app (this should work) - const response = await fetch(`${BASE_URL}/app/${DELETE_APP_ID}`, { + const response = await fetchWithRetry(`${BASE_URL}/app/${DELETE_APP_ID}`, { method: 'DELETE', headers: testHeaders, }) @@ -240,7 +240,7 @@ describe('[DELETE] /app - Error Cases', () => { expect(response.status).toBe(200) // Try to delete the same app again (should fail) - const response2 = await fetch(`${BASE_URL}/app/${DELETE_APP_ID}`, { + const response2 = await fetchWithRetry(`${BASE_URL}/app/${DELETE_APP_ID}`, { method: 'DELETE', headers: testHeaders, }) diff --git a/tests/queue-consumer-message-shape.unit.test.ts b/tests/queue-consumer-message-shape.unit.test.ts index 3a191c0e12..79cdc13ff2 100644 --- a/tests/queue-consumer-message-shape.unit.test.ts +++ b/tests/queue-consumer-message-shape.unit.test.ts @@ -1,3 +1,4 @@ +import { HTTPException } from 'hono/http-exception' import { describe, expect, it } from 'vitest' import { __queueConsumerTestUtils__, MAX_QUEUE_READS, messagesArraySchema } from '../supabase/functions/_backend/triggers/queue_consumer.ts' import { parseSchema } from '../supabase/functions/_backend/utils/ark_validation.ts' @@ -162,4 +163,44 @@ describe('queue_consumer legacy message compatibility', () => { errorMessage: 'builder unavailable', }) }) + + it.concurrent('turns queue transport failures into retryable per-message responses', async () => { + const response = __queueConsumerTestUtils__.queueFailureResponse('queue_message_failed', 'fetch failed', { + cfId: 'cf-transport', + msgId: 12, + queueName: 'on_manifest_create', + targetUrl: 'direct:on_manifest_create', + }) + + const details = await __queueConsumerTestUtils__.extractErrorDetails(response) + + expect(response.status).toBe(599) + expect(details.errorCode).toBe('queue_message_failed') + expect(details.errorMessage).toBe('fetch failed') + expect(details.bodyPreview).toContain('"queueName":"on_manifest_create"') + expect(details.bodyPreview).toContain('"targetUrl":"direct:on_manifest_create"') + }) + + it.concurrent('preserves direct handler HTTP error details for queue retries', async () => { + const response = __queueConsumerTestUtils__.httpExceptionToQueueResponse(new HTTPException(503, { + message: 'Manifest file size metadata was not found', + cause: { + error: 'manifest_size_not_found', + message: 'Manifest file size metadata was not found', + moreInfo: { + id: 123, + queue: { queueName: 'on_manifest_create' }, + }, + }, + })) + + expect(response).not.toBeNull() + const details = await __queueConsumerTestUtils__.extractErrorDetails(response!) + + expect(response!.status).toBe(503) + expect(details.errorCode).toBe('manifest_size_not_found') + expect(details.errorMessage).toBe('Manifest file size metadata was not found') + expect(details.bodyPreview).toContain('"id":123') + expect(details.bodyPreview).toContain('"queueName":"on_manifest_create"') + }) })