diff --git a/backend/src/indexer/ledger_follower.ts b/backend/src/indexer/ledger_follower.ts index 34927baf..a2d96f7e 100644 --- a/backend/src/indexer/ledger_follower.ts +++ b/backend/src/indexer/ledger_follower.ts @@ -19,6 +19,8 @@ export interface LedgerFollowerConfig { retryDelayMs?: number; maxRetries?: number; ledgerGapWarningThreshold?: number; + maxBlockLimit?: number; + highEventDensityThreshold?: number; } export interface IndexerStatus { @@ -26,6 +28,8 @@ export interface IndexerStatus { lastProcessedLedger: number; lastPollAt: string | null; consecutiveErrors: number; + currentBatchSize: number; + isThrottled: boolean; } const INDEXER_STATE_ID = 1; @@ -34,6 +38,8 @@ const DEFAULT_MAX_POLL_INTERVAL_MS = 60_000; const DEFAULT_RETRY_DELAY_MS = 1_000; const DEFAULT_MAX_RETRIES = 5; const DEFAULT_GAP_THRESHOLD = 10; +const DEFAULT_MAX_BLOCK_LIMIT = 100; +const DEFAULT_HIGH_EVENT_DENSITY_THRESHOLD = 5.0; async function fetchWithRetry( url: string, @@ -91,6 +97,8 @@ export class LedgerFollower { private lastPollAt: string | null = null; private consecutiveErrors = 0; private currentPollIntervalMs: number; + private currentBatchSize: number; + private isThrottled = false; constructor( private readonly pool: Pool, @@ -105,9 +113,12 @@ export class LedgerFollower { retryDelayMs: config.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS, maxRetries: config.maxRetries ?? DEFAULT_MAX_RETRIES, ledgerGapWarningThreshold: config.ledgerGapWarningThreshold ?? DEFAULT_GAP_THRESHOLD, + maxBlockLimit: config.maxBlockLimit ?? DEFAULT_MAX_BLOCK_LIMIT, + highEventDensityThreshold: config.highEventDensityThreshold ?? DEFAULT_HIGH_EVENT_DENSITY_THRESHOLD, }; this.currentPollIntervalMs = this.config.pollIntervalMs; + this.currentBatchSize = this.config.maxBlockLimit; this.guard = new IdempotencyGuard(pool); this.topicFilter = buildTopicFilter(config.contractIds, this.config.allowedTopicHashes); } @@ -118,6 +129,8 @@ export class LedgerFollower { lastProcessedLedger: this.lastProcessedLedger, lastPollAt: this.lastPollAt, consecutiveErrors: this.consecutiveErrors, + currentBatchSize: this.currentBatchSize, + isThrottled: this.isThrottled, }; } @@ -137,9 +150,9 @@ export class LedgerFollower { private async loop(): Promise { while (this.running) { try { - const newEvents = await this.pollOnce(); + const { processed, hasMore } = await this.pollOnce(); - if (newEvents === 0) { + if (processed === 0 && !hasMore) { this.currentPollIntervalMs = Math.min( this.currentPollIntervalMs * 1.5, this.config.maxPollIntervalMs, @@ -149,6 +162,13 @@ export class LedgerFollower { } this.consecutiveErrors = 0; + this.lastPollAt = new Date().toISOString(); + + if (hasMore) { + // Catching up: proceed to next batch immediately with a minor delay to prevent event loop exhaustion + await sleep(50); + continue; + } } catch (err) { this.consecutiveErrors++; const backoff = Math.min( @@ -164,16 +184,15 @@ export class LedgerFollower { continue; } - this.lastPollAt = new Date().toISOString(); await sleep(this.currentPollIntervalMs); } } - private async pollOnce(): Promise { + private async pollOnce(): Promise<{ processed: number; hasMore: boolean }> { const latestLedger = await this.fetchLatestLedgerSequence(); if (latestLedger <= this.lastProcessedLedger) { - return 0; + return { processed: 0, hasMore: false }; } const gap = latestLedger - this.lastProcessedLedger; @@ -186,7 +205,8 @@ export class LedgerFollower { }); } - const events = await this.fetchEvents(this.lastProcessedLedger + 1, latestLedger); + const targetLedger = Math.min(latestLedger, this.lastProcessedLedger + this.currentBatchSize); + const events = await this.fetchEvents(this.lastProcessedLedger + 1, targetLedger); const relevant = events.filter(this.topicFilter); let processed = 0; @@ -202,18 +222,43 @@ export class LedgerFollower { processed++; } - await this.saveLastProcessedLedger(latestLedger); - this.lastProcessedLedger = latestLedger; + await this.saveLastProcessedLedger(targetLedger); + const prevProcessedLedger = this.lastProcessedLedger; + this.lastProcessedLedger = targetLedger; + + // Calculate event density in this batch + const batchRange = targetLedger - prevProcessedLedger; + const density = events.length / Math.max(1, batchRange); + + if (density > this.config.highEventDensityThreshold) { + this.isThrottled = true; + // Multiplicative decrease + this.currentBatchSize = Math.max(10, Math.floor(this.currentBatchSize * 0.5)); + logger.warn("High event density detected. Applying backpressure and throttling batch size", { + density, + threshold: this.config.highEventDensityThreshold, + newBatchSize: this.currentBatchSize, + eventsCount: events.length, + }); + // Apply backpressure throttle delay + await sleep(500); + } else { + this.isThrottled = false; + // Additive increase + this.currentBatchSize = Math.min(this.config.maxBlockLimit, this.currentBatchSize + 10); + } logger.info("Poll cycle complete", { - fromLedger: this.lastProcessedLedger - (latestLedger - this.lastProcessedLedger), - toLedger: latestLedger, + fromLedger: prevProcessedLedger + 1, + toLedger: targetLedger, totalEvents: events.length, relevantEvents: relevant.length, newlyProcessed: processed, + currentBatchSize: this.currentBatchSize, + isThrottled: this.isThrottled, }); - return processed; + return { processed, hasMore: targetLedger < latestLedger }; } private async fetchLatestLedgerSequence(): Promise { diff --git a/backend/tests/ledger_follower.test.ts b/backend/tests/ledger_follower.test.ts new file mode 100644 index 00000000..a90aeef2 --- /dev/null +++ b/backend/tests/ledger_follower.test.ts @@ -0,0 +1,189 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import Module from "node:module"; +import { EventEmitter } from "node:events"; + +// ── Mock network clients ── +let mockGetHandler: (url: string, options: any, callback: (res: any) => void) => any = () => { + throw new Error("mockGetHandler not set"); +}; + +const originalLoad = (Module as any)._load; +(Module as any)._load = function patchedLoad(request: string, parent: unknown, isMain: boolean) { + if (request === "http" || request === "https") { + return { + get: (url: string, options: any, callback: (res: any) => void) => { + return mockGetHandler(url, options, callback); + } + }; + } + return originalLoad.apply(this, [request, parent, isMain]); +}; + +// Now import the LedgerFollower +import { LedgerFollower } from "../src/indexer/ledger_follower"; + +function createMockResponse(body: string) { + const res = new EventEmitter() as any; + process.nextTick(() => { + res.emit("data", Buffer.from(body)); + res.emit("end"); + }); + return res; +} + +function createMockRequest() { + const req = new EventEmitter() as any; + req.destroy = () => {}; + return req; +} + +// ── Test Cases ── + +test("LedgerFollower uses dynamic pagination and batches query ranges correctly", async () => { + let dbSavedLedger = 0; + const mockPool = { + query: async (queryText: string, params?: any[]) => { + if (queryText.includes("SELECT last_processed_ledger")) { + return { rowCount: 1, rows: [{ last_processed_ledger: "500" }] }; + } + if (queryText.includes("INSERT INTO indexer_state")) { + dbSavedLedger = params?.[1]; + return { rowCount: 1, rows: [] }; + } + return { rowCount: 0, rows: [] }; + } + } as any; + + // Set up mock HTTP response + mockGetHandler = (url: string, options: any, callback: any) => { + const req = createMockRequest(); + if (url.includes("/ledgers?")) { + callback(createMockResponse(JSON.stringify({ + _embedded: { records: [{ sequence: 1000 }] } + }))); + } else if (url.includes("/soroban/events?")) { + // Return 0 events + callback(createMockResponse(JSON.stringify({ + events: [] + }))); + } + return req; + }; + + const follower = new LedgerFollower(mockPool, { + stellarRpcUrl: "https://mock-stellar.com", + contractIds: ["mock-contract"], + maxBlockLimit: 100, + pollIntervalMs: 1000, + }); + + // Manually set internal state instead of start() to avoid background loop + (follower as any).lastProcessedLedger = 500; + (follower as any).running = true; + + const result = await (follower as any).pollOnce(); + + assert.equal(result.processed, 0); + assert.equal(result.hasMore, true); // Since 600 < 1000 + assert.equal(follower.status.lastProcessedLedger, 600); // 500 + maxBlockLimit (100) + assert.equal(dbSavedLedger, 600); // Sequence saved persistently to indexer_state +}); + +test("LedgerFollower applies multiplicative decrease throttling when event density is high", async () => { + let dbSavedLedger = 0; + const mockPool = { + query: async (queryText: string, params?: any[]) => { + if (queryText.includes("SELECT last_processed_ledger")) { + return { rowCount: 1, rows: [{ last_processed_ledger: "500" }] }; + } + if (queryText.includes("INSERT INTO indexer_state")) { + dbSavedLedger = params?.[1]; + return { rowCount: 1, rows: [] }; + } + return { rowCount: 0, rows: [] }; + } + } as any; + + // Mock high density events return: 600 events inside the batch of 100 ledgers + mockGetHandler = (url: string, options: any, callback: any) => { + const req = createMockRequest(); + if (url.includes("/ledgers?")) { + callback(createMockResponse(JSON.stringify({ + _embedded: { records: [{ sequence: 1000 }] } + }))); + } else if (url.includes("/soroban/events?")) { + const mockEvents = Array.from({ length: 600 }, (_, i) => ({ + id: `event-${i}`, + ledger: 550, + contractId: "mock-contract", + topic: ["test"], + value: "value" + })); + callback(createMockResponse(JSON.stringify({ + events: mockEvents + }))); + } + return req; + }; + + const follower = new LedgerFollower(mockPool, { + stellarRpcUrl: "https://mock-stellar.com", + contractIds: ["mock-contract"], + maxBlockLimit: 100, + highEventDensityThreshold: 5.0, // Throttles if density > 5.0 events/block + pollIntervalMs: 1000, + }); + + (follower as any).lastProcessedLedger = 500; + (follower as any).running = true; + + await (follower as any).pollOnce(); + + // Halved from 100 to 50 + assert.equal(follower.status.currentBatchSize, 50); + assert.equal(follower.status.isThrottled, true); + assert.equal(dbSavedLedger, 600); +}); + +test("LedgerFollower applies additive increase recovery when event density is low", async () => { + const mockPool = { + query: async (queryText: string) => { + if (queryText.includes("SELECT last_processed_ledger")) { + return { rowCount: 1, rows: [{ last_processed_ledger: "500" }] }; + } + return { rowCount: 0, rows: [] }; + } + } as any; + + mockGetHandler = (url: string, options: any, callback: any) => { + const req = createMockRequest(); + if (url.includes("/ledgers?")) { + callback(createMockResponse(JSON.stringify({ + _embedded: { records: [{ sequence: 1000 }] } + }))); + } else if (url.includes("/soroban/events?")) { + callback(createMockResponse(JSON.stringify({ events: [] }))); + } + return req; + }; + + const follower = new LedgerFollower(mockPool, { + stellarRpcUrl: "https://mock-stellar.com", + contractIds: ["mock-contract"], + maxBlockLimit: 100, + pollIntervalMs: 1000, + }); + + (follower as any).lastProcessedLedger = 500; + (follower as any).running = true; + + // Set initial batch size to a low value to test additive increase + (follower as any).currentBatchSize = 50; + + await (follower as any).pollOnce(); + + // Additively increased from 50 to 60 (+10) + assert.equal(follower.status.currentBatchSize, 60); + assert.equal(follower.status.isThrottled, false); +});