diff --git a/.github/workflows/build-availability-oracle.yml b/.github/workflows/build-availability-oracle.yml index 44479ae..60e49ff 100644 --- a/.github/workflows/build-availability-oracle.yml +++ b/.github/workflows/build-availability-oracle.yml @@ -4,6 +4,8 @@ on: push: branches: - main + tags: + - 'v*' env: BASE_IMAGE: ghcr.io/graphprotocol/availability-oracle @@ -19,12 +21,15 @@ jobs: - name: Docker meta id: docker_meta - uses: docker/metadata-action@v3 + uses: docker/metadata-action@v5 with: images: ${{ env.BASE_IMAGE }} tags: | type=sha - type=raw,value=latest + type=raw,value=latest,enable={{is_default_branch}} + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}},enable=${{ !startsWith(github.ref, 'refs/tags/v0.') }} - name: Login to GitHub Container Registry uses: docker/login-action@v1 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eb0a20c..dae6ce9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: Continuous Integration on: push: - branches: [master] + branches: [main] pull_request: types: [opened, synchronize, reopened] diff --git a/Cargo.lock b/Cargo.lock index d38b609..dca6f8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,7 +146,7 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "availability-oracle" -version = "0.3.4" +version = "1.4.0" dependencies = [ "async-trait", "bytes", @@ -155,6 +155,7 @@ dependencies = [ "ethers", "graphql-parser", "hex", + "json-oracle-encoder", "moka", "multibase", "reqwest", diff --git a/availability-oracle/Cargo.toml b/availability-oracle/Cargo.toml index 91f14e0..aa907e6 100644 --- a/availability-oracle/Cargo.toml +++ b/availability-oracle/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "availability-oracle" -version = "0.3.4" +version = "1.4.0" authors = ["Leonardo Yvens "] edition = "2018" @@ -27,3 +27,4 @@ graphql-parser = "0.4.0" secp256k1 = "0.28.2" ethers = "2.0.14" url = "2.5.0" +json-oracle-encoder = { path = "../crates/json-oracle-encoder" } diff --git a/availability-oracle/src/data_edge.rs b/availability-oracle/src/data_edge.rs new file mode 100644 index 0000000..41b0d37 --- /dev/null +++ b/availability-oracle/src/data_edge.rs @@ -0,0 +1,451 @@ +use crate::graph_monitoring_subgraph::{GraphMonitoringSubgraph, OracleConfig}; +use common::prelude::*; +use ethers::abi::Address; + +/// Result of checking config status against the subgraph. +pub enum ConfigStatus { + /// Config matches what's in the subgraph + Unchanged, + /// Config differs from subgraph (includes list of changed field names) + Changed(Vec<&'static str>), + /// Oracle not found in subgraph (first time posting) + NotFound, + /// Failed to fetch from subgraph + FetchError(Error), +} +use ethers::core::types::U256; +use ethers::middleware::SignerMiddleware; +use ethers::providers::{Http, Middleware, Provider}; +use ethers::signers::{LocalWallet, Signer}; +use ethers::types::TransactionRequest; +use secp256k1::SecretKey; +use std::sync::Arc; +use std::time::Duration; +use url::Url; + +/// Extracts a subgraph deployment ID (CID) from a gateway URL. +/// Expects URLs in the format: https://gateway.thegraph.com/api/[api-key]/deployments/id/Qm... +pub fn extract_deployment_id_from_url(url: &str) -> Result { + let url = Url::parse(url).map_err(|e| anyhow!("Invalid URL: {}", e))?; + + let path_segments: Vec<&str> = url.path().split('/').collect(); + for (i, segment) in path_segments.iter().enumerate() { + if *segment == "id" && i + 1 < path_segments.len() { + let deployment_id = path_segments[i + 1]; + if deployment_id.starts_with("Qm") { + return Ok(deployment_id.to_string()); + } + } + } + + Err(anyhow!( + "Could not extract deployment ID from URL: {}. Expected format: .../deployments/id/Qm...", + url + )) +} + +/// Configuration needed to build an OracleConfig from CLI parameters. +pub struct OracleConfigParams<'a> { + pub ipfs_concurrency: usize, + pub ipfs_timeout: Duration, + pub min_signal: u64, + pub period: Duration, + pub grace_period: u64, + pub supported_data_source_kinds: &'a [String], + pub network_subgraph_url: &'a str, + pub epoch_block_oracle_subgraph_url: &'a str, + pub subgraph_availability_manager_contract: Option
, + pub oracle_index: Option, +} + +/// Builds an OracleConfig from CLI config parameters. +pub fn build_oracle_config(params: &OracleConfigParams) -> Result { + let network_subgraph_deployment_id = + extract_deployment_id_from_url(params.network_subgraph_url)?; + let epoch_block_oracle_subgraph_deployment_id = + extract_deployment_id_from_url(params.epoch_block_oracle_subgraph_url)?; + + Ok(OracleConfig { + version: format!("v{}", env!("CARGO_PKG_VERSION")), + ipfs_concurrency: params.ipfs_concurrency.to_string(), + ipfs_timeout: params.ipfs_timeout.as_millis().to_string(), + min_signal: params.min_signal.to_string(), + period: params.period.as_secs().to_string(), + grace_period: params.grace_period.to_string(), + supported_data_source_kinds: params.supported_data_source_kinds.join(","), + network_subgraph_deployment_id, + epoch_block_oracle_subgraph_deployment_id, + subgraph_availability_manager_contract: params + .subgraph_availability_manager_contract + .map(|a| format!("{:?}", a)) + .unwrap_or_default(), + oracle_index: params + .oracle_index + .map(|i| i.to_string()) + .unwrap_or_default(), + }) +} + +/// Checks the local config against the subgraph to determine if it has changed. +pub async fn check_config_status( + local_config: &OracleConfig, + monitoring_subgraph: &impl GraphMonitoringSubgraph, + oracle_index: u64, +) -> ConfigStatus { + match monitoring_subgraph.fetch_oracle_config(oracle_index).await { + Ok(Some(current_config)) => { + if *local_config == current_config { + ConfigStatus::Unchanged + } else { + let changed_fields = local_config.diff(¤t_config); + ConfigStatus::Changed(changed_fields) + } + } + Ok(None) => ConfigStatus::NotFound, + Err(e) => ConfigStatus::FetchError(e), + } +} + +pub struct DataEdgeContract { + provider: Arc, LocalWallet>>, + contract_address: Address, + logger: Logger, +} + +impl DataEdgeContract { + pub async fn new( + signing_key: &SecretKey, + rpc_url: Url, + contract_address: Address, + logger: Logger, + ) -> Result { + let http_client = reqwest::ClientBuilder::new() + .tcp_nodelay(true) + .timeout(Duration::from_secs(30)) + .build() + .unwrap(); + let provider = Provider::new(Http::new_with_client(rpc_url, http_client)); + let chain_id = provider.get_chainid().await?.as_u64(); + let wallet = LocalWallet::from_bytes(signing_key.as_ref()) + .unwrap() + .with_chain_id(chain_id); + let provider = Arc::new(SignerMiddleware::new(provider, wallet)); + + Ok(Self { + provider, + contract_address, + logger, + }) + } + + /// Posts the oracle configuration to the DataEdge contract if it has changed. + /// Returns Ok(true) if posted, Ok(false) if skipped because unchanged. + pub async fn post_config_if_changed( + &self, + local_config: &OracleConfig, + monitoring_subgraph: &impl GraphMonitoringSubgraph, + oracle_index: u64, + ) -> Result { + match check_config_status(local_config, monitoring_subgraph, oracle_index).await { + ConfigStatus::Unchanged => { + info!(self.logger, "Config unchanged, skipping DataEdge post"; + "oracle_index" => oracle_index + ); + return Ok(false); + } + ConfigStatus::Changed(changed_fields) => { + info!(self.logger, "Config changed, will post to DataEdge"; + "oracle_index" => oracle_index, + "changed_fields" => changed_fields.join(",") + ); + } + ConfigStatus::NotFound => { + info!(self.logger, "Oracle not found in subgraph, posting initial config"; + "oracle_index" => oracle_index + ); + } + ConfigStatus::FetchError(e) => { + warn!(self.logger, "Failed to fetch current oracle config from subgraph, will post anyway"; + "oracle_index" => oracle_index, + "error" => format!("{:#}", e) + ); + } + } + + self.post_config(local_config).await?; + Ok(true) + } + + /// Posts the oracle configuration to the DataEdge contract. + async fn post_config(&self, config: &OracleConfig) -> Result<(), Error> { + // Build the configuration JSON for posting + let config_json = serde_json::json!({ + "version": &config.version, + "config": { + "ipfs_concurrency": &config.ipfs_concurrency, + "ipfs_timeout": &config.ipfs_timeout, + "min_signal": &config.min_signal, + "period": &config.period, + "grace_period": &config.grace_period, + "supported_data_source_kinds": &config.supported_data_source_kinds, + "network_subgraph_deloyment_id": &config.network_subgraph_deployment_id, + "epoch_block_oracle_subgraph_deloyment_id": &config.epoch_block_oracle_subgraph_deployment_id, + "subgraph_availability_manager_contract": &config.subgraph_availability_manager_contract, + "oracle_index": &config.oracle_index, + } + }); + + info!(self.logger, "Posting oracle configuration to DataEdge"; + "version" => &config.version, + "data_edge_contract" => format!("{:?}", self.contract_address), + "network_subgraph_deployment_id" => &config.network_subgraph_deployment_id, + "epoch_block_oracle_subgraph_deployment_id" => &config.epoch_block_oracle_subgraph_deployment_id, + ); + + let calldata = json_oracle_encoder::json_to_calldata(config_json) + .map_err(|e| anyhow!("Failed to encode config as calldata: {}", e))?; + + let gas_price = self.provider.get_gas_price().await?; + let gas_price_with_buffer = gas_price * U256::from(120) / U256::from(100); + + let tx = TransactionRequest::new() + .to(self.contract_address) + .data(calldata.clone()); + + let estimated_gas = self.provider.estimate_gas(&tx.clone().into(), None).await?; + let gas_with_buffer = estimated_gas * U256::from(120) / U256::from(100); + + let tx = tx.gas(gas_with_buffer).gas_price(gas_price_with_buffer); + + let pending_tx = self.provider.send_transaction(tx, None).await?; + info!(self.logger, "DataEdge transaction sent, waiting for confirmation"; + "tx_hash" => format!("{:?}", pending_tx.tx_hash()), + "gas_price" => gas_price_with_buffer.as_u64(), + "gas_limit" => gas_with_buffer.as_u64() + ); + + let receipt = pending_tx + .await? + .ok_or_else(|| anyhow!("DataEdge transaction was dropped from mempool"))?; + + info!(self.logger, "Successfully posted config to DataEdge"; + "tx_hash" => format!("{:?}", receipt.transaction_hash), + "block_number" => receipt.block_number.map(|b| b.as_u64()), + "gas_used" => receipt.gas_used.map(|g| g.as_u64()), + ); + + Ok(()) + } +} + +/// Logs what would happen in dry-run mode by checking against the subgraph. +pub async fn log_dry_run_config( + logger: &Logger, + local_config: &OracleConfig, + monitoring_subgraph: Option<&impl GraphMonitoringSubgraph>, + oracle_index: Option, +) { + if let (Some(subgraph), Some(oracle_index)) = (monitoring_subgraph, oracle_index) { + match check_config_status(local_config, subgraph, oracle_index).await { + ConfigStatus::Unchanged => { + info!(logger, "Config unchanged, would skip DataEdge post (dry-run)"; + "oracle_index" => oracle_index + ); + } + ConfigStatus::Changed(changed_fields) => { + info!(logger, "Config changed, would post to DataEdge (dry-run)"; + "oracle_index" => oracle_index, + "changed_fields" => changed_fields.join(",") + ); + } + ConfigStatus::NotFound => { + info!(logger, "Oracle not found in subgraph, would post initial config (dry-run)"; + "oracle_index" => oracle_index + ); + } + ConfigStatus::FetchError(e) => { + warn!(logger, "Failed to fetch current config (dry-run)"; + "error" => format!("{:#}", e) + ); + } + } + } + + info!(logger, "Local config values"; + "version" => &local_config.version, + "ipfs_concurrency" => &local_config.ipfs_concurrency, + "ipfs_timeout" => &local_config.ipfs_timeout, + "min_signal" => &local_config.min_signal, + "period" => &local_config.period, + "grace_period" => &local_config.grace_period, + "supported_data_source_kinds" => &local_config.supported_data_source_kinds, + "network_subgraph_deployment_id" => &local_config.network_subgraph_deployment_id, + "epoch_block_oracle_subgraph_deployment_id" => &local_config.epoch_block_oracle_subgraph_deployment_id, + "subgraph_availability_manager_contract" => &local_config.subgraph_availability_manager_contract, + "oracle_index" => &local_config.oracle_index, + ); +} + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + + fn test_config() -> OracleConfig { + OracleConfig { + version: "v1.0.0".to_string(), + ipfs_concurrency: "10".to_string(), + ipfs_timeout: "30000".to_string(), + min_signal: "100".to_string(), + period: "60".to_string(), + grace_period: "10".to_string(), + supported_data_source_kinds: "ethereum,file/ipfs".to_string(), + network_subgraph_deployment_id: "Qm123".to_string(), + epoch_block_oracle_subgraph_deployment_id: "Qm456".to_string(), + subgraph_availability_manager_contract: "0x123".to_string(), + oracle_index: "0".to_string(), + } + } + + struct MockSubgraphUnchanged(OracleConfig); + + #[async_trait] + impl GraphMonitoringSubgraph for MockSubgraphUnchanged { + async fn fetch_oracle_config( + &self, + _oracle_index: u64, + ) -> Result, Error> { + Ok(Some(self.0.clone())) + } + } + + struct MockSubgraphChanged(OracleConfig); + + #[async_trait] + impl GraphMonitoringSubgraph for MockSubgraphChanged { + async fn fetch_oracle_config( + &self, + _oracle_index: u64, + ) -> Result, Error> { + Ok(Some(self.0.clone())) + } + } + + struct MockSubgraphNotFound; + + #[async_trait] + impl GraphMonitoringSubgraph for MockSubgraphNotFound { + async fn fetch_oracle_config( + &self, + _oracle_index: u64, + ) -> Result, Error> { + Ok(None) + } + } + + struct MockSubgraphError; + + #[async_trait] + impl GraphMonitoringSubgraph for MockSubgraphError { + async fn fetch_oracle_config( + &self, + _oracle_index: u64, + ) -> Result, Error> { + Err(anyhow!("Mock fetch error")) + } + } + + #[tokio::test] + async fn test_check_config_status_unchanged() { + let config = test_config(); + let mock = MockSubgraphUnchanged(config.clone()); + + let status = check_config_status(&config, &mock, 0).await; + assert!(matches!(status, ConfigStatus::Unchanged)); + } + + #[tokio::test] + async fn test_check_config_status_changed() { + let local_config = test_config(); + let mut remote_config = test_config(); + remote_config.version = "v2.0.0".to_string(); + remote_config.min_signal = "200".to_string(); + let mock = MockSubgraphChanged(remote_config); + + let status = check_config_status(&local_config, &mock, 0).await; + match status { + ConfigStatus::Changed(fields) => { + assert!(fields.contains(&"version")); + assert!(fields.contains(&"min_signal")); + assert_eq!(fields.len(), 2); + } + _ => panic!("Expected ConfigStatus::Changed"), + } + } + + #[tokio::test] + async fn test_check_config_status_not_found() { + let config = test_config(); + let mock = MockSubgraphNotFound; + + let status = check_config_status(&config, &mock, 0).await; + assert!(matches!(status, ConfigStatus::NotFound)); + } + + #[tokio::test] + async fn test_check_config_status_fetch_error() { + let config = test_config(); + let mock = MockSubgraphError; + + let status = check_config_status(&config, &mock, 0).await; + match status { + ConfigStatus::FetchError(e) => { + assert!(e.to_string().contains("Mock fetch error")); + } + _ => panic!("Expected ConfigStatus::FetchError"), + } + } + + #[test] + fn test_extract_deployment_id_from_url_valid() { + // Standard gateway URL format + let url = "https://gateway.thegraph.com/api/some-api-key/deployments/id/QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY"; + assert_eq!( + extract_deployment_id_from_url(url).unwrap(), + "QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY" + ); + + // Another gateway URL + let url = "https://gateway-arbitrum.network.thegraph.com/api/key123/deployments/id/QmQEGDTb3xeykCXLdWx7pPX3qeeGMUvHmGWP4SpMkv5QJf"; + assert_eq!( + extract_deployment_id_from_url(url).unwrap(), + "QmQEGDTb3xeykCXLdWx7pPX3qeeGMUvHmGWP4SpMkv5QJf" + ); + + // URL with query parameters + let url = "https://gateway.thegraph.com/api/key/deployments/id/QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY?foo=bar"; + assert_eq!( + extract_deployment_id_from_url(url).unwrap(), + "QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY" + ); + } + + #[test] + fn test_extract_deployment_id_from_url_invalid() { + // Missing /id/ segment + let url = "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-arbitrum"; + assert!(extract_deployment_id_from_url(url).is_err()); + + // /id/ segment but no Qm prefix + let url = "https://gateway.thegraph.com/api/key/deployments/id/not-a-cid"; + assert!(extract_deployment_id_from_url(url).is_err()); + + // Invalid URL + let url = "not-a-valid-url"; + assert!(extract_deployment_id_from_url(url).is_err()); + + // Empty URL + let url = ""; + assert!(extract_deployment_id_from_url(url).is_err()); + } +} diff --git a/availability-oracle/src/graph_monitoring_subgraph.rs b/availability-oracle/src/graph_monitoring_subgraph.rs new file mode 100644 index 0000000..b602e16 --- /dev/null +++ b/availability-oracle/src/graph_monitoring_subgraph.rs @@ -0,0 +1,181 @@ +use async_trait::async_trait; +use common::prelude::*; +use reqwest::Client; +use serde_derive::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::time::Duration; + +/// Represents the oracle configuration as stored in the graph-monitoring subgraph. +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OracleConfig { + pub version: String, + pub ipfs_concurrency: String, + pub ipfs_timeout: String, + pub min_signal: String, + pub period: String, + pub grace_period: String, + pub supported_data_source_kinds: String, + pub network_subgraph_deployment_id: String, + pub epoch_block_oracle_subgraph_deployment_id: String, + pub subgraph_availability_manager_contract: String, + pub oracle_index: String, +} + +impl OracleConfig { + /// Returns a list of field names that differ between two configs. + pub fn diff(&self, other: &OracleConfig) -> Vec<&'static str> { + let mut changed = Vec::new(); + if self.version != other.version { + changed.push("version"); + } + if self.ipfs_concurrency != other.ipfs_concurrency { + changed.push("ipfs_concurrency"); + } + if self.ipfs_timeout != other.ipfs_timeout { + changed.push("ipfs_timeout"); + } + if self.min_signal != other.min_signal { + changed.push("min_signal"); + } + if self.period != other.period { + changed.push("period"); + } + if self.grace_period != other.grace_period { + changed.push("grace_period"); + } + if self.supported_data_source_kinds != other.supported_data_source_kinds { + changed.push("supported_data_source_kinds"); + } + if self.network_subgraph_deployment_id != other.network_subgraph_deployment_id { + changed.push("network_subgraph_deployment_id"); + } + if self.epoch_block_oracle_subgraph_deployment_id + != other.epoch_block_oracle_subgraph_deployment_id + { + changed.push("epoch_block_oracle_subgraph_deployment_id"); + } + if self.subgraph_availability_manager_contract + != other.subgraph_availability_manager_contract + { + changed.push("subgraph_availability_manager_contract"); + } + if self.oracle_index != other.oracle_index { + changed.push("oracle_index"); + } + changed + } +} + +/// Trait for interacting with the graph-monitoring subgraph. +#[async_trait] +pub trait GraphMonitoringSubgraph { + /// Fetches the current oracle configuration from the subgraph. + async fn fetch_oracle_config(&self, oracle_index: u64) -> Result, Error>; +} + +pub struct GraphMonitoringSubgraphImpl { + endpoint: String, + client: Client, +} + +impl GraphMonitoringSubgraphImpl { + pub fn new(endpoint: String) -> Self { + GraphMonitoringSubgraphImpl { + endpoint, + client: Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .unwrap(), + } + } +} + +#[derive(Serialize)] +struct GraphqlRequest { + query: &'static str, + variables: BTreeMap, +} + +#[derive(Deserialize)] +struct GraphqlResponse { + data: Option, + errors: Option>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ResponseData { + global_state: Option, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct GlobalState { + active_oracles: Vec, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct Oracle { + latest_config: OracleConfig, +} + +const ORACLE_CONFIG_QUERY: &str = r#" + query($oracleIndex: String!) { + globalState(id: "0") { + activeOracles(where: { index: $oracleIndex }) { + latestConfig { + version + ipfsConcurrency + ipfsTimeout + minSignal + period + gracePeriod + supportedDataSourceKinds + networkSubgraphDeploymentId + epochBlockOracleSubgraphDeploymentId + subgraphAvailabilityManagerContract + oracleIndex + } + } + } + } +"#; + +#[async_trait] +impl GraphMonitoringSubgraph for GraphMonitoringSubgraphImpl { + async fn fetch_oracle_config(&self, oracle_index: u64) -> Result, Error> { + let mut variables = BTreeMap::new(); + variables.insert("oracleIndex".to_string(), oracle_index.to_string()); + + let request = GraphqlRequest { + query: ORACLE_CONFIG_QUERY, + variables, + }; + + let response: GraphqlResponse = self + .client + .post(&self.endpoint) + .json(&request) + .send() + .await? + .json() + .await?; + + if let Some(errors) = response.errors { + if !errors.is_empty() { + return Err(anyhow!( + "GraphQL errors: {}", + serde_json::to_string(&errors)? + )); + } + } + + Ok(response + .data + .and_then(|d| d.global_state) + .and_then(|gs| gs.active_oracles.into_iter().next()) + .map(|o| o.latest_config)) + } +} diff --git a/availability-oracle/src/main.rs b/availability-oracle/src/main.rs index 468ab34..6c46e71 100644 --- a/availability-oracle/src/main.rs +++ b/availability-oracle/src/main.rs @@ -1,5 +1,7 @@ mod contract; +mod data_edge; mod epoch_block_oracle_subgraph; +mod graph_monitoring_subgraph; mod ipfs; mod manifest; mod network_subgraph; @@ -9,10 +11,12 @@ mod util; use common::prelude::*; use common::prometheus; use contract::*; +use data_edge::{build_oracle_config, log_dry_run_config, DataEdgeContract, OracleConfigParams}; use epoch_block_oracle_subgraph::{EpochBlockOracleSubgraph, EpochBlockOracleSubgraphImpl}; use ethers::abi::Address; use ethers::signers::LocalWallet; use ethers::signers::Signer; +use graph_monitoring_subgraph::GraphMonitoringSubgraphImpl; use ipfs::*; use manifest::{Abi, DataSource, Manifest, Mapping}; use network_subgraph::*; @@ -147,6 +151,22 @@ struct Config { help = "Assigned index for the oracle, to be used when voting on SubgraphAvailabilityManager" )] pub oracle_index: Option, + + #[structopt( + long, + env = "DATA_EDGE_CONTRACT", + required_unless("dry-run"), + help = "The address of the DataEdge contract for posting oracle configuration" + )] + pub data_edge_contract: Option
, + + #[structopt( + long, + env = "GRAPH_MONITORING_SUBGRAPH", + required_unless("dry-run"), + help = "GraphQL endpoint to the graph-monitoring subgraph for checking current oracle config" + )] + pub graph_monitoring_subgraph: Option, } const VALID_DEPLOYMENT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 24); @@ -157,28 +177,107 @@ async fn main() -> Result<()> { } async fn run(logger: Logger, config: Config) -> Result<()> { - let ipfs = IpfsImpl::new(config.ipfs, config.ipfs_concurrency, config.ipfs_timeout); - let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph); - let epoch_subgraph = - EpochBlockOracleSubgraphImpl::new(logger.clone(), config.epoch_block_oracle_subgraph); - let contract: Box = if config.dry_run { + let config_params = OracleConfigParams { + ipfs_concurrency: config.ipfs_concurrency, + ipfs_timeout: config.ipfs_timeout, + min_signal: config.min_signal, + period: config.period, + grace_period: config.grace_period, + supported_data_source_kinds: &config.supported_data_source_kinds, + network_subgraph_url: &config.subgraph, + epoch_block_oracle_subgraph_url: &config.epoch_block_oracle_subgraph, + subgraph_availability_manager_contract: config.subgraph_availability_manager_contract, + oracle_index: config.oracle_index, + }; + + let signing_key: Option = if config.dry_run { + None + } else { + Some( + config + .signing_key + .as_ref() + .expect("signing_key is required unless dry-run") + .parse()?, + ) + }; + + if config.dry_run { info!( logger, "Running in dry mode: no transactions will be submitted on chain!" ); - Box::new(StateManagerDryRun::new(logger.clone())) + // In dry-run mode, build local config and check against subgraph if available + if let Ok(local_config) = build_oracle_config(&config_params) { + let monitoring_subgraph = config + .graph_monitoring_subgraph + .as_ref() + .map(|endpoint| GraphMonitoringSubgraphImpl::new(endpoint.clone())); + log_dry_run_config( + &logger, + &local_config, + monitoring_subgraph.as_ref(), + config.oracle_index, + ) + .await; + } } else { - let signing_key: &SecretKey = &config.signing_key.unwrap().parse()?; + let signing_key = signing_key.as_ref().unwrap(); let wallet = LocalWallet::from_bytes(signing_key.as_ref()).unwrap(); info!(logger, "Signing account {}", wallet.address().to_string()); + + // Build local config and post to DataEdge if changed + let local_config = build_oracle_config(&config_params)?; + let oracle_index = config + .oracle_index + .ok_or_else(|| anyhow!("oracle_index is required for DataEdge posting"))?; + + let monitoring_subgraph = GraphMonitoringSubgraphImpl::new( + config + .graph_monitoring_subgraph + .as_ref() + .expect("graph_monitoring_subgraph is required unless dry-run") + .clone(), + ); + + let data_edge = DataEdgeContract::new( + &signing_key, + config.url.clone(), + config + .data_edge_contract + .expect("data_edge_contract is required unless dry-run"), + logger.clone(), + ) + .await?; + + data_edge + .post_config_if_changed(&local_config, &monitoring_subgraph, oracle_index) + .await?; + } + + let ipfs = IpfsImpl::new( + config.ipfs.clone(), + config.ipfs_concurrency, + config.ipfs_timeout, + ); + let subgraph = NetworkSubgraphImpl::new(logger.clone(), config.subgraph.clone()); + let epoch_subgraph = EpochBlockOracleSubgraphImpl::new( + logger.clone(), + config.epoch_block_oracle_subgraph.clone(), + ); + let contract: Box = if config.dry_run { + Box::new(StateManagerDryRun::new(logger.clone())) + } else { state_manager( config.url, - signing_key, + signing_key.as_ref().unwrap(), config.rewards_manager_contract, config.subgraph_availability_manager_contract, config.oracle_index, - logger.clone() - ).await.expect("Configuration error: either [`REWARDS_MANAGER_CONTRACT`] or [`SUBGRAPH_AVAILABILITY_MANAGER_CONTRACT` and `ORACLE_INDEX`] must be provided.") + logger.clone(), + ) + .await + .expect("Configuration error: either [`REWARDS_MANAGER_CONTRACT`] or [`SUBGRAPH_AVAILABILITY_MANAGER_CONTRACT` and `ORACLE_INDEX`] must be provided.") }; let grace_period = Duration::from_secs(config.grace_period);