diff --git a/keeper/package-lock.json b/keeper/package-lock.json index 9e481d9..a03042a 100644 --- a/keeper/package-lock.json +++ b/keeper/package-lock.json @@ -5307,6 +5307,64 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "license": "ISC" }, + "node_modules/ioredis": { + "version": "5.10.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.10.1.tgz", + "integrity": "sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.5.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, + "node_modules/ioredis-mock": { + "version": "8.13.1", + "resolved": "https://registry.npmjs.org/ioredis-mock/-/ioredis-mock-8.13.1.tgz", + "integrity": "sha512-Wsi50AU+cMiI32nAgfwpUaJVBtb4iQdVsOHl9M6R3tePCO/8vGsToCVIG82XWAxN4Se55TZoOzVseu+QngFLyw==", + "dev": true, + "license": "MIT", + "dependencies": { + "@ioredis/as-callback": "^3.0.0", + "@ioredis/commands": "^1.4.0", + "fengari": "^0.1.4", + "fengari-interop": "^0.1.3", + "semver": "^7.7.2" + }, + "engines": { + "node": ">=12.22" + }, + "peerDependencies": { + "@types/ioredis-mock": "^8", + "ioredis": "^5" + } + }, + "node_modules/ioredis-mock/node_modules/semver": { + "version": "7.7.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.4.tgz", + "integrity": "sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==", + "dev": true, + "license": "ISC", + "bin": { + "semver": "bin/semver.js" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/is-arrayish": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.2.1.tgz", diff --git a/keeper/src/poller.js b/keeper/src/poller.js index f37ac1e..7823602 100644 --- a/keeper/src/poller.js +++ b/keeper/src/poller.js @@ -316,7 +316,7 @@ class TaskPoller { this.stats.tasksChecked++; } else if (result.status === 'rejected') { this.stats.errors++; - this.logger.error('Error checking task', { taskId: taskIds[index], error: result.reason?.message || result.reason }); + this.logger.error('Error checking task', { taskId: candidateIds[index], error: result.reason?.message || result.reason }); } }); diff --git a/keeper/src/registry.js b/keeper/src/registry.js index 05ca92e..7f40388 100644 --- a/keeper/src/registry.js +++ b/keeper/src/registry.js @@ -2,6 +2,10 @@ const { xdr } = require('@stellar/stellar-sdk'); const { createLogger } = require('./logger'); const TaskSnapshot = require('./taskSnapshot'); +const DATA_DIR = path.join(__dirname, '..', 'data'); +const TASKS_FILE = path.join(DATA_DIR, 'tasks.json'); +const SNAPSHOT_VERSION = 1; + const EVENT_TOPICS = { TaskRegistered: 'AAAADwAAAA5UYXNrUmVnaXN0ZXJlZAAA', TaskPaused: 'AAAADwAAAApUYXNrUGF1c2VkAAA=', @@ -19,22 +23,12 @@ class TaskRegistry { this.server = server; this.contractId = contractId; this.taskIds = new Set(); - this.tasks = new Map(); // taskId -> TaskConfig - this.duplicateTaskIds = new Set(); + this.tasks = new Map(); // Store taskId -> TaskConfig this.lastSeenLedger = options.startLedger || 0; + this.snapshotVersion = SNAPSHOT_VERSION; this.logger = options.logger || createLogger('registry'); - - // Snapshot manager — injectable for testing, otherwise constructed from options - this.snapshot = options.snapshot || new TaskSnapshot({ - dir: options.snapshotDir || null, - staleThresholdLedgers: options.staleThreshold || options.staleThresholdLedgers, - staleThresholdMs: options.staleThresholdMs, - logger: this.logger, - }); - - // savedAt from last loaded snapshot; used in init() for the wall-clock staleness check - this._snapshotSavedAt = null; - + this.staleThreshold = options.staleThreshold || 100000; // ~1 week of ledgers + this._ensureDataDir(); this._loadFromDisk(); } @@ -44,16 +38,14 @@ class TaskRegistry { */ async init() { this.logger.info('Initializing task registry'); - + + // Check if snapshot is too old or version mismatch const latestLedger = await this.server.getLatestLedger(); - - if (this.lastSeenLedger > 0 && this.snapshot.isStale( - { lastSeenLedger: this.lastSeenLedger, savedAt: this._snapshotSavedAt }, - latestLedger.sequence - )) { - this.logger.warn('Snapshot is stale, triggering full refresh', { + if (this.lastSeenLedger > 0 && (latestLedger.sequence - this.lastSeenLedger) > this.staleThreshold) { + this.logger.warn('Snapshot is too stale, triggering full refresh', { lastSeen: this.lastSeenLedger, current: latestLedger.sequence, + threshold: this.staleThreshold }); this.lastSeenLedger = 0; this.tasks.clear(); @@ -154,7 +146,7 @@ class TaskRegistry { const currentLedger = info.sequence; if (!this.lastSeenLedger) { - // Look back ~1 hour on testnet (≈ 720 ledgers) + // Look back a reasonable window (default ~1 hour on testnet ≈ 720 ledgers) this.lastSeenLedger = Math.max(currentLedger - 720, 0); } @@ -162,14 +154,21 @@ class TaskRegistry { let cursor; let hasMore = true; + const topics = [ + Object.values(EVENT_TOPICS), + ['*'] + ]; + while (hasMore) { const params = { startLedger: cursor ? undefined : this.lastSeenLedger, - filters: [{ - type: 'contract', - contractIds: [this.contractId], - topics, - }], + filters: [ + { + type: 'contract', + contractIds: [contractId], + topics: topics, + }, + ], limit: 100, }; @@ -213,6 +212,96 @@ class TaskRegistry { const { scValToNative } = require('@stellar/stellar-sdk'); const topics = event.topic.map(t => scValToNative(xdr.ScVal.fromXDR(t, 'base64'))); const eventType = topics[0]; + + // Most events have taskId as the 3rd topic (index 2) in v1 + let taskId; + if (topics[1] === 'v1') { + taskId = Number(topics[2]); + } else { + // Fallback for legacy or different format + taskId = Number(topics[1]); + } + + if (isNaN(taskId)) return; + + const eventData = event.value ? scValToNative(xdr.ScVal.fromXDR(event.value, 'base64')) : null; + const ledgerTimestamp = Math.floor(new Date(event.ledgerCloseAt).getTime() / 1000); + + const task = this.tasks.get(taskId) || { id: taskId, blocked_by: [] }; + + switch (eventType) { + case 'TaskRegistered': + // If we only have the event, we might not have the full config yet. + // But we mark it as registered. + this.taskIds.add(taskId); + this.updateTask(taskId, { + id: taskId, + status: 'registered', + registeredAt: event.ledgerCloseAt, + is_active: true, + last_run: 0 + }); + break; + + case 'TaskPaused': + this.updateTask(taskId, { is_active: false, status: 'paused' }); + break; + + case 'TaskResumed': + this.updateTask(taskId, { is_active: true, status: 'active' }); + break; + + case 'KeeperPaid': + // eventData is [keeper, fee] + const fee = eventData ? Number(eventData[1]) : 100; + this.updateTask(taskId, { + last_run: ledgerTimestamp, + gas_balance: (task.gas_balance || 0) - fee, + status: 'active' + }); + break; + + case 'GasDeposited': + // eventData is [from, amount] + const depositAmount = eventData ? Number(eventData[1]) : 0; + this.updateTask(taskId, { gas_balance: (task.gas_balance || 0) + depositAmount }); + break; + + case 'GasWithdrawn': + // eventData is [from, amount] + const withdrawAmount = eventData ? Number(eventData[1]) : 0; + this.updateTask(taskId, { gas_balance: (task.gas_balance || 0) - withdrawAmount }); + break; + + case 'TaskCancelled': + this.tasks.delete(taskId); + this.taskIds.delete(taskId); + break; + + case 'DependencyAdded': + // eventData is depends_on_task_id + const depId = Number(eventData); + const currentDeps = task.blocked_by || []; + if (!currentDeps.includes(depId)) { + this.updateTask(taskId, { blocked_by: [...currentDeps, depId] }); + } + break; + + case 'DependencyRemoved': + // eventData is depends_on_task_id + const remId = Number(eventData); + this.updateTask(taskId, { blocked_by: (task.blocked_by || []).filter(id => id !== remId) }); + break; + } + } + + /** + * Extract the u64 task ID from the second topic of a TaskRegistered event. + */ + _extractTaskId(event) { + const { scValToNative } = require('@stellar/stellar-sdk'); + const topics = event.topic.map(t => scValToNative(xdr.ScVal.fromXDR(t, 'base64'))); + const eventType = topics[0]; let taskId; if (topics[1] === 'v1') { @@ -317,11 +406,30 @@ class TaskRegistry { * Delegates all I/O and migration logic to TaskSnapshot. */ _loadFromDisk() { - const data = this.snapshot.loadSync(); - if (!data) return; - - if (Array.isArray(data.taskIds)) { - data.taskIds.forEach(id => this.taskIds.add(id)); + try { + if (fs.existsSync(TASKS_FILE)) { + const data = JSON.parse(fs.readFileSync(TASKS_FILE, 'utf-8')); + if (data.version && data.version !== SNAPSHOT_VERSION) { + this.logger.warn('Snapshot version mismatch, full refresh may be needed', { + fileVersion: data.version, + currentVersion: SNAPSHOT_VERSION + }); + } + if (Array.isArray(data.taskIds)) { + data.taskIds.forEach(id => this.taskIds.add(id)); + } + if (data.tasks) { + Object.entries(data.tasks).forEach(([id, details]) => { + this.tasks.set(Number(id), details); + }); + } + if (data.lastSeenLedger && data.lastSeenLedger > this.lastSeenLedger) { + this.lastSeenLedger = data.lastSeenLedger; + } + this.logger.info('Loaded tasks from disk', { taskCount: this.taskIds.size, ledger: this.lastSeenLedger }); + } + } catch (err) { + this.logger.warn('Could not load persisted tasks', { error: err.message }); } if (data.tasks) { Object.entries(data.tasks).forEach(([id, details]) => { @@ -344,9 +452,10 @@ class TaskRegistry { */ async _saveToDisk() { try { - await this.snapshot.save({ - taskIds: this.taskIds, - tasks: this.tasks, + const data = { + version: SNAPSHOT_VERSION, + taskIds: Array.from(this.taskIds).sort((a, b) => a - b), + tasks: Object.fromEntries(this.tasks), lastSeenLedger: this.lastSeenLedger, }); } catch (err) {