diff --git a/scripts/backfill_manifest_file_sizes.mjs b/scripts/backfill_manifest_file_sizes.mjs index aff85362e7..f5e7606f4e 100644 --- a/scripts/backfill_manifest_file_sizes.mjs +++ b/scripts/backfill_manifest_file_sizes.mjs @@ -8,10 +8,10 @@ * * Apply updates: * bun scripts/backfill_manifest_file_sizes.mjs --app-version-id=180988804 --apply - * bun scripts/backfill_manifest_file_sizes.mjs --all --apply --workers=8 --concurrency=160 + * bun scripts/backfill_manifest_file_sizes.mjs --all --apply --workers=16 --concurrency=256 */ -import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs' +import { appendFileSync, existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs' import { dirname, resolve } from 'node:path' import process from 'node:process' import { fileURLToPath } from 'node:url' @@ -94,6 +94,22 @@ const DB_URL_ENV_KEYS = [ 'SUPABASE_DB_DIRECT_URL', 'DIRECT_URL', ] +const MAX_CANDIDATE_BATCH_SIZE = 1000 +const FAILED_CSV_HEADERS = [ + 'id', + 'app_id', + 'app_version_id', + 'version_name', + 'file_name', + 's3_path', + 'status', + 'method', + 'reason', + 'attempts', + 'error_name', + 'error_status', + 'error_message', +] function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)) @@ -119,6 +135,18 @@ function getOptionalNumberArg(name) { return parsed } +function getFixedBatchSizeArg() { + const value = getArgValue('--batch-size') + if (value === undefined) + return MAX_CANDIDATE_BATCH_SIZE + const parsed = Number.parseInt(value, 10) + if (!Number.isFinite(parsed) || parsed <= 0) + throw new Error('--batch-size must be a positive integer') + if (parsed !== MAX_CANDIDATE_BATCH_SIZE) + console.warn(`--batch-size=${parsed} is ignored; candidate reads are fixed to ${MAX_CANDIDATE_BATCH_SIZE}`) + return MAX_CANDIDATE_BATCH_SIZE +} + function getDatabaseUrl(databaseEnv) { for (const key of DB_URL_ENV_KEYS) { const value = databaseEnv[key] @@ -359,38 +387,6 @@ function appendCommonFilters(where, params, { appId, appVersionId, endId, includ } } -function buildBoundsQuery({ appId, appVersionId, endId, includeDeleted, startId }) { - const params = [] - const where = [] - appendCommonFilters(where, params, { appId, appVersionId, endId, includeDeleted, startId }) - - return { - params, - // ORDER BY id ASC/DESC LIMIT 1 keeps this on the manifest primary-key index. - sql: ` - WITH first_row AS ( - SELECT m.id - FROM public.manifest m - INNER JOIN public.app_versions av ON av.id = m.app_version_id - WHERE ${where.join(' AND ')} - ORDER BY m.id ASC - LIMIT 1 - ), - last_row AS ( - SELECT m.id - FROM public.manifest m - INNER JOIN public.app_versions av ON av.id = m.app_version_id - WHERE ${where.join(' AND ')} - ORDER BY m.id DESC - LIMIT 1 - ) - SELECT - (SELECT id FROM first_row) AS min_id, - (SELECT id FROM last_row) AS max_id - `, - } -} - function buildCandidateQuery({ afterId, appId, appVersionId, endId, includeDeleted, limit }) { const params = [afterId] const where = [ @@ -443,28 +439,18 @@ function buildBulkUpdateQuery(rows) { } } -function createIdRanges(minId, maxId, workers) { - if (!Number.isFinite(minId) || !Number.isFinite(maxId) || minId <= 0 || maxId <= 0 || minId > maxId) - return [] - - const rangeSize = Math.ceil((maxId - minId + 1) / workers) - return Array.from({ length: workers }, (_, index) => { - const start = minId + (index * rangeSize) - const end = Math.min(maxId, start + rangeSize - 1) - return start <= end ? { end, index, start } : null - }).filter(Boolean) -} - -function createWorkerReport(range) { +function createWorkerReport() { return { + batches: 0, checked: 0, + currentBatch: null, + currentFirstId: null, + currentLastId: null, done: false, - endId: range.end, fixed: 0, - lastId: range.start - 1, + lastId: null, missingSize: 0, pages: 0, - startId: range.start, unchanged: 0, } } @@ -473,6 +459,41 @@ function writeReport(outputPath, report) { writeFileSync(outputPath, `${JSON.stringify(report, null, 2)}\n`) } +function csvValue(value) { + if (value === undefined || value === null) + return '' + const text = typeof value === 'object' ? JSON.stringify(value) : String(value) + return /[",\n\r]/.test(text) ? `"${text.replaceAll('"', '""')}"` : text +} + +function failedCsvRow(error) { + return FAILED_CSV_HEADERS.map((header) => { + if (header === 'error_message') + return csvValue(error.error?.message) + if (header === 'error_name') + return csvValue(error.error?.name) + if (header === 'error_status') + return csvValue(error.error?.status) + return csvValue(error[header]) + }).join(',') +} + +function createFailedCsv(outputPath) { + writeFileSync(outputPath, `${FAILED_CSV_HEADERS.join(',')}\n`) +} + +function appendFailedCsvRows(outputPath, errors) { + if (errors.length === 0) + return + appendFileSync(outputPath, `${errors.map(failedCsvRow).join('\n')}\n`) +} + +function defaultFailedCsvPath(outputPath) { + return outputPath.toLowerCase().endsWith('.json') + ? outputPath.replace(/\.json$/i, '-failed.csv') + : `${outputPath}-failed.csv` +} + function createProgressLogger(report, apply) { const startedAt = Date.now() let lastLogAt = 0 @@ -485,8 +506,7 @@ function createProgressLogger(report, apply) { const elapsedSeconds = Math.max(1, Math.round((now - startedAt) / 1000)) const rate = Math.round(report.checked / elapsedSeconds) const activeWorkers = Object.values(report.workers).filter(worker => !worker.done).length - const maxLastId = Math.max(0, ...Object.values(report.workers).map(worker => worker.lastId ?? 0)) - console.log(`Checked ${report.checked}, ${apply ? 'fixed' : 'fixable'} ${apply ? report.fixed : report.unchanged}, missing ${report.missingSize}, rate ${rate}/s, active workers ${activeWorkers}, last id ${maxLastId}`) + console.log(`Checked ${report.checked}, ${apply ? 'fixed' : 'fixable'} ${apply ? report.fixed : report.unchanged}, missing ${report.missingSize}, rate ${rate}/s, active workers ${activeWorkers}, batches ${report.claimedBatches}, cursor id ${report.lastClaimedId}`) } } @@ -538,12 +558,13 @@ async function processCandidates({ apply, dbAttempts, pool, s3, storageAttempts, } } -function mergePageReport(report, workerReport, pageReport) { +function mergePageReport(report, workerReport, pageReport, failedCsvPath) { report.checked += pageReport.checked report.fixed += pageReport.fixed report.missingSize += pageReport.missingSize report.unchanged += pageReport.unchanged report.errors.push(...pageReport.missingErrors) + appendFailedCsvRows(failedCsvPath, pageReport.missingErrors) workerReport.checked += pageReport.checked workerReport.fixed += pageReport.fixed @@ -552,29 +573,72 @@ function mergePageReport(report, workerReport, pageReport) { workerReport.pages += 1 } -async function runRangeWorker({ appId, appVersionId, batchSize, dbAttempts, includeDeleted, options, pool, range, report, writeProgress }) { - const workerReport = report.workers[range.index] - let afterId = range.start - 1 +function createBatchClaimer({ appId, appVersionId, batchSize, dbAttempts, endId, includeDeleted, limit, pool, report, startId, writeProgress }) { + let afterId = startId + let claimChain = Promise.resolve() + let done = false + let remaining = limit + + return async function claimBatch(workerIndex) { + const claim = claimChain.then(async () => { + if (done || remaining <= 0) + return null - try { - while (afterId < range.end) { const query = buildCandidateQuery({ afterId, appId, appVersionId, - endId: range.end, + endId, includeDeleted, - limit: batchSize, + limit: Math.min(batchSize, remaining), }) - const candidates = (await queryWithRetry(pool, query.sql, query.params, `worker ${range.index} candidate read`, dbAttempts)).rows - if (candidates.length === 0) + const candidates = (await queryWithRetry(pool, query.sql, query.params, `worker ${workerIndex} candidate read`, dbAttempts)).rows + if (candidates.length === 0) { + done = true + return null + } + + const firstId = candidates[0].id + const lastId = candidates[candidates.length - 1].id + const batch = { + candidates, + firstId, + index: report.claimedBatches, + lastId, + } + + afterId = lastId + remaining -= candidates.length + report.claimedBatches += 1 + report.lastClaimedId = lastId + writeProgress() + + return batch + }) + + claimChain = claim.catch(() => {}) + return claim + } +} + +async function runBatchWorker({ claimBatch, failedCsvPath, options, report, workerIndex, writeProgress }) { + const workerReport = report.workers[workerIndex] + + try { + while (true) { + const batch = await claimBatch(workerIndex) + if (!batch) break - afterId = candidates[candidates.length - 1].id - workerReport.lastId = afterId + workerReport.batches += 1 + workerReport.currentBatch = batch.index + workerReport.currentFirstId = batch.firstId + workerReport.currentLastId = batch.lastId + workerReport.lastId = batch.lastId + writeProgress() - const pageReport = await processCandidates(options, candidates) - mergePageReport(report, workerReport, pageReport) + const pageReport = await processCandidates(options, batch.candidates) + mergePageReport(report, workerReport, pageReport, failedCsvPath) writeProgress() } } @@ -600,15 +664,16 @@ Options: --all Scan all manifest rows with missing size. --app-version-id Restrict to one bundle id. --app-id Restrict to one app id. - --limit Max rows to scan without --all. Default: 500. - --batch-size DB page size per worker. Default: 1000 for --all, 500 otherwise. - --workers Parallel manifest.id range workers. Default: 8 for --all, 1 otherwise. + --limit Max rows to scan without --all. Default: 1000. + --batch-size Backward-compatible alias; candidate reads are always 1000. + --workers Parallel shared-cursor workers. Default: 8 for --all, 1 otherwise. --concurrency Total storage HEAD/RANGE concurrency. Default: 120 for --all, 20 otherwise. --storage-attempts Storage metadata attempts per file. Default: 3. --db-attempts DB read/update attempts. Default: 5. --start-id Exclusive lower manifest.id bound for resume. --end-id Inclusive upper manifest.id bound. --report Report JSON output path. + --failed-csv Failed metadata CSV output path. --include-deleted Include deleted bundles. --target prod|local Env target. Default: prod. --local Alias for --target=local. @@ -624,15 +689,16 @@ Options: const appVersionIdRaw = getArgValue('--app-version-id') const appVersionId = appVersionIdRaw ? Number.parseInt(appVersionIdRaw, 10) : null const appId = getArgValue('--app-id') ?? null - const limit = all ? Number.POSITIVE_INFINITY : getNumberArg('--limit', 500) + const limit = all ? Number.POSITIVE_INFINITY : getNumberArg('--limit', 1000) const workers = getNumberArg('--workers', all ? 8 : 1) - const batchSize = getNumberArg('--batch-size', all ? 1000 : 500) + const batchSize = getFixedBatchSizeArg() const concurrency = getNumberArg('--concurrency', all ? 120 : 20) const storageAttempts = getNumberArg('--storage-attempts', 3) const dbAttempts = getNumberArg('--db-attempts', 5) const startId = getOptionalNumberArg('--start-id') ?? 0 const endId = getOptionalNumberArg('--end-id') const reportPathArg = getArgValue('--report') + const failedCsvPathArg = getArgValue('--failed-csv') if (!all && !appVersionId && !appId) throw new Error('Pass --app-version-id, --app-id, or --all') @@ -650,7 +716,12 @@ Options: const outputPath = reportPathArg ? resolve(process.cwd(), reportPathArg) : resolve(outputDir, `manifest-file-size-backfill-${Date.now()}.json`) + const failedCsvPath = failedCsvPathArg + ? resolve(process.cwd(), failedCsvPathArg) + : defaultFailedCsvPath(outputPath) mkdirSync(dirname(outputPath), { recursive: true }) + mkdirSync(dirname(failedCsvPath), { recursive: true }) + createFailedCsv(failedCsvPath) const workerCount = all ? workers : 1 const storageConcurrencyPerWorker = Math.max(1, Math.floor(concurrency / workerCount)) @@ -677,13 +748,17 @@ Options: appVersionId, batchSize, checked: 0, + claimedBatches: 0, concurrency, dbAttempts, endedAt: null, endId, errors: [], + failedCsvPath, fixed: 0, includeDeleted, + lastClaimedId: startId, + maxBatchSize: MAX_CANDIDATE_BATCH_SIZE, missingSize: 0, poolMax, scannedAt: new Date().toISOString(), @@ -704,91 +779,48 @@ Options: } try { - if (all) { - const boundsQuery = buildBoundsQuery({ - appId, - appVersionId, - endId, - includeDeleted, - startId, - }) - const bounds = (await queryWithRetry(pool, boundsQuery.sql, boundsQuery.params, 'manifest id bounds', dbAttempts)).rows[0] - const minId = Number.parseInt(bounds?.min_id ?? '0', 10) - const maxId = Number.parseInt(bounds?.max_id ?? '0', 10) - const ranges = createIdRanges(minId, maxId, workerCount) - - console.log(`Scanning manifest.id ${minId}-${maxId} with ${ranges.length} workers, page size ${batchSize}, effective storage concurrency ${effectiveStorageConcurrency}, DB pool max ${poolMax}`) + console.log(`Scanning manifest.id > ${startId}${endId ? ` and <= ${endId}` : ''} with ${workerCount} workers, page size ${batchSize}, effective storage concurrency ${effectiveStorageConcurrency}, DB pool max ${poolMax}`) + + const claimBatch = createBatchClaimer({ + appId, + appVersionId, + batchSize, + dbAttempts, + endId, + includeDeleted, + limit, + pool, + report, + startId, + writeProgress, + }) - for (const range of ranges) { - report.workers[range.index] = createWorkerReport(range) - } - writeProgress(true) + for (let workerIndex = 0; workerIndex < workerCount; workerIndex++) { + report.workers[workerIndex] = createWorkerReport() + } + writeProgress(true) - const workerResults = await Promise.allSettled(ranges.map(range => runRangeWorker({ - appId, - appVersionId, - batchSize, + const workerResults = await Promise.allSettled(Array.from({ length: workerCount }, (_, workerIndex) => runBatchWorker({ + claimBatch, + failedCsvPath, + options: { + apply, dbAttempts, - includeDeleted, - options: { - apply, - dbAttempts, - pool, - s3, - storageAttempts, - storageConcurrency: storageConcurrencyPerWorker, - verbose, - }, pool, - range, - report, - writeProgress, - }))) - const failedWorkers = workerResults - .map((result, index) => ({ index, result })) - .filter(({ result }) => result.status === 'rejected') - if (failedWorkers.length > 0) { - throw new Error(`${failedWorkers.length} backfill workers failed: ${failedWorkers.map(({ index, result }) => `worker ${index}: ${result.reason?.message ?? result.reason}`).join('; ')}`) - } - } - else { - const range = { - end: endId ?? Number.MAX_SAFE_INTEGER, - index: 0, - start: startId + 1, - } - report.workers[0] = createWorkerReport(range) - let remaining = limit - while (remaining > 0) { - const pageLimit = Math.min(batchSize, remaining) - const query = buildCandidateQuery({ - afterId: report.workers[0].lastId, - appId, - appVersionId, - endId, - includeDeleted, - limit: pageLimit, - }) - const candidates = (await queryWithRetry(pool, query.sql, query.params, 'candidate read', dbAttempts)).rows - if (candidates.length === 0) - break - - report.workers[0].lastId = candidates[candidates.length - 1].id - remaining -= candidates.length - - const pageReport = await processCandidates({ - apply, - dbAttempts, - pool, - s3, - storageAttempts, - storageConcurrency: concurrency, - verbose, - }, candidates) - mergePageReport(report, report.workers[0], pageReport) - writeProgress() - } - report.workers[0].done = true + s3, + storageAttempts, + storageConcurrency: storageConcurrencyPerWorker, + verbose, + }, + report, + workerIndex, + writeProgress, + }))) + const failedWorkers = workerResults + .map((result, index) => ({ index, result })) + .filter(({ result }) => result.status === 'rejected') + if (failedWorkers.length > 0) { + throw new Error(`${failedWorkers.length} backfill workers failed: ${failedWorkers.map(({ index, result }) => `worker ${index}: ${result.reason?.message ?? result.reason}`).join('; ')}`) } } finally { @@ -803,6 +835,7 @@ Options: console.log(` ${apply ? 'Fixed' : 'Fixable'}: ${apply ? report.fixed : report.unchanged}`) console.log(` Missing size: ${report.missingSize}`) console.log(` Report: ${outputPath}`) + console.log(` Failed CSV: ${failedCsvPath}`) if (report.missingSize > 0) process.exitCode = 1