Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
"test:cloudflare:updates": "vitest run tests/updates* --config vitest.config.cloudflare.ts",
"bench": "vitest bench --config vitest.config.bench.ts --run",
"admin:backfill-paid-product-activity": "bun scripts/backfill_paid_product_activity.ts",
"admin:cleanup-stuck-manifest-backlog": "bun scripts/cleanup_stuck_manifest_backlog.ts",
"admin:backfill-plugin-version-ladder": "bun scripts/backfill_plugin_version_ladder.ts",
"admin:backfill-missing-app-icons": "bun scripts/backfill_missing_app_icons.ts",
"admin:backfill-missing-store-urls": "bun scripts/backfill_missing_store_urls.ts",
Expand Down
248 changes: 248 additions & 0 deletions scripts/cleanup_stuck_manifest_backlog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Audit and clear old manifest rows stuck behind soft-deleted bundles.
*
* Dry run:
* bun run admin:cleanup-stuck-manifest-backlog
*
* Apply:
* bun run admin:cleanup-stuck-manifest-backlog --apply
*
* Optional:
* bun run admin:cleanup-stuck-manifest-backlog --apply --db-url="$DATABASE_URL"
* bun run admin:cleanup-stuck-manifest-backlog --apply --env-file=./internal/cloudflare/.env.prod
* bun run admin:cleanup-stuck-manifest-backlog --apply --max-batches=1000 --pause-ms=250
* bun run admin:cleanup-stuck-manifest-backlog --apply --skip-vacuum
*/
import process from 'node:process'
import { setTimeout as sleep } from 'node:timers/promises'
import { Client } from 'pg'
import { DEFAULT_ENV_FILE, getArgValue, loadEnv, parsePositiveInteger } from './admin_stripe_backfill_utils.ts'

interface TableSizeRow {
heap: string
indexes: string
total: string
}

interface VacuumStatsRow {
last_autoanalyze: string | null
last_autovacuum: string | null
n_dead_tup: string
n_live_tup: string
}

interface BucketRow {
bucket: string
manifest_rows: string
versions: string
}

interface EligibleVersionRow {
eligible_versions: string
}

const DEFAULT_MAX_BATCHES = 1000
const DEFAULT_PAUSE_MS = 250

function printHelp() {
console.log(`Audit and clear old manifest rows stuck behind soft-deleted bundles.

Usage:
bun run admin:cleanup-stuck-manifest-backlog [options]

Options:
--apply Delete old soft-deleted versions by calling public.delete_old_deleted_versions().
--db-url=URL Postgres connection string. Overrides env file values.
--env-file=PATH Env file to load. Default: ${DEFAULT_ENV_FILE}.
--max-batches=N Maximum cleanup batches to run. Default: ${DEFAULT_MAX_BATCHES}.
--pause-ms=N Delay between batches. Default: ${DEFAULT_PAUSE_MS}.
--skip-vacuum Do not run VACUUM (ANALYZE) public.manifest after apply.
--help Show this help.

Required env:
DATABASE_URL, SUPABASE_DB_URL, POSTGRES_URL, or PGDATABASE_URL
`)
}

function parseNonNegativeInteger(value: string | null, label: string, fallback: number) {
if (value === null)
return fallback

const parsed = Number.parseInt(value, 10)
if (!Number.isInteger(parsed) || parsed < 0)
throw new Error(`${label} must be a non-negative integer`)

return parsed
}

function getDatabaseUrl(env: Record<string, string | undefined>, args: string[]) {
return getArgValue(args, '--db-url')
?? env.DATABASE_URL?.trim()
?? env.SUPABASE_DB_URL?.trim()
?? env.POSTGRES_URL?.trim()
?? env.PGDATABASE_URL?.trim()
?? null
}

function shouldUseSsl(databaseUrl: string) {
const url = new URL(databaseUrl)
const sslMode = url.searchParams.get('sslmode')
if (sslMode === 'disable')
return false
if (url.hostname === 'localhost' || url.hostname === '127.0.0.1')
return false
return true
}

async function getTableSize(client: Client) {
const result = await client.query<TableSizeRow>(`
SELECT
pg_size_pretty(pg_relation_size('public.manifest')) AS heap,
pg_size_pretty(pg_indexes_size('public.manifest')) AS indexes,
pg_size_pretty(pg_total_relation_size('public.manifest')) AS total
`)
return result.rows[0]
}

async function getVacuumStats(client: Client) {
const result = await client.query<VacuumStatsRow>(`
SELECT n_live_tup, n_dead_tup, last_autovacuum, last_autoanalyze
FROM pg_stat_user_tables
WHERE schemaname = 'public' AND relname = 'manifest'
`)
return result.rows[0]
}

