diff --git a/Cargo.toml b/Cargo.toml index 95808b2..5f60774 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "contracts/shared", "contracts/reserve_contract", "contracts/account_factory", + "tools/event_backfill", ] [profile.release] diff --git a/docs/event-backfill.md b/docs/event-backfill.md new file mode 100644 index 0000000..75830f1 --- /dev/null +++ b/docs/event-backfill.md @@ -0,0 +1,231 @@ +# Event Backfill Tool + +## Overview + +The event backfill tool provides functionality to query historical Soroban contract events from Stellar RPC for backfilling purposes. This is useful for reconstructing state after an SDK outage or for analytics. + +## Architecture + +The tool uses the **Stellar RPC API's `getEvents` method** (not Horizon) to query Soroban contract events. RPC is the preferred method for Soroban smart contracts, while Horizon is the legacy API for classic Stellar operations. + +### Key Components + +- **`BackfillConfig`**: Configuration for event backfilling operations +- **`EventBackfiller`**: Main client for querying events +- **`ContractEvent`**: Parsed event structure with metadata +- **`EventFilter`**: Filter for specific event types and topics + +## Usage + +### Basic Example + +```rust +use event_backfill::{EventBackfiller, BackfillConfig}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + 100000, + ); + + let backfiller = EventBackfiller::new(config); + let events = backfiller.backfill_events().await?; + + println!("Backfilled {} events", events.len()); + Ok(()) +} +``` + +### With Ledger Range + +```rust +let config = BackfillConfig { + rpc_url: "https://soroban-testnet.stellar.org".to_string(), + contract_id: "CONTRACT_ID".to_string(), + start_ledger: 100000, + end_ledger: Some(200000), // Optional end ledger + batch_size: 100, +}; +``` + +### With Topic Filter + +```rust +let backfiller = EventBackfiller::new(config); + +// Filter for specific event topics (base64-encoded) +let topic_filter = vec!["AAAADwAAAAh0cmFuc2Zlcg==".to_string()]; +let events = backfiller.backfill_events_with_filter(topic_filter).await?; +``` + +## Configuration + +### BackfillConfig Fields + +- **`rpc_url`**: Stellar RPC server URL (e.g., `"https://soroban-testnet.stellar.org"`) +- **`contract_id`**: Contract ID to filter events for +- **`start_ledger`**: Starting ledger sequence (inclusive) +- **`end_ledger`**: Optional ending ledger sequence (exclusive). If None, fetches to latest +- **`batch_size`**: Number of events to fetch per batch (max 10000 per RPC limits) + +### RPC Endpoints + +- **Testnet**: `https://soroban-testnet.stellar.org` +- **Mainnet**: `https://soroban.stellar.org` +- **Local Sandbox**: `http://localhost:8000` + +## Event Structure + +### ContractEvent + +```rust +pub struct ContractEvent { + pub id: String, + pub ledger: u32, + pub ledger_closed_at: String, + pub contract_id: String, + pub event_type: String, + pub topics: Vec, + pub data: String, + pub in_successful_contract_call: bool, + pub paging_token: String, +} +``` + +### Decoding Event Data + +Event data and topics are returned as base64-encoded strings. You can decode them to bytes: + +```rust +let data_bytes = event.data_as_bytes()?; +let topic_bytes = event.topic_as_bytes(0)?; +``` + +## Error Handling + +The tool uses a custom `BackfillError` type: + +- **`RpcError`**: Error from the RPC client +- **`HttpError`**: HTTP request error +- **`JsonError`**: JSON parsing error +- **`InvalidConfig`**: Invalid configuration +- **`NoEventsFound`**: No events found for the given criteria +- **`PaginationError`**: Pagination error +- **`XdrError`**: XDR decoding error + +## Testing + +### Unit Tests + +Run unit tests (no network required): + +```bash +cargo test -p event_backfill +``` + +### Integration Tests + +Integration tests require access to a Stellar RPC server: + +```bash +# Run all integration tests (requires testnet access) +cargo test -p event_backfill --test integration_test + +# Run specific integration test +cargo test -p event_backfill --test integration_test test_backfill_events_testnet +``` + +Note: Integration tests are marked with `#[ignore]` by default. Remove the attribute or run with: + +```bash +cargo test -p event_backfill --test integration_test -- --ignored +``` + +## Limitations + +1. **RPC Retention**: Stellar RPC typically retains events for 7 days. For longer history, consider using an indexer like Mercury, SubQuery, or Goldsky. + +2. **Batch Size**: Maximum batch size is 10,000 events per request (RPC limit). + +3. **Rate Limiting**: Be mindful of RPC rate limits when backfilling large ranges. + +4. **Contract ID Limit**: Maximum 5 contract IDs per filter. + +## Use Cases + +### State Reconstruction After Outage + +```rust +// Backfill events from the last known ledger +let config = BackfillConfig::new( + rpc_url, + contract_id, + last_known_ledger, +); + +let backfiller = EventBackfiller::new(config); +let events = backfiller.backfill_events().await?; + +// Reconstruct state from events +for event in events { + // Process event to update local state +} +``` + +### Analytics + +```rust +// Backfill events for analytics +let config = BackfillConfig { + rpc_url: "https://soroban-mainnet.stellar.org".to_string(), + contract_id: contract_id, + start_ledger: analytics_start_ledger, + end_ledger: Some(current_ledger), + batch_size: 1000, +}; + +let events = EventBackfiller::new(config).backfill_events().await?; + +// Analyze event patterns +analyze_events(&events); +``` + +## Best Practices + +1. **Use Appropriate Batch Sizes**: Start with smaller batch sizes (100-500) for testing, increase for production. + +2. **Handle Pagination**: The tool handles pagination automatically, but be aware of rate limits. + +3. **Validate Configuration**: Always call `config.validate()` before creating a backfiller. + +4. **Error Handling**: Handle `NoEventsFound` gracefully - it's expected when no events exist in the range. + +5. **Ledger Ranges**: Use reasonable ledger ranges to avoid timeouts and rate limit issues. + +## Troubleshooting + +### No Events Found + +- Verify the contract ID is correct +- Check that events exist in the specified ledger range +- Ensure the RPC server has data for the requested range + +### RPC Errors + +- Verify the RPC URL is accessible +- Check network connectivity +- Ensure the RPC server is operational + +### Timeout Errors + +- Reduce the batch size +- Use smaller ledger ranges +- Check network latency to the RPC server + +## References + +- [Stellar RPC getEvents Documentation](https://developers.stellar.org/docs/data/apis/rpc/api-reference/methods/getEvents) +- [Soroban Events Guide](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures/events) +- [Event Ingestion Guide](https://developers.stellar.org/docs/build/guides/events/ingest) diff --git a/tools/event_backfill/Cargo.toml b/tools/event_backfill/Cargo.toml new file mode 100644 index 0000000..1e5a6ab --- /dev/null +++ b/tools/event_backfill/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "event_backfill" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +reqwest = { version = "0.12", features = ["json"] } +tokio = { version = "1", features = ["full"] } +thiserror = "1.0" +base64 = "0.22" + +[dev-dependencies] +tokio-test = "0.4" diff --git a/tools/event_backfill/src/backfiller.rs b/tools/event_backfill/src/backfiller.rs new file mode 100644 index 0000000..550023f --- /dev/null +++ b/tools/event_backfill/src/backfiller.rs @@ -0,0 +1,284 @@ +//! Event backfiller implementation for querying historical Soroban contract events. + +use crate::{ + error::{BackfillError, Result}, + types::{BackfillConfig, ContractEvent, EventFilter, GetEventsRequest, GetEventsParams, + GetEventsResponse, Pagination, RpcEvent}, +}; +use reqwest::Client; +use std::time::Duration; + +/// Event backfiller for querying historical contract events from Stellar RPC. +pub struct EventBackfiller { + config: BackfillConfig, + client: Client, +} + +impl EventBackfiller { + /// Creates a new EventBackfiller with the given configuration. + pub fn new(config: BackfillConfig) -> Self { + let client = Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to create HTTP client"); + + Self { config, client } + } + + /// Backfills all events for the configured contract within the ledger range. + /// + /// This method fetches events in batches, handling pagination automatically. + /// Returns a vector of all events found, ordered by ledger sequence. + pub async fn backfill_events(&self) -> Result> { + self.config.validate()?; + + let mut all_events = Vec::new(); + let mut cursor: Option = None; + let mut current_ledger = self.config.start_ledger; + + loop { + let request = self.build_request(current_ledger, cursor.clone())?; + let response = self.send_request(request).await?; + + if response.result.events.is_empty() { + break; + } + + // Get the cursor and last ledger before consuming events + let last_event_ledger = response.result.events.last().and_then(|e| e.ledger.parse().ok()); + cursor = response.result.events.last().map(|e| e.paging_token.clone()); + + let events: Vec = response + .result + .events + .into_iter() + .map(|e| self.convert_rpc_event(e)) + .collect(); + + all_events.extend(events.clone()); + + // Check if we've reached the end ledger + if let Some(end_ledger) = self.config.end_ledger { + let max_ledger = all_events + .iter() + .map(|e| e.ledger) + .max() + .unwrap_or(current_ledger); + + if max_ledger >= end_ledger { + // Filter out events beyond end_ledger + all_events.retain(|e| e.ledger < end_ledger); + break; + } + } + + // If no cursor, we've reached the end + if cursor.is_none() { + break; + } + + // Update current ledger for the next request + if let Some(ledger) = last_event_ledger { + current_ledger = ledger; + } + } + + if all_events.is_empty() { + return Err(BackfillError::NoEventsFound); + } + + // Sort events by ledger sequence + all_events.sort_by_key(|e| e.ledger); + + Ok(all_events) + } + + /// Backfills events with a specific topic filter. + /// + /// This allows filtering events by specific topics (e.g., event names). + pub async fn backfill_events_with_filter(&self, topic_filter: Vec) -> Result> { + self.config.validate()?; + + let mut all_events = Vec::new(); + let mut cursor: Option = None; + let mut current_ledger = self.config.start_ledger; + + let filter = EventFilter::for_contract(self.config.contract_id.clone()) + .with_topic(topic_filter); + + loop { + let request = self.build_request_with_filter(current_ledger, cursor.clone(), filter.clone())?; + let response = self.send_request(request).await?; + + if response.result.events.is_empty() { + break; + } + + // Get the cursor and last ledger before consuming events + let last_event_ledger = response.result.events.last().and_then(|e| e.ledger.parse().ok()); + cursor = response.result.events.last().map(|e| e.paging_token.clone()); + + let events: Vec = response + .result + .events + .into_iter() + .map(|e| self.convert_rpc_event(e)) + .collect(); + + all_events.extend(events.clone()); + + // Check if we've reached the end ledger + if let Some(end_ledger) = self.config.end_ledger { + let max_ledger = all_events + .iter() + .map(|e| e.ledger) + .max() + .unwrap_or(current_ledger); + + if max_ledger >= end_ledger { + all_events.retain(|e| e.ledger < end_ledger); + break; + } + } + + if cursor.is_none() { + break; + } + + // Update current ledger for the next request + if let Some(ledger) = last_event_ledger { + current_ledger = ledger; + } + } + + if all_events.is_empty() { + return Err(BackfillError::NoEventsFound); + } + + all_events.sort_by_key(|e| e.ledger); + Ok(all_events) + } + + /// Builds the RPC request for fetching events. + fn build_request(&self, start_ledger: u32, cursor: Option) -> Result { + let pagination = cursor.map(|c| Pagination { + cursor: Some(c), + limit: self.config.batch_size, + }); + + let params = GetEventsParams { + start_ledger: start_ledger.to_string(), + end_ledger: self.config.end_ledger.map(|l| l.to_string()), + filters: vec![EventFilter::for_contract(self.config.contract_id.clone())], + pagination, + xdr_format: Some("json".to_string()), + }; + + Ok(GetEventsRequest { + jsonrpc: "2.0".to_string(), + id: 1, + method: "getEvents".to_string(), + params, + }) + } + + /// Builds the RPC request with a custom filter. + fn build_request_with_filter( + &self, + start_ledger: u32, + cursor: Option, + filter: EventFilter, + ) -> Result { + let pagination = cursor.map(|c| Pagination { + cursor: Some(c), + limit: self.config.batch_size, + }); + + let params = GetEventsParams { + start_ledger: start_ledger.to_string(), + end_ledger: self.config.end_ledger.map(|l| l.to_string()), + filters: vec![filter], + pagination, + xdr_format: Some("json".to_string()), + }; + + Ok(GetEventsRequest { + jsonrpc: "2.0".to_string(), + id: 1, + method: "getEvents".to_string(), + params, + }) + } + + /// Sends the RPC request to the server. + async fn send_request(&self, request: GetEventsRequest) -> Result { + let response = self + .client + .post(&self.config.rpc_url) + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + return Err(BackfillError::RpcError(format!( + "RPC request failed with status: {}", + response.status() + ))); + } + + let rpc_response: GetEventsResponse = response.json().await?; + Ok(rpc_response) + } + + /// Converts an RPC event to a ContractEvent. + fn convert_rpc_event(&self, rpc_event: RpcEvent) -> ContractEvent { + ContractEvent { + id: rpc_event.id, + ledger: rpc_event + .ledger + .parse() + .unwrap_or(0), + ledger_closed_at: rpc_event.ledger_closed_at, + contract_id: rpc_event.contract_id.unwrap_or_default(), + event_type: rpc_event.event_type, + topics: rpc_event.topic, + data: rpc_event.value, + in_successful_contract_call: rpc_event.in_successful_contract_call, + paging_token: rpc_event.paging_token, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_backfill_config_validation() { + let valid_config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + 100000, + ); + assert!(valid_config.validate().is_ok()); + + let invalid_config = BackfillConfig { + rpc_url: "".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: None, + batch_size: 100, + }; + assert!(invalid_config.validate().is_err()); + } + + #[test] + fn test_event_filter_creation() { + let filter = EventFilter::for_contract("test_contract".to_string()); + assert_eq!(filter.contract_ids, vec!["test_contract"]); + assert_eq!(filter.r#type, Some("contract".to_string())); + + let filter_with_topic = filter.with_topic(vec!["topic1".to_string()]); + assert_eq!(filter_with_topic.topics.len(), 1); + } +} diff --git a/tools/event_backfill/src/error.rs b/tools/event_backfill/src/error.rs new file mode 100644 index 0000000..26183ee --- /dev/null +++ b/tools/event_backfill/src/error.rs @@ -0,0 +1,38 @@ +//! Error types for event backfilling operations. + +use thiserror::Error; + +/// Errors that can occur during event backfilling. +#[derive(Error, Debug)] +pub enum BackfillError { + /// Error from the RPC client + #[error("RPC error: {0}")] + RpcError(String), + + /// HTTP request error + #[error("HTTP request error: {0}")] + HttpError(#[from] reqwest::Error), + + /// JSON parsing error + #[error("JSON parsing error: {0}")] + JsonError(#[from] serde_json::Error), + + /// Invalid configuration + #[error("Invalid configuration: {0}")] + InvalidConfig(String), + + /// No events found + #[error("No events found for the given criteria")] + NoEventsFound, + + /// Pagination error + #[error("Pagination error: {0}")] + PaginationError(String), + + /// XDR decoding error + #[error("XDR decoding error: {0}")] + XdrError(String), +} + +/// Result type for event backfilling operations. +pub type Result = std::result::Result; diff --git a/tools/event_backfill/src/lib.rs b/tools/event_backfill/src/lib.rs new file mode 100644 index 0000000..b648a6f --- /dev/null +++ b/tools/event_backfill/src/lib.rs @@ -0,0 +1,39 @@ +//! # Event Backfill Tool +//! +//! This crate provides functionality to query historical Soroban contract events +//! from Stellar RPC for backfilling purposes. This is useful for reconstructing +//! state after an SDK outage or for analytics. +//! +//! ## Usage +//! +//! ```rust,no_run +//! use event_backfill::{EventBackfiller, BackfillConfig}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let config = BackfillConfig { +//! rpc_url: "https://soroban-testnet.stellar.org".to_string(), +//! contract_id: "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), +//! start_ledger: 100000, +//! end_ledger: None, +//! batch_size: 100, +//! }; +//! +//! let backfiller = EventBackfiller::new(config); +//! let events = backfiller.backfill_events().await?; +//! +//! println!("Backfilled {} events", events.len()); +//! Ok(()) +//! } +//! ``` + +mod error; +mod types; +mod backfiller; + +#[cfg(test)] +mod tests; + +pub use error::{BackfillError, Result}; +pub use types::{BackfillConfig, ContractEvent, EventFilter}; +pub use backfiller::EventBackfiller; diff --git a/tools/event_backfill/src/tests.rs b/tools/event_backfill/src/tests.rs new file mode 100644 index 0000000..d52ea71 --- /dev/null +++ b/tools/event_backfill/src/tests.rs @@ -0,0 +1,207 @@ +//! Unit tests for event backfilling. + +use crate::types::{BackfillConfig, ContractEvent, EventFilter}; + +#[test] +fn test_backfill_config_valid() { + let config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + 100000, + ); + assert!(config.validate().is_ok()); +} + +#[test] +fn test_backfill_config_empty_rpc_url() { + let config = BackfillConfig { + rpc_url: "".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: None, + batch_size: 100, + }; + assert!(config.validate().is_err()); +} + +#[test] +fn test_backfill_config_empty_contract_id() { + let config = BackfillConfig { + rpc_url: "https://test.com".to_string(), + contract_id: "".to_string(), + start_ledger: 100000, + end_ledger: None, + batch_size: 100, + }; + assert!(config.validate().is_err()); +} + +#[test] +fn test_backfill_config_invalid_batch_size_zero() { + let config = BackfillConfig { + rpc_url: "https://test.com".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: None, + batch_size: 0, + }; + assert!(config.validate().is_err()); +} + +#[test] +fn test_backfill_config_invalid_batch_size_too_large() { + let config = BackfillConfig { + rpc_url: "https://test.com".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: None, + batch_size: 10001, + }; + assert!(config.validate().is_err()); +} + +#[test] +fn test_backfill_config_end_ledger_before_start() { + let config = BackfillConfig { + rpc_url: "https://test.com".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: Some(99999), + batch_size: 100, + }; + assert!(config.validate().is_err()); +} + +#[test] +fn test_backfill_config_end_ledger_equal_start() { + let config = BackfillConfig { + rpc_url: "https://test.com".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: Some(100000), + batch_size: 100, + }; + assert!(config.validate().is_err()); +} + +#[test] +fn test_backfill_config_valid_end_ledger() { + let config = BackfillConfig { + rpc_url: "https://test.com".to_string(), + contract_id: "test".to_string(), + start_ledger: 100000, + end_ledger: Some(200000), + batch_size: 100, + }; + assert!(config.validate().is_ok()); +} + +#[test] +fn test_event_filter_for_contract() { + let filter = EventFilter::for_contract("test_contract".to_string()); + assert_eq!(filter.contract_ids, vec!["test_contract"]); + assert_eq!(filter.r#type, Some("contract".to_string())); + assert!(filter.topics.is_empty()); +} + +#[test] +fn test_event_filter_with_topic() { + let filter = EventFilter::for_contract("test_contract".to_string()) + .with_topic(vec!["topic1".to_string()]); + assert_eq!(filter.contract_ids, vec!["test_contract"]); + assert_eq!(filter.topics.len(), 1); + assert_eq!(filter.topics[0], vec!["topic1"]); +} + +#[test] +fn test_event_filter_multiple_topics() { + let filter = EventFilter::for_contract("test_contract".to_string()) + .with_topic(vec!["topic1".to_string()]) + .with_topic(vec!["topic2".to_string()]); + assert_eq!(filter.topics.len(), 2); +} + +#[test] +fn test_contract_event_creation() { + let event = ContractEvent { + id: "test-id".to_string(), + ledger: 100000, + ledger_closed_at: "2024-01-01T00:00:00Z".to_string(), + contract_id: "test-contract".to_string(), + event_type: "contract".to_string(), + topics: vec!["topic1".to_string()], + data: "data".to_string(), + in_successful_contract_call: true, + paging_token: "token".to_string(), + }; + assert_eq!(event.id, "test-id"); + assert_eq!(event.ledger, 100000); +} + +#[test] +fn test_contract_event_topic_out_of_bounds() { + let event = ContractEvent { + id: "test-id".to_string(), + ledger: 100000, + ledger_closed_at: "2024-01-01T00:00:00Z".to_string(), + contract_id: "test-contract".to_string(), + event_type: "contract".to_string(), + topics: vec![], + data: "data".to_string(), + in_successful_contract_call: true, + paging_token: "token".to_string(), + }; + let result = event.topic_as_bytes(0); + assert!(result.is_err()); +} + +#[test] +fn test_backfill_config_serialization() { + let config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + 100000, + ); + + let json = serde_json::to_string(&config).unwrap(); + let parsed: BackfillConfig = serde_json::from_str(&json).unwrap(); + + assert_eq!(parsed.rpc_url, config.rpc_url); + assert_eq!(parsed.contract_id, config.contract_id); + assert_eq!(parsed.start_ledger, config.start_ledger); +} + +#[test] +fn test_contract_event_serialization() { + let event = ContractEvent { + id: "test-id".to_string(), + ledger: 100000, + ledger_closed_at: "2024-01-01T00:00:00Z".to_string(), + contract_id: "test-contract".to_string(), + event_type: "contract".to_string(), + topics: vec!["topic1".to_string()], + data: "data".to_string(), + in_successful_contract_call: true, + paging_token: "token".to_string(), + }; + + let json = serde_json::to_string(&event).unwrap(); + let parsed: ContractEvent = serde_json::from_str(&json).unwrap(); + + assert_eq!(parsed.id, event.id); + assert_eq!(parsed.ledger, event.ledger); + assert_eq!(parsed.contract_id, event.contract_id); +} + +#[test] +fn test_event_filter_serialization() { + let filter = EventFilter::for_contract("test_contract".to_string()); + + let json = serde_json::to_string(&filter).unwrap(); + let parsed: EventFilter = serde_json::from_str(&json).unwrap(); + + assert_eq!(parsed.contract_ids, filter.contract_ids); + assert_eq!(parsed.r#type, filter.r#type); + // Topics field is skipped when empty due to skip_serializing_if + assert!(parsed.topics.is_empty()); +} diff --git a/tools/event_backfill/src/types.rs b/tools/event_backfill/src/types.rs new file mode 100644 index 0000000..25cc90c --- /dev/null +++ b/tools/event_backfill/src/types.rs @@ -0,0 +1,214 @@ +//! Types for event backfilling configuration and data structures. + +use serde::{Deserialize, Serialize}; + +/// Configuration for event backfilling. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BackfillConfig { + /// RPC server URL (e.g., "https://soroban-testnet.stellar.org") + pub rpc_url: String, + + /// Contract ID to filter events for + pub contract_id: String, + + /// Starting ledger sequence (inclusive) + pub start_ledger: u32, + + /// Optional ending ledger sequence (exclusive). If None, fetches to latest. + pub end_ledger: Option, + + /// Number of events to fetch per batch (max 10000 per RPC limits) + pub batch_size: u32, +} + +impl BackfillConfig { + /// Creates a new BackfillConfig with sensible defaults. + pub fn new(rpc_url: String, contract_id: String, start_ledger: u32) -> Self { + Self { + rpc_url, + contract_id, + start_ledger, + end_ledger: None, + batch_size: 100, + } + } + + /// Validates the configuration. + pub fn validate(&self) -> crate::Result<()> { + if self.rpc_url.is_empty() { + return Err(crate::BackfillError::InvalidConfig( + "RPC URL cannot be empty".to_string(), + )); + } + if self.contract_id.is_empty() { + return Err(crate::BackfillError::InvalidConfig( + "Contract ID cannot be empty".to_string(), + )); + } + if self.batch_size == 0 || self.batch_size > 10000 { + return Err(crate::BackfillError::InvalidConfig( + "Batch size must be between 1 and 10000".to_string(), + )); + } + if let Some(end) = self.end_ledger { + if end <= self.start_ledger { + return Err(crate::BackfillError::InvalidConfig( + "End ledger must be greater than start ledger".to_string(), + )); + } + } + Ok(()) + } +} + +/// Filter for contract events. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventFilter { + /// Event type filter (contract, system, diagnostic) + #[serde(skip_serializing_if = "Option::is_none")] + pub r#type: Option, + + /// Contract IDs to filter (max 5) + #[serde(skip_serializing_if = "Vec::is_empty")] + pub contract_ids: Vec, + + /// Topic filters + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub topics: Vec>, +} + +impl EventFilter { + /// Creates a new filter for a specific contract. + pub fn for_contract(contract_id: String) -> Self { + Self { + r#type: Some("contract".to_string()), + contract_ids: vec![contract_id], + topics: vec![], + } + } + + /// Adds a topic filter. + pub fn with_topic(mut self, topic: Vec) -> Self { + self.topics.push(topic); + self + } +} + +/// A parsed contract event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContractEvent { + /// Event ID + pub id: String, + + /// Ledger sequence where the event occurred + pub ledger: u32, + + /// Timestamp when the ledger closed + pub ledger_closed_at: String, + + /// Contract ID that emitted the event + pub contract_id: String, + + /// Event type (contract, system, diagnostic) + pub event_type: String, + + /// Event topics (base64-encoded XDR) + pub topics: Vec, + + /// Event data (base64-encoded XDR) + pub data: String, + + /// Whether the event was in a successful contract call + pub in_successful_contract_call: bool, + + /// Paging token for pagination + pub paging_token: String, +} + +impl ContractEvent { + /// Returns the event data as base64-encoded bytes. + pub fn data_as_bytes(&self) -> crate::Result> { + use base64::{Engine as _, engine::general_purpose}; + general_purpose::STANDARD.decode(&self.data) + .map_err(|e| crate::BackfillError::XdrError(format!("Base64 decode error: {}", e))) + } + + /// Returns a topic as base64-encoded bytes. + pub fn topic_as_bytes(&self, index: usize) -> crate::Result> { + if index >= self.topics.len() { + return Err(crate::BackfillError::XdrError( + "Topic index out of bounds".to_string(), + )); + } + + use base64::{Engine as _, engine::general_purpose}; + general_purpose::STANDARD.decode(&self.topics[index]) + .map_err(|e| crate::BackfillError::XdrError(format!("Base64 decode error: {}", e))) + } +} + +/// RPC request for getEvents. +#[derive(Debug, Serialize)] +pub(crate) struct GetEventsRequest { + pub jsonrpc: String, + pub id: u64, + pub method: String, + pub params: GetEventsParams, +} + +/// Parameters for getEvents RPC call. +#[derive(Debug, Serialize)] +pub(crate) struct GetEventsParams { + pub start_ledger: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub end_ledger: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub filters: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub pagination: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub xdr_format: Option, +} + +/// Pagination parameters. +#[derive(Debug, Serialize)] +pub(crate) struct Pagination { + #[serde(skip_serializing_if = "Option::is_none")] + pub cursor: Option, + pub limit: u32, +} + +/// RPC response from getEvents. +#[derive(Debug, Deserialize)] +pub(crate) struct GetEventsResponse { + pub jsonrpc: String, + pub id: u64, + pub result: GetEventsResult, +} + +/// Result field from getEvents response. +#[derive(Debug, Deserialize)] +pub(crate) struct GetEventsResult { + pub events: Vec, + pub latest_ledger: String, +} + +/// Event as returned by RPC. +#[derive(Debug, Deserialize)] +pub(crate) struct RpcEvent { + pub id: String, + pub ledger: String, + pub ledger_closed_at: String, + #[serde(rename = "type")] + pub event_type: String, + #[serde(default)] + pub contract_id: Option, + #[serde(default)] + pub topic: Vec, + #[serde(default)] + pub value: String, + #[serde(default)] + pub in_successful_contract_call: bool, + #[serde(default)] + pub paging_token: String, +} diff --git a/tools/event_backfill/tests/integration_test.rs b/tools/event_backfill/tests/integration_test.rs new file mode 100644 index 0000000..709d3cf --- /dev/null +++ b/tools/event_backfill/tests/integration_test.rs @@ -0,0 +1,176 @@ +//! Integration tests for event backfilling. +//! +//! These tests require a local Stellar sandbox or access to testnet. +//! Run with: cargo test --test integration_test + +use event_backfill::{BackfillConfig, EventBackfiller}; + +#[tokio::test] +#[ignore = "Requires local Stellar sandbox or testnet access"] +async fn test_backfill_events_testnet() { + // This test uses Stellar testnet - requires internet access + let config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), // XLM contract on testnet + 100000, + ); + + let backfiller = EventBackfiller::new(config.clone()); + + // Try to backfill events (may fail if no events exist in range) + let result = backfiller.backfill_events().await; + + // We expect either success or NoEventsFound error + match result { + Ok(events) => { + println!("Successfully backfilled {} events", events.len()); + assert!(!events.is_empty()); + + // Verify event structure + for event in events { + assert!(!event.id.is_empty()); + assert!(event.ledger > 0); + assert!(!event.contract_id.is_empty()); + } + } + Err(event_backfill::BackfillError::NoEventsFound) => { + println!("No events found in the specified ledger range - this is acceptable"); + } + Err(e) => { + panic!("Unexpected error: {:?}", e); + } + } +} + +#[tokio::test] +#[ignore = "Requires local Stellar sandbox"] +async fn test_backfill_events_local_sandbox() { + // This test requires a local Stellar sandbox running on localhost:8000 + let config = BackfillConfig::new( + "http://localhost:8000".to_string(), + "YOUR_CONTRACT_ID_HERE".to_string(), // Replace with actual contract ID + 1, + ); + + let backfiller = EventBackfiller::new(config.clone()); + + let result = backfiller.backfill_events().await; + + match result { + Ok(events) => { + println!("Successfully backfilled {} events from local sandbox", events.len()); + assert!(!events.is_empty()); + } + Err(event_backfill::BackfillError::NoEventsFound) => { + println!("No events found - ensure contract has emitted events"); + } + Err(e) => { + panic!("Unexpected error: {:?}", e); + } + } +} + +#[tokio::test] +#[ignore = "Requires local Stellar sandbox or testnet access"] +async fn test_backfill_with_topic_filter() { + let config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + 100000, + ); + + let backfiller = EventBackfiller::new(config.clone()); + + // Filter for 'transfer' events (base64 encoded) + let topic_filter = vec!["AAAADwAAAAh0cmFuc2Zlcg==".to_string()]; + + let result = backfiller.backfill_events_with_filter(topic_filter).await; + + match result { + Ok(events) => { + println!("Successfully backfilled {} events with topic filter", events.len()); + assert!(!events.is_empty()); + + // Verify all events have the expected topic + for event in events { + assert!(!event.topics.is_empty()); + } + } + Err(event_backfill::BackfillError::NoEventsFound) => { + println!("No events found with the specified topic filter"); + } + Err(e) => { + panic!("Unexpected error: {:?}", e); + } + } +} + +#[tokio::test] +#[ignore = "Requires local Stellar sandbox or testnet access"] +async fn test_backfill_with_ledger_range() { + let config = BackfillConfig { + rpc_url: "https://soroban-testnet.stellar.org".to_string(), + contract_id: "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + start_ledger: 100000, + end_ledger: Some(100100), // Small range + batch_size: 10, + }; + + let backfiller = EventBackfiller::new(config.clone()); + + let result = backfiller.backfill_events().await; + + match result { + Ok(events) => { + println!("Successfully backfilled {} events in ledger range", events.len()); + + // Verify all events are within the specified range + for event in events { + assert!(event.ledger >= config.start_ledger); + assert!(event.ledger < config.end_ledger.unwrap()); + } + } + Err(event_backfill::BackfillError::NoEventsFound) => { + println!("No events found in the specified ledger range"); + } + Err(e) => { + panic!("Unexpected error: {:?}", e); + } + } +} + +#[tokio::test] +#[ignore = "Requires local Stellar sandbox or testnet access"] +async fn test_event_data_decoding() { + let config = BackfillConfig::new( + "https://soroban-testnet.stellar.org".to_string(), + "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC".to_string(), + 100000, + ); + + let backfiller = EventBackfiller::new(config.clone()); + + let result = backfiller.backfill_events().await; + + match result { + Ok(events) => { + if !events.is_empty() { + // Test decoding the first event's data + let event = &events[0]; + let data_bytes = event.data_as_bytes(); + assert!(data_bytes.is_ok()); + + if !event.topics.is_empty() { + let topic_bytes = event.topic_as_bytes(0); + assert!(topic_bytes.is_ok()); + } + } + } + Err(event_backfill::BackfillError::NoEventsFound) => { + println!("No events found - skipping decoding test"); + } + Err(e) => { + panic!("Unexpected error: {:?}", e); + } + } +}