diff --git a/slv-plugins/rpc-gateway/src/dispatch.rs b/slv-plugins/rpc-gateway/src/dispatch.rs index d64458fb..0bd7fd92 100644 --- a/slv-plugins/rpc-gateway/src/dispatch.rs +++ b/slv-plugins/rpc-gateway/src/dispatch.rs @@ -66,9 +66,50 @@ fn slot_commitment(params: &Option) -> CommitmentParam { static JET_NAMESPACE_RE: LazyLock = LazyLock::new(|| Regex::new(r"^jet[A-Z]").expect("static regex compiles")); +/// Methods served by a historical-archive backend (e.g. +/// `yellowstone-faithful`) — they read CAR files of past epochs and +/// answer transaction / block lookups out of those archives. Live- +/// state methods (`getSlot`, `getBalance`, `sendTransaction`, …) must +/// NOT route here because the archive only updates when the next CAR +/// file is imported — answers would be days behind the live chain. +/// +/// Match is case-sensitive and exact. Aliases the protocol no longer +/// advertises (`getConfirmedTransaction` etc.) are included so any +/// client still using them stays archive-backed. +const HISTORICAL_METHODS: &[&str] = &[ + "getTransaction", + "getBlock", + "getBlocks", + "getBlocksWithLimit", + "getBlockTime", + "getBlockCommitment", + "getBlockProduction", + "getSignaturesForAddress", + // Pre-rename aliases for the same archive-backed lookups. + "getConfirmedBlock", + "getConfirmedBlocks", + "getConfirmedBlocksWithLimit", + "getConfirmedTransaction", + "getConfirmedSignaturesForAddress2", +]; + +fn is_historical_method(method: &str) -> bool { + HISTORICAL_METHODS.contains(&method) +} + pub struct Gateway { pub ch: Arc, + /// Historical-archive RPC backend (e.g. yellowstone-faithful). + /// Serves transaction / block lookups out of imported CAR files. pub of1: Arc, + /// Optional live-state RPC backend (= an agave-validator JSON-RPC + /// port or any live full-history RPC). When set, every method + /// NOT in [`HISTORICAL_METHODS`] is routed here instead of + /// `of1` so callers get current chain state. When `None`, all + /// methods continue to forward to `of1` (= prior behaviour, kept + /// for backward compat with deployments that only have an + /// archive backend). + pub live_rpc: Option>, pub ws: WsConfig, /// Optional `slotSubscribe` upstream singletons, owned per- /// process so multiple WS clients share one set of outbound @@ -115,6 +156,14 @@ pub struct GatewayBuilder { /// `--dest-ip-ports` list, and a strict nftables allowlist on /// the bind port (no signature verification is done in-process). pub slot_udp_bind: Option, + /// Optional live-state RPC URL. When set, the gateway routes + /// every non-historical method (see [`HISTORICAL_METHODS`]) here + /// instead of to `of1`. See `Gateway::live_rpc` for the rationale. + pub live_rpc_url: Option, + /// Per-call timeout for the live RPC client (= different default + /// from `of1_timeout` because `getTransaction` reads from CAR + /// archive which can be slow; live RPC calls should be fast). + pub live_rpc_timeout: Option, pub yellowstone_endpoint: String, /// Operator-supplied WS-duration billing config. When all /// three are non-empty, an [`crate::ws::billing::BillingClient`] @@ -167,6 +216,26 @@ impl Gateway { let slot_multiplex = (!builder.slot_multiplex_urls.is_empty()).then( || Arc::new(SlotPubsubMultiplex::slot_subscribe(builder.slot_multiplex_urls)), ); + // Construct an optional live-RPC client when the operator + // wired one up. Uses a separate `Of1Client` instance so the + // historical and live backends can have independent timeouts + // and connection pools without competing for the same + // reqwest::Client's connection slots. + let live_rpc = builder + .live_rpc_url + .as_deref() + .filter(|s| !s.is_empty()) + .map(|url| { + Arc::new( + Of1Client::new(crate::of1::Of1Config { + url: url.to_owned(), + timeout: builder + .live_rpc_timeout + .unwrap_or(std::time::Duration::from_secs(10)), + }) + .expect("live_rpc client builds with operator-supplied URL"), + ) + }); let billing = match ( builder.metrics_api_url.as_deref().filter(|s| !s.is_empty()), builder.metrics_api_bearer.as_deref().filter(|s| !s.is_empty()), @@ -188,6 +257,7 @@ impl Gateway { Self { ch, of1, + live_rpc, ws, slot_first_shred_multi, slot_first_shred, @@ -264,13 +334,20 @@ impl Gateway { } } - /// Forward a request envelope to the upstream RPC node and - /// return its response. The upstream is itself JSON-RPC 2.0 so - /// its body should already carry `jsonrpc`/`id` and either - /// `result` or `error` — if so we deserialise it directly into - /// `Response`. Anything malformed gets wrapped as - /// `UPSTREAM_ERROR` so the client never sees a half-parsed - /// envelope. + /// Forward a request envelope to the appropriate upstream RPC + /// node and return its response. Routing: + /// + /// - methods in [`HISTORICAL_METHODS`] → always `self.of1` + /// (= the historical archive, e.g. yellowstone-faithful). + /// - everything else → `self.live_rpc` when configured (= an + /// agave-validator live RPC port), falling back to `self.of1` + /// when no live backend is wired up. + /// + /// The upstream is itself JSON-RPC 2.0 so its body should already + /// carry `jsonrpc`/`id` and either `result` or `error`; we + /// deserialise it directly into `Response`. Anything malformed + /// gets wrapped as `UPSTREAM_ERROR` so the client never sees a + /// half-parsed envelope. async fn forward_to_upstream(&self, req: &Request, id: Id) -> Response { let envelope = match build_upstream_envelope(req) { Ok(v) => v, @@ -278,7 +355,12 @@ impl Gateway { return Response::err(id, error_codes::INTERNAL_ERROR, msg); } }; - match self.of1.forward(&envelope).await { + let backend: &Arc = if is_historical_method(&req.method) { + &self.of1 + } else { + self.live_rpc.as_ref().unwrap_or(&self.of1) + }; + match backend.forward(&envelope).await { Ok(body) => match serde_json::from_value::(body.clone()) { Ok(parsed) => parsed, Err(_) => { @@ -344,6 +426,48 @@ mod commitment_tests { CommitmentParam::Other, ); } + + #[test] + fn historical_method_classification() { + // Historical archive lookups. + for m in [ + "getTransaction", + "getBlock", + "getBlocks", + "getBlocksWithLimit", + "getBlockTime", + "getBlockCommitment", + "getBlockProduction", + "getSignaturesForAddress", + "getConfirmedBlock", + "getConfirmedTransaction", + "getConfirmedSignaturesForAddress2", + ] { + assert!( + is_historical_method(m), + "{m} must be classified as historical (= archive-backed)", + ); + } + // Live-state methods. + for m in [ + "getSlot", + "getBlockHeight", + "getLatestBlockhash", + "getEpochInfo", + "getBalance", + "getAccountInfo", + "sendTransaction", + "simulateTransaction", + "getRecentPrioritizationFees", + "getVoteAccounts", + "getHealth", + ] { + assert!( + !is_historical_method(m), + "{m} must NOT be classified as historical (= needs live state)", + ); + } + } } fn build_upstream_envelope(req: &Request) -> Result { diff --git a/slv-plugins/rpc-gateway/src/main.rs b/slv-plugins/rpc-gateway/src/main.rs index 1e7d2e14..b7489a96 100644 --- a/slv-plugins/rpc-gateway/src/main.rs +++ b/slv-plugins/rpc-gateway/src/main.rs @@ -7,10 +7,27 @@ //! CLICKHOUSE_USER optional Basic-auth username //! CLICKHOUSE_PASS optional Basic-auth password //! CLICKHOUSE_TIMEOUT_MS per-query timeout (default 30000) -//! OF1_URL upstream JSON-RPC base for the pass-through proxy -//! + `full` mode on `getTransactionsForAddress` +//! OF1_URL upstream JSON-RPC base for the historical- +//! archive backend (e.g. yellowstone-faithful). +//! Used for `getTransaction`, `getBlock`, +//! `getSignaturesForAddress`, etc. — every +//! method in `dispatch::HISTORICAL_METHODS`. +//! Also handles `full` mode on +//! `getTransactionsForAddress`. //! (default http://localhost:8888) //! OF1_TIMEOUT_MS per-call timeout for of1 (default 60000) +//! LIVE_RPC_URL upstream JSON-RPC base for live-state +//! methods (`getSlot`, `getBlockHeight`, +//! `getLatestBlockhash`, `sendTransaction`, +//! `getBalance`, …). Set this to an +//! agave-validator RPC port (typically +//! `http://127.0.0.1:7211`) when the of1 +//! backend is historical-only — otherwise +//! live-state methods would return stale or +//! "not found" answers. Unset = all methods +//! continue to forward to OF1_URL (= prior +//! behaviour, backward compat). +//! LIVE_RPC_TIMEOUT_MS per-call timeout for live RPC (default 10000) //! GTFA_FULL_CONCURRENCY max parallel of1 `getTransaction` calls when //! `transactionDetails: "full"` is requested //! (default 20) @@ -148,6 +165,11 @@ async fn main() -> anyhow::Result<()> { slot_multiplex_urls: comma_list("SLOT_MULTIPLEX_URLS"), slot_grpc_url: std::env::var("SLOT_GRPC_URL").ok().filter(|s| !s.is_empty()), slot_udp_bind: std::env::var("SLOT_UDP_BIND").ok().filter(|s| !s.is_empty()), + live_rpc_url: std::env::var("LIVE_RPC_URL").ok().filter(|s| !s.is_empty()), + live_rpc_timeout: std::env::var("LIVE_RPC_TIMEOUT_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .map(Duration::from_millis), yellowstone_endpoint: env_or("YELLOWSTONE_GRPC", "localhost:10000"), metrics_api_url: std::env::var("RPC_METRICS_API_URL").ok().filter(|s| !s.is_empty()), metrics_api_bearer: std::env::var("RPC_METRICS_API_BEARER").ok().filter(|s| !s.is_empty()), diff --git a/slv-plugins/rpc-gateway/src/ws/ws_test.rs b/slv-plugins/rpc-gateway/src/ws/ws_test.rs index 06d479fc..06760e37 100644 --- a/slv-plugins/rpc-gateway/src/ws/ws_test.rs +++ b/slv-plugins/rpc-gateway/src/ws/ws_test.rs @@ -148,6 +148,8 @@ mod tests { slot_multiplex_urls: Vec::new(), slot_grpc_url: None, slot_udp_bind: None, + live_rpc_url: None, + live_rpc_timeout: None, yellowstone_endpoint: "localhost:10000".into(), metrics_api_url: None, metrics_api_bearer: None, diff --git a/template/2026.5.5.1612/ansible/cmn/install_rpc_gateway.yml b/template/2026.5.5.1612/ansible/cmn/install_rpc_gateway.yml index bac9ac16..f5ab7fdf 100644 --- a/template/2026.5.5.1612/ansible/cmn/install_rpc_gateway.yml +++ b/template/2026.5.5.1612/ansible/cmn/install_rpc_gateway.yml @@ -42,6 +42,19 @@ # this address to --dest-ip-ports # and a strict nftables allowlist # on the bind port. +# rpc_gateway_live_rpc_url (unset) — live-state RPC backend +# (e.g. http://127.0.0.1:7211 for +# a co-located agave-validator). +# When set, every non-historical +# method is routed here instead of +# to OF1_URL. Required when OF1_URL +# points at a historical-only +# backend (e.g. yellowstone-faithful) +# to keep getSlot / getBlockHeight +# / sendTransaction etc. answering +# with live chain state. +# rpc_gateway_live_rpc_timeout_ms 10000 — per-call timeout +# for the live RPC client # rpc_gateway_metrics_api_url (unset) — operator-supplied metrics # API base URL. When set with # *_bearer below, WS connection close @@ -96,6 +109,8 @@ rg_slot_pubsub_url: "{{ rpc_gateway_slot_pubsub_url | default('') }}" rg_slot_grpc_url: "{{ rpc_gateway_slot_grpc_url | default('') }}" rg_slot_udp_bind: "{{ rpc_gateway_slot_udp_bind | default('') }}" + rg_live_rpc_url: "{{ rpc_gateway_live_rpc_url | default('') }}" + rg_live_rpc_timeout_ms: "{{ rpc_gateway_live_rpc_timeout_ms | default('') }}" rg_metrics_api_url: "{{ rpc_gateway_metrics_api_url | default('') }}" rg_metrics_api_bearer: "{{ rpc_gateway_metrics_api_bearer | default('') }}" rg_metrics_upstream_ip: "{{ rpc_gateway_metrics_upstream_ip | default('') }}" @@ -315,6 +330,8 @@ {% if rg_slot_pubsub_url %}Environment=SLOT_PUBSUB_URL={{ rg_slot_pubsub_url }}{% endif %} {% if rg_slot_grpc_url %}Environment=SLOT_GRPC_URL={{ rg_slot_grpc_url }}{% endif %} {% if rg_slot_udp_bind %}Environment=SLOT_UDP_BIND={{ rg_slot_udp_bind }}{% endif %} + {% if rg_live_rpc_url %}Environment=LIVE_RPC_URL={{ rg_live_rpc_url }}{% endif %} + {% if rg_live_rpc_timeout_ms %}Environment=LIVE_RPC_TIMEOUT_MS={{ rg_live_rpc_timeout_ms }}{% endif %} {% if rg_metrics_api_url %}Environment=RPC_METRICS_API_URL={{ rg_metrics_api_url }}{% endif %} {% if rg_metrics_api_bearer %}Environment=RPC_METRICS_API_BEARER={{ rg_metrics_api_bearer }}{% endif %} {% if rg_metrics_upstream_ip %}Environment=RPC_METRICS_UPSTREAM_IP={{ rg_metrics_upstream_ip }}{% endif %}