Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions keeper/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion keeper/src/poller.js
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ class TaskPoller {
this.stats.tasksChecked++;
} else if (result.status === 'rejected') {
this.stats.errors++;
this.logger.error('Error checking task', { taskId: taskIds[index], error: result.reason?.message || result.reason });
this.logger.error('Error checking task', { taskId: candidateIds[index], error: result.reason?.message || result.reason });
}
});

Expand Down
179 changes: 144 additions & 35 deletions keeper/src/registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ const { xdr } = require('@stellar/stellar-sdk');
const { createLogger } = require('./logger');
const TaskSnapshot = require('./taskSnapshot');

const DATA_DIR = path.join(__dirname, '..', 'data');
const TASKS_FILE = path.join(DATA_DIR, 'tasks.json');
const SNAPSHOT_VERSION = 1;

const EVENT_TOPICS = {
TaskRegistered: 'AAAADwAAAA5UYXNrUmVnaXN0ZXJlZAAA',
TaskPaused: 'AAAADwAAAApUYXNrUGF1c2VkAAA=',
Expand All @@ -19,22 +23,12 @@ class TaskRegistry {
this.server = server;
this.contractId = contractId;
this.taskIds = new Set();
this.tasks = new Map(); // taskId -> TaskConfig
this.duplicateTaskIds = new Set();
this.tasks = new Map(); // Store taskId -> TaskConfig
this.lastSeenLedger = options.startLedger || 0;
this.snapshotVersion = SNAPSHOT_VERSION;
this.logger = options.logger || createLogger('registry');

// Snapshot manager β€” injectable for testing, otherwise constructed from options
this.snapshot = options.snapshot || new TaskSnapshot({
dir: options.snapshotDir || null,
staleThresholdLedgers: options.staleThreshold || options.staleThresholdLedgers,
staleThresholdMs: options.staleThresholdMs,
logger: this.logger,
});

// savedAt from last loaded snapshot; used in init() for the wall-clock staleness check
this._snapshotSavedAt = null;

this.staleThreshold = options.staleThreshold || 100000; // ~1 week of ledgers
this._ensureDataDir();
this._loadFromDisk();
}

Expand All @@ -44,16 +38,14 @@ class TaskRegistry {
*/
async init() {
this.logger.info('Initializing task registry');


// Check if snapshot is too old or version mismatch
const latestLedger = await this.server.getLatestLedger();

if (this.lastSeenLedger > 0 && this.snapshot.isStale(
{ lastSeenLedger: this.lastSeenLedger, savedAt: this._snapshotSavedAt },
latestLedger.sequence
)) {
this.logger.warn('Snapshot is stale, triggering full refresh', {
if (this.lastSeenLedger > 0 && (latestLedger.sequence - this.lastSeenLedger) > this.staleThreshold) {
this.logger.warn('Snapshot is too stale, triggering full refresh', {
lastSeen: this.lastSeenLedger,
current: latestLedger.sequence,
threshold: this.staleThreshold
});
this.lastSeenLedger = 0;
this.tasks.clear();
Expand Down Expand Up @@ -154,22 +146,29 @@ class TaskRegistry {
const currentLedger = info.sequence;

if (!this.lastSeenLedger) {
// Look back ~1 hour on testnet (β‰ˆ 720 ledgers)
// Look back a reasonable window (default ~1 hour on testnet β‰ˆ 720 ledgers)
this.lastSeenLedger = Math.max(currentLedger - 720, 0);
}

const topics = [Object.values(EVENT_TOPICS), ['*']];
let cursor;
let hasMore = true;

const topics = [
Object.values(EVENT_TOPICS),
['*']
];

while (hasMore) {
const params = {
startLedger: cursor ? undefined : this.lastSeenLedger,
filters: [{
type: 'contract',
contractIds: [this.contractId],
topics,
}],
filters: [
{
type: 'contract',
contractIds: [contractId],
topics: topics,
},
],
limit: 100,
};

Expand Down Expand Up @@ -213,6 +212,96 @@ class TaskRegistry {
const { scValToNative } = require('@stellar/stellar-sdk');
const topics = event.topic.map(t => scValToNative(xdr.ScVal.fromXDR(t, 'base64')));
const eventType = topics[0];

// Most events have taskId as the 3rd topic (index 2) in v1
let taskId;
if (topics[1] === 'v1') {
taskId = Number(topics[2]);
} else {
// Fallback for legacy or different format
taskId = Number(topics[1]);
}

if (isNaN(taskId)) return;

const eventData = event.value ? scValToNative(xdr.ScVal.fromXDR(event.value, 'base64')) : null;
const ledgerTimestamp = Math.floor(new Date(event.ledgerCloseAt).getTime() / 1000);

const task = this.tasks.get(taskId) || { id: taskId, blocked_by: [] };

switch (eventType) {
case 'TaskRegistered':
// If we only have the event, we might not have the full config yet.
// But we mark it as registered.
this.taskIds.add(taskId);
this.updateTask(taskId, {
id: taskId,
status: 'registered',
registeredAt: event.ledgerCloseAt,
is_active: true,
last_run: 0
});
break;

case 'TaskPaused':
this.updateTask(taskId, { is_active: false, status: 'paused' });
break;

case 'TaskResumed':
this.updateTask(taskId, { is_active: true, status: 'active' });
break;

case 'KeeperPaid':
// eventData is [keeper, fee]
const fee = eventData ? Number(eventData[1]) : 100;
this.updateTask(taskId, {
last_run: ledgerTimestamp,
gas_balance: (task.gas_balance || 0) - fee,
status: 'active'
});
break;

case 'GasDeposited':
// eventData is [from, amount]
const depositAmount = eventData ? Number(eventData[1]) : 0;
this.updateTask(taskId, { gas_balance: (task.gas_balance || 0) + depositAmount });
break;

case 'GasWithdrawn':
// eventData is [from, amount]
const withdrawAmount = eventData ? Number(eventData[1]) : 0;
this.updateTask(taskId, { gas_balance: (task.gas_balance || 0) - withdrawAmount });
break;

case 'TaskCancelled':
this.tasks.delete(taskId);
this.taskIds.delete(taskId);
break;

case 'DependencyAdded':
// eventData is depends_on_task_id
const depId = Number(eventData);
const currentDeps = task.blocked_by || [];
if (!currentDeps.includes(depId)) {
this.updateTask(taskId, { blocked_by: [...currentDeps, depId] });
}
break;

case 'DependencyRemoved':
// eventData is depends_on_task_id
const remId = Number(eventData);
this.updateTask(taskId, { blocked_by: (task.blocked_by || []).filter(id => id !== remId) });
break;
}
}

/**
* Extract the u64 task ID from the second topic of a TaskRegistered event.
*/
_extractTaskId(event) {
const { scValToNative } = require('@stellar/stellar-sdk');
const topics = event.topic.map(t => scValToNative(xdr.ScVal.fromXDR(t, 'base64')));
const eventType = topics[0];

let taskId;
if (topics[1] === 'v1') {
Expand Down Expand Up @@ -317,11 +406,30 @@ class TaskRegistry {
* Delegates all I/O and migration logic to TaskSnapshot.
*/
_loadFromDisk() {
const data = this.snapshot.loadSync();
if (!data) return;

if (Array.isArray(data.taskIds)) {
data.taskIds.forEach(id => this.taskIds.add(id));
try {
if (fs.existsSync(TASKS_FILE)) {
const data = JSON.parse(fs.readFileSync(TASKS_FILE, 'utf-8'));
if (data.version && data.version !== SNAPSHOT_VERSION) {
this.logger.warn('Snapshot version mismatch, full refresh may be needed', {
fileVersion: data.version,
currentVersion: SNAPSHOT_VERSION
});
}
if (Array.isArray(data.taskIds)) {
data.taskIds.forEach(id => this.taskIds.add(id));
}
if (data.tasks) {
Object.entries(data.tasks).forEach(([id, details]) => {
this.tasks.set(Number(id), details);
});
}
if (data.lastSeenLedger && data.lastSeenLedger > this.lastSeenLedger) {
this.lastSeenLedger = data.lastSeenLedger;
}
this.logger.info('Loaded tasks from disk', { taskCount: this.taskIds.size, ledger: this.lastSeenLedger });
}
} catch (err) {
this.logger.warn('Could not load persisted tasks', { error: err.message });
}
if (data.tasks) {
Object.entries(data.tasks).forEach(([id, details]) => {
Expand All @@ -344,9 +452,10 @@ class TaskRegistry {
*/
async _saveToDisk() {
try {
await this.snapshot.save({
taskIds: this.taskIds,
tasks: this.tasks,
const data = {
version: SNAPSHOT_VERSION,
taskIds: Array.from(this.taskIds).sort((a, b) => a - b),
tasks: Object.fromEntries(this.tasks),
lastSeenLedger: this.lastSeenLedger,
});
} catch (err) {
Expand Down
Loading