async function getManifestBuckets(client: Client) {
const result = await client.query<BucketRow>(`
SELECT
CASE
WHEN av.deleted = false THEN 'active'
WHEN av.deleted_at < now() - interval '3 months' THEN 'past_hard_delete'
ELSE 'soft_deleted_waiting'
END AS bucket,
count(*)::text AS manifest_rows,
count(DISTINCT av.id)::text AS versions
FROM public.manifest m
JOIN public.app_versions av ON av.id = m.app_version_id
GROUP BY 1
ORDER BY count(*) DESC
`)
return result.rows
}

async function getEligibleVersionCount(client: Client) {
const result = await client.query<EligibleVersionRow>(`
SELECT count(*)::text AS eligible_versions
FROM public.app_versions av
WHERE av.deleted_at IS NOT NULL
AND av.deleted_at < now() - interval '3 months'
AND av.name NOT IN ('builtin', 'unknown')
AND NOT EXISTS (
SELECT 1
FROM public.channels
WHERE channels.version = av.id
)
`)
return Number.parseInt(result.rows[0]?.eligible_versions ?? '0', 10)
}

function printAudit(title: string, size: TableSizeRow, stats: VacuumStatsRow | undefined, buckets: BucketRow[]) {
console.log(`\n${title}`)
console.table([size])
if (stats)
console.table([stats])
console.table(buckets)
}

async function runCleanupLoop(client: Client, maxBatches: number, pauseMs: number) {
let batches = 0
let previousRemaining = await getEligibleVersionCount(client)
console.log(`Eligible old deleted versions before cleanup: ${previousRemaining}`)

while (previousRemaining > 0 && batches < maxBatches) {
batches += 1
await client.query('SELECT public.delete_old_deleted_versions()')

const remaining = await getEligibleVersionCount(client)
const deleted = Math.max(previousRemaining - remaining, 0)
console.log(`Batch ${batches}: deleted about ${deleted} versions, ${remaining} eligible remain`)

if (remaining >= previousRemaining) {
console.log('No progress detected; stopping to avoid looping on a blocked cleanup.')
break
}

previousRemaining = remaining
if (pauseMs > 0)
await sleep(pauseMs)
}

return {
batches,
remaining: previousRemaining,
}
}

