Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 56 additions & 11 deletions backend/src/indexer/ledger_follower.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ export interface LedgerFollowerConfig {
retryDelayMs?: number;
maxRetries?: number;
ledgerGapWarningThreshold?: number;
maxBlockLimit?: number;
highEventDensityThreshold?: number;
}

export interface IndexerStatus {
running: boolean;
lastProcessedLedger: number;
lastPollAt: string | null;
consecutiveErrors: number;
currentBatchSize: number;
isThrottled: boolean;
}

const INDEXER_STATE_ID = 1;
Expand All @@ -34,6 +38,8 @@ const DEFAULT_MAX_POLL_INTERVAL_MS = 60_000;
const DEFAULT_RETRY_DELAY_MS = 1_000;
const DEFAULT_MAX_RETRIES = 5;
const DEFAULT_GAP_THRESHOLD = 10;
const DEFAULT_MAX_BLOCK_LIMIT = 100;
const DEFAULT_HIGH_EVENT_DENSITY_THRESHOLD = 5.0;

async function fetchWithRetry(
url: string,
Expand Down Expand Up @@ -91,6 +97,8 @@ export class LedgerFollower {
private lastPollAt: string | null = null;
private consecutiveErrors = 0;
private currentPollIntervalMs: number;
private currentBatchSize: number;
private isThrottled = false;

constructor(
private readonly pool: Pool,
Expand All @@ -105,9 +113,12 @@ export class LedgerFollower {
retryDelayMs: config.retryDelayMs ?? DEFAULT_RETRY_DELAY_MS,
maxRetries: config.maxRetries ?? DEFAULT_MAX_RETRIES,
ledgerGapWarningThreshold: config.ledgerGapWarningThreshold ?? DEFAULT_GAP_THRESHOLD,
maxBlockLimit: config.maxBlockLimit ?? DEFAULT_MAX_BLOCK_LIMIT,
highEventDensityThreshold: config.highEventDensityThreshold ?? DEFAULT_HIGH_EVENT_DENSITY_THRESHOLD,
};

this.currentPollIntervalMs = this.config.pollIntervalMs;
this.currentBatchSize = this.config.maxBlockLimit;
this.guard = new IdempotencyGuard(pool);
this.topicFilter = buildTopicFilter(config.contractIds, this.config.allowedTopicHashes);
}
Expand All @@ -118,6 +129,8 @@ export class LedgerFollower {
lastProcessedLedger: this.lastProcessedLedger,
lastPollAt: this.lastPollAt,
consecutiveErrors: this.consecutiveErrors,
currentBatchSize: this.currentBatchSize,
isThrottled: this.isThrottled,
};
}

Expand All @@ -137,9 +150,9 @@ export class LedgerFollower {
private async loop(): Promise<void> {
while (this.running) {
try {
const newEvents = await this.pollOnce();
const { processed, hasMore } = await this.pollOnce();

if (newEvents === 0) {
if (processed === 0 && !hasMore) {
this.currentPollIntervalMs = Math.min(
this.currentPollIntervalMs * 1.5,
this.config.maxPollIntervalMs,
Expand All @@ -149,6 +162,13 @@ export class LedgerFollower {
}

this.consecutiveErrors = 0;
this.lastPollAt = new Date().toISOString();

if (hasMore) {
// Catching up: proceed to next batch immediately with a minor delay to prevent event loop exhaustion
await sleep(50);
continue;
}
} catch (err) {
this.consecutiveErrors++;
const backoff = Math.min(
Expand All @@ -164,16 +184,15 @@ export class LedgerFollower {
continue;
}

this.lastPollAt = new Date().toISOString();
await sleep(this.currentPollIntervalMs);
}
}

private async pollOnce(): Promise<number> {
private async pollOnce(): Promise<{ processed: number; hasMore: boolean }> {
const latestLedger = await this.fetchLatestLedgerSequence();

if (latestLedger <= this.lastProcessedLedger) {
return 0;
return { processed: 0, hasMore: false };
}

const gap = latestLedger - this.lastProcessedLedger;
Expand All @@ -186,7 +205,8 @@ export class LedgerFollower {
});
}

const events = await this.fetchEvents(this.lastProcessedLedger + 1, latestLedger);
const targetLedger = Math.min(latestLedger, this.lastProcessedLedger + this.currentBatchSize);
const events = await this.fetchEvents(this.lastProcessedLedger + 1, targetLedger);
const relevant = events.filter(this.topicFilter);
let processed = 0;

Expand All @@ -202,18 +222,43 @@ export class LedgerFollower {
processed++;
}

await this.saveLastProcessedLedger(latestLedger);
this.lastProcessedLedger = latestLedger;
await this.saveLastProcessedLedger(targetLedger);
const prevProcessedLedger = this.lastProcessedLedger;
this.lastProcessedLedger = targetLedger;

// Calculate event density in this batch
const batchRange = targetLedger - prevProcessedLedger;
const density = events.length / Math.max(1, batchRange);

if (density > this.config.highEventDensityThreshold) {
this.isThrottled = true;
// Multiplicative decrease
this.currentBatchSize = Math.max(10, Math.floor(this.currentBatchSize * 0.5));
logger.warn("High event density detected. Applying backpressure and throttling batch size", {
density,
threshold: this.config.highEventDensityThreshold,
newBatchSize: this.currentBatchSize,
eventsCount: events.length,
});
// Apply backpressure throttle delay
await sleep(500);
} else {
this.isThrottled = false;
// Additive increase
this.currentBatchSize = Math.min(this.config.maxBlockLimit, this.currentBatchSize + 10);
}

logger.info("Poll cycle complete", {
fromLedger: this.lastProcessedLedger - (latestLedger - this.lastProcessedLedger),
toLedger: latestLedger,
fromLedger: prevProcessedLedger + 1,
toLedger: targetLedger,
totalEvents: events.length,
relevantEvents: relevant.length,
newlyProcessed: processed,
currentBatchSize: this.currentBatchSize,
isThrottled: this.isThrottled,
});

return processed;
return { processed, hasMore: targetLedger < latestLedger };
}

private async fetchLatestLedgerSequence(): Promise<number> {
Expand Down
189 changes: 189 additions & 0 deletions backend/tests/ledger_follower.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import test from "node:test";
import assert from "node:assert/strict";
import Module from "node:module";
import { EventEmitter } from "node:events";

// ── Mock network clients ──
let mockGetHandler: (url: string, options: any, callback: (res: any) => void) => any = () => {
throw new Error("mockGetHandler not set");
};

const originalLoad = (Module as any)._load;
(Module as any)._load = function patchedLoad(request: string, parent: unknown, isMain: boolean) {
if (request === "http" || request === "https") {
return {
get: (url: string, options: any, callback: (res: any) => void) => {
return mockGetHandler(url, options, callback);
}
};
}
return originalLoad.apply(this, [request, parent, isMain]);
};

// Now import the LedgerFollower
import { LedgerFollower } from "../src/indexer/ledger_follower";

function createMockResponse(body: string) {
const res = new EventEmitter() as any;
process.nextTick(() => {
res.emit("data", Buffer.from(body));
res.emit("end");
});
return res;
}

function createMockRequest() {
const req = new EventEmitter() as any;
req.destroy = () => {};
return req;
}

// ── Test Cases ──

test("LedgerFollower uses dynamic pagination and batches query ranges correctly", async () => {
let dbSavedLedger = 0;
const mockPool = {
query: async (queryText: string, params?: any[]) => {
if (queryText.includes("SELECT last_processed_ledger")) {
return { rowCount: 1, rows: [{ last_processed_ledger: "500" }] };
}
if (queryText.includes("INSERT INTO indexer_state")) {
dbSavedLedger = params?.[1];
return { rowCount: 1, rows: [] };
}
return { rowCount: 0, rows: [] };
}
} as any;

// Set up mock HTTP response
mockGetHandler = (url: string, options: any, callback: any) => {
const req = createMockRequest();
if (url.includes("/ledgers?")) {
callback(createMockResponse(JSON.stringify({
_embedded: { records: [{ sequence: 1000 }] }
})));
} else if (url.includes("/soroban/events?")) {
// Return 0 events
callback(createMockResponse(JSON.stringify({
events: []
})));
}
return req;
};

const follower = new LedgerFollower(mockPool, {
stellarRpcUrl: "https://mock-stellar.com",
contractIds: ["mock-contract"],
maxBlockLimit: 100,
pollIntervalMs: 1000,
});

// Manually set internal state instead of start() to avoid background loop
(follower as any).lastProcessedLedger = 500;
(follower as any).running = true;

const result = await (follower as any).pollOnce();

assert.equal(result.processed, 0);
assert.equal(result.hasMore, true); // Since 600 < 1000
assert.equal(follower.status.lastProcessedLedger, 600); // 500 + maxBlockLimit (100)
assert.equal(dbSavedLedger, 600); // Sequence saved persistently to indexer_state
});

test("LedgerFollower applies multiplicative decrease throttling when event density is high", async () => {
let dbSavedLedger = 0;
const mockPool = {
query: async (queryText: string, params?: any[]) => {
if (queryText.includes("SELECT last_processed_ledger")) {
return { rowCount: 1, rows: [{ last_processed_ledger: "500" }] };
}
if (queryText.includes("INSERT INTO indexer_state")) {
dbSavedLedger = params?.[1];
return { rowCount: 1, rows: [] };
}
return { rowCount: 0, rows: [] };
}
} as any;

// Mock high density events return: 600 events inside the batch of 100 ledgers
mockGetHandler = (url: string, options: any, callback: any) => {
const req = createMockRequest();
if (url.includes("/ledgers?")) {
callback(createMockResponse(JSON.stringify({
_embedded: { records: [{ sequence: 1000 }] }
})));
} else if (url.includes("/soroban/events?")) {
const mockEvents = Array.from({ length: 600 }, (_, i) => ({
id: `event-${i}`,
ledger: 550,
contractId: "mock-contract",
topic: ["test"],
value: "value"
}));
callback(createMockResponse(JSON.stringify({
events: mockEvents
})));
}
return req;
};

const follower = new LedgerFollower(mockPool, {
stellarRpcUrl: "https://mock-stellar.com",
contractIds: ["mock-contract"],
maxBlockLimit: 100,
highEventDensityThreshold: 5.0, // Throttles if density > 5.0 events/block
pollIntervalMs: 1000,
});

(follower as any).lastProcessedLedger = 500;
(follower as any).running = true;

await (follower as any).pollOnce();

// Halved from 100 to 50
assert.equal(follower.status.currentBatchSize, 50);
assert.equal(follower.status.isThrottled, true);
assert.equal(dbSavedLedger, 600);
});

test("LedgerFollower applies additive increase recovery when event density is low", async () => {
const mockPool = {
query: async (queryText: string) => {
if (queryText.includes("SELECT last_processed_ledger")) {
return { rowCount: 1, rows: [{ last_processed_ledger: "500" }] };
}
return { rowCount: 0, rows: [] };
}
} as any;

mockGetHandler = (url: string, options: any, callback: any) => {
const req = createMockRequest();
if (url.includes("/ledgers?")) {
callback(createMockResponse(JSON.stringify({
_embedded: { records: [{ sequence: 1000 }] }
})));
} else if (url.includes("/soroban/events?")) {
callback(createMockResponse(JSON.stringify({ events: [] })));
}
return req;
};

const follower = new LedgerFollower(mockPool, {
stellarRpcUrl: "https://mock-stellar.com",
contractIds: ["mock-contract"],
maxBlockLimit: 100,
pollIntervalMs: 1000,
});

(follower as any).lastProcessedLedger = 500;
(follower as any).running = true;

// Set initial batch size to a low value to test additive increase
(follower as any).currentBatchSize = 50;

await (follower as any).pollOnce();

// Additively increased from 50 to 60 (+10)
assert.equal(follower.status.currentBatchSize, 60);
assert.equal(follower.status.isThrottled, false);
});
Loading