async function main() {
const args = Bun.argv.slice(2)
if (args.includes('--help')) {
printHelp()
return
}

const apply = args.includes('--apply')
const skipVacuum = args.includes('--skip-vacuum')
const envFile = getArgValue(args, '--env-file') ?? DEFAULT_ENV_FILE
const env = { ...process.env, ...await loadEnv(envFile) }
const databaseUrl = getDatabaseUrl(env, args)
if (!databaseUrl)
throw new Error('Missing database URL. Set DATABASE_URL, SUPABASE_DB_URL, POSTGRES_URL, PGDATABASE_URL, or pass --db-url.')

const maxBatches = parsePositiveInteger(getArgValue(args, '--max-batches'), '--max-batches', DEFAULT_MAX_BATCHES)
const pauseMs = parseNonNegativeInteger(getArgValue(args, '--pause-ms'), '--pause-ms', DEFAULT_PAUSE_MS)
const client = new Client({
application_name: 'capgo_cleanup_stuck_manifest_backlog',
connectionString: databaseUrl,
ssl: shouldUseSsl(databaseUrl) ? { rejectUnauthorized: true } : undefined,
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.

await client.connect()
try {
await client.query('SELECT set_config($1, $2, false)', ['statement_timeout', '15min'])
await client.query('SELECT set_config($1, $2, false)', ['lock_timeout', '10s'])

printAudit(
'Before cleanup',
await getTableSize(client),
await getVacuumStats(client),
await getManifestBuckets(client),
)

if (!apply) {
console.log('\nDry run only. Re-run with --apply to delete old soft-deleted versions and cascade stuck manifest rows.')
return
}

const cleanup = await runCleanupLoop(client, maxBatches, pauseMs)
if (cleanup.remaining > 0)
console.log(`Stopped with ${cleanup.remaining} eligible versions still remaining. Increase --max-batches after checking database load.`)

if (!skipVacuum) {
console.log('\nRunning VACUUM (ANALYZE) public.manifest...')
await client.query('VACUUM (ANALYZE) public.manifest')
}

printAudit(
'After cleanup',
await getTableSize(client),
await getVacuumStats(client),
await getManifestBuckets(client),
)
}
finally {
await client.end()
}
}

await main()
6 changes: 3 additions & 3 deletions supabase/functions/_backend/files/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ async function getSupabaseStorageResponse(c: Context, fileId: string): Promise<R

async function getHandler(c: Context): Promise<Response> {
const fileId = c.get('fileId')
// It is imperative and inalthat we read file without any Database READ to avoid any potential bottlenecks and ensure high performance and availability of file downloads, especially under heavy load.
// This had beed designed that way and access to file going to be delete is non imporant compared to availability of file download, so we are not doing any check in DB or R2 before serving the file, if file is missing in R2 it will be 404 and that is expected and we want to avoid any potential bottlenecks.
// File access security is not a matter here and will NERVER BE.
// It is imperative that we read files without any database read to avoid bottlenecks and keep file downloads available under heavy load.
// This was designed that way: access to a file being deleted is less important than download availability, so we are not doing any check in DB or R2 before serving the file. If the file is missing in R2 it will be 404, and that is expected.
// File access security is not a matter here and will never be.

cloudlog({ requestId: c.get('requestId'), message: 'getHandler files', fileId })
if (getRuntimeKey() !== 'workerd') {
Expand Down
24 changes: 12 additions & 12 deletions supabase/functions/_backend/triggers/on_version_update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ async function deleteManifest(c: Context, record: Database['public']['Tables']['
.select('*', { count: 'exact', head: true })
.eq('file_name', entry.file_name)
.eq('file_hash', entry.file_hash)
.eq('s3_path', entry.s3_path)
})
.then((v) => {
if (!v)
Expand Down Expand Up @@ -317,19 +318,18 @@ export async function deleteIt(c: Context, record: Database['public']['Tables'][
.single()
if (dbError || !data) {
cloudlog({ requestId: c.get('requestId'), message: 'Cannot find version meta', id: record.id })
return c.json(BRES)
}
const { error: errorCreateStatsMeta } = await createStatsMeta(c, record.app_id, record.id, -data.size)
if (errorCreateStatsMeta)
cloudlog({ requestId: c.get('requestId'), message: 'error createStatsMeta', error: errorCreateStatsMeta })
// set app_versions_meta versionSize = 0
const { error: errorUpdate } = await supabaseAdmin(c)
.from('app_versions_meta')
.update({ size: 0 })
.eq('id', record.id)
if (errorUpdate) {
cloudlog({ requestId: c.get('requestId'), message: 'error', error: errorUpdate })
throw simpleError('cannot_update_version_meta', 'Cannot update version metadata for deleted version', { id: record.id }, errorUpdate)
else {
const { error: errorCreateStatsMeta } = await createStatsMeta(c, record.app_id, record.id, -data.size)
if (errorCreateStatsMeta)
cloudlog({ requestId: c.get('requestId'), message: 'error createStatsMeta', error: errorCreateStatsMeta })
// set app_versions_meta versionSize = 0
const { error: errorUpdate } = await supabaseAdmin(c)
.from('app_versions_meta')
.update({ size: 0 })
.eq('id', record.id)
if (errorUpdate)
cloudlog({ requestId: c.get('requestId'), message: 'error update version meta size during delete', id: record.id, error: errorUpdate })
}

await deleteManifest(c, record)
Expand Down
44 changes: 44 additions & 0 deletions supabase/migrations/20260513003104_manifest_cleanup_health.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Keep hard-deleted bundle cleanup bounded so manifest cascades do not create
-- one very large delete transaction when retention has a backlog.
CREATE OR REPLACE FUNCTION "public"."delete_old_deleted_versions"() RETURNS "void"
LANGUAGE "plpgsql"
SECURITY DEFINER
SET search_path = ''
AS $$
DECLARE
deleted_count bigint;
BEGIN
WITH deleted_versions AS (
SELECT "app_versions"."id"
FROM "public"."app_versions"
WHERE "app_versions"."deleted_at" IS NOT NULL
AND "app_versions"."deleted_at" < now() - INTERVAL '3 months'
AND "app_versions"."name" NOT IN ('builtin', 'unknown')
AND NOT EXISTS (
SELECT 1
FROM "public"."channels"
WHERE "channels"."version" = "app_versions"."id"
)
ORDER BY "app_versions"."deleted_at", "app_versions"."id"
LIMIT 500
FOR UPDATE SKIP LOCKED
)
DELETE FROM "public"."app_versions"
USING deleted_versions
WHERE "app_versions"."id" = deleted_versions."id";

GET DIAGNOSTICS deleted_count = ROW_COUNT;

IF deleted_count > 0 THEN
RAISE NOTICE 'delete_old_deleted_versions: permanently deleted % app versions', deleted_count;
END IF;
END;
$$;

ALTER FUNCTION "public"."delete_old_deleted_versions"() OWNER TO "postgres";
COMMENT ON FUNCTION "public"."delete_old_deleted_versions"() IS 'Permanently deletes up to 500 soft-deleted app versions older than 3 months per run; related manifest rows cascade through foreign keys.';

REVOKE ALL ON FUNCTION "public"."delete_old_deleted_versions"() FROM PUBLIC;
REVOKE ALL ON FUNCTION "public"."delete_old_deleted_versions"() FROM "anon";
REVOKE ALL ON FUNCTION "public"."delete_old_deleted_versions"() FROM "authenticated";
GRANT EXECUTE ON FUNCTION "public"."delete_old_deleted_versions"() TO "service_role";
Loading
Loading