Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 132 additions & 8 deletions slv-plugins/rpc-gateway/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,50 @@ fn slot_commitment(params: &Option<Value>) -> CommitmentParam {
static JET_NAMESPACE_RE: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"^jet[A-Z]").expect("static regex compiles"));

/// Methods served by a historical-archive backend (e.g.
/// `yellowstone-faithful`) — they read CAR files of past epochs and
/// answer transaction / block lookups out of those archives. Live-
/// state methods (`getSlot`, `getBalance`, `sendTransaction`, …) must
/// NOT route here because the archive only updates when the next CAR
/// file is imported — answers would be days behind the live chain.
///
/// Match is case-sensitive and exact. Aliases the protocol no longer
/// advertises (`getConfirmedTransaction` etc.) are included so any
/// client still using them stays archive-backed.
const HISTORICAL_METHODS: &[&str] = &[
"getTransaction",
"getBlock",
"getBlocks",
"getBlocksWithLimit",
"getBlockTime",
"getBlockCommitment",
"getBlockProduction",
"getSignaturesForAddress",
// Pre-rename aliases for the same archive-backed lookups.
"getConfirmedBlock",
"getConfirmedBlocks",
"getConfirmedBlocksWithLimit",
"getConfirmedTransaction",
"getConfirmedSignaturesForAddress2",
];

fn is_historical_method(method: &str) -> bool {
HISTORICAL_METHODS.contains(&method)
}

pub struct Gateway {
pub ch: Arc<ClickHouseClient>,
/// Historical-archive RPC backend (e.g. yellowstone-faithful).
/// Serves transaction / block lookups out of imported CAR files.
pub of1: Arc<Of1Client>,
/// Optional live-state RPC backend (= an agave-validator JSON-RPC
/// port or any live full-history RPC). When set, every method
/// NOT in [`HISTORICAL_METHODS`] is routed here instead of
/// `of1` so callers get current chain state. When `None`, all
/// methods continue to forward to `of1` (= prior behaviour, kept
/// for backward compat with deployments that only have an
/// archive backend).
pub live_rpc: Option<Arc<Of1Client>>,
pub ws: WsConfig,
/// Optional `slotSubscribe` upstream singletons, owned per-
/// process so multiple WS clients share one set of outbound
Expand Down Expand Up @@ -115,6 +156,14 @@ pub struct GatewayBuilder {
/// `--dest-ip-ports` list, and a strict nftables allowlist on
/// the bind port (no signature verification is done in-process).
pub slot_udp_bind: Option<String>,
/// Optional live-state RPC URL. When set, the gateway routes
/// every non-historical method (see [`HISTORICAL_METHODS`]) here
/// instead of to `of1`. See `Gateway::live_rpc` for the rationale.
pub live_rpc_url: Option<String>,
/// Per-call timeout for the live RPC client (= different default
/// from `of1_timeout` because `getTransaction` reads from CAR
/// archive which can be slow; live RPC calls should be fast).
pub live_rpc_timeout: Option<std::time::Duration>,
pub yellowstone_endpoint: String,
/// Operator-supplied WS-duration billing config. When all
/// three are non-empty, an [`crate::ws::billing::BillingClient`]
Expand Down Expand Up @@ -167,6 +216,26 @@ impl Gateway {
let slot_multiplex = (!builder.slot_multiplex_urls.is_empty()).then(
|| Arc::new(SlotPubsubMultiplex::slot_subscribe(builder.slot_multiplex_urls)),
);
// Construct an optional live-RPC client when the operator
// wired one up. Uses a separate `Of1Client` instance so the
// historical and live backends can have independent timeouts
// and connection pools without competing for the same
// reqwest::Client's connection slots.
let live_rpc = builder
.live_rpc_url
.as_deref()
.filter(|s| !s.is_empty())
.map(|url| {
Arc::new(
Of1Client::new(crate::of1::Of1Config {
url: url.to_owned(),
timeout: builder
.live_rpc_timeout
.unwrap_or(std::time::Duration::from_secs(10)),
})
.expect("live_rpc client builds with operator-supplied URL"),
)
});
let billing = match (
builder.metrics_api_url.as_deref().filter(|s| !s.is_empty()),
builder.metrics_api_bearer.as_deref().filter(|s| !s.is_empty()),
Expand All @@ -188,6 +257,7 @@ impl Gateway {
Self {
ch,
of1,
live_rpc,
ws,
slot_first_shred_multi,
slot_first_shred,
Expand Down Expand Up @@ -264,21 +334,33 @@ impl Gateway {
}
}

/// Forward a request envelope to the upstream RPC node and
/// return its response. The upstream is itself JSON-RPC 2.0 so
/// its body should already carry `jsonrpc`/`id` and either
/// `result` or `error` — if so we deserialise it directly into
/// `Response`. Anything malformed gets wrapped as
/// `UPSTREAM_ERROR` so the client never sees a half-parsed
/// envelope.
/// Forward a request envelope to the appropriate upstream RPC
/// node and return its response. Routing:
///
/// - methods in [`HISTORICAL_METHODS`] → always `self.of1`
/// (= the historical archive, e.g. yellowstone-faithful).
/// - everything else → `self.live_rpc` when configured (= an
/// agave-validator live RPC port), falling back to `self.of1`
/// when no live backend is wired up.
///
/// The upstream is itself JSON-RPC 2.0 so its body should already
/// carry `jsonrpc`/`id` and either `result` or `error`; we
/// deserialise it directly into `Response`. Anything malformed
/// gets wrapped as `UPSTREAM_ERROR` so the client never sees a
/// half-parsed envelope.
async fn forward_to_upstream(&self, req: &Request, id: Id) -> Response {
let envelope = match build_upstream_envelope(req) {
Ok(v) => v,
Err(msg) => {
return Response::err(id, error_codes::INTERNAL_ERROR, msg);
}
};
match self.of1.forward(&envelope).await {
let backend: &Arc<Of1Client> = if is_historical_method(&req.method) {
&self.of1
} else {
self.live_rpc.as_ref().unwrap_or(&self.of1)
};
match backend.forward(&envelope).await {
Ok(body) => match serde_json::from_value::<Response>(body.clone()) {
Ok(parsed) => parsed,
Err(_) => {
Expand Down Expand Up @@ -344,6 +426,48 @@ mod commitment_tests {
CommitmentParam::Other,
);
}

#[test]
fn historical_method_classification() {
// Historical archive lookups.
for m in [
"getTransaction",
"getBlock",
"getBlocks",
"getBlocksWithLimit",
"getBlockTime",
"getBlockCommitment",
"getBlockProduction",
"getSignaturesForAddress",
"getConfirmedBlock",
"getConfirmedTransaction",
"getConfirmedSignaturesForAddress2",
] {
assert!(
is_historical_method(m),
"{m} must be classified as historical (= archive-backed)",
);
}
// Live-state methods.
for m in [
"getSlot",
"getBlockHeight",
"getLatestBlockhash",
"getEpochInfo",
"getBalance",
"getAccountInfo",
"sendTransaction",
"simulateTransaction",
"getRecentPrioritizationFees",
"getVoteAccounts",
"getHealth",
] {
assert!(
!is_historical_method(m),
"{m} must NOT be classified as historical (= needs live state)",
);
}
}
}

fn build_upstream_envelope(req: &Request) -> Result<serde_json::Value, String> {
Expand Down
26 changes: 24 additions & 2 deletions slv-plugins/rpc-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,27 @@
//! CLICKHOUSE_USER optional Basic-auth username
//! CLICKHOUSE_PASS optional Basic-auth password
//! CLICKHOUSE_TIMEOUT_MS per-query timeout (default 30000)
//! OF1_URL upstream JSON-RPC base for the pass-through proxy
//! + `full` mode on `getTransactionsForAddress`
//! OF1_URL upstream JSON-RPC base for the historical-
//! archive backend (e.g. yellowstone-faithful).
//! Used for `getTransaction`, `getBlock`,
//! `getSignaturesForAddress`, etc. — every
//! method in `dispatch::HISTORICAL_METHODS`.
//! Also handles `full` mode on
//! `getTransactionsForAddress`.
//! (default http://localhost:8888)
//! OF1_TIMEOUT_MS per-call timeout for of1 (default 60000)
//! LIVE_RPC_URL upstream JSON-RPC base for live-state
//! methods (`getSlot`, `getBlockHeight`,
//! `getLatestBlockhash`, `sendTransaction`,
//! `getBalance`, …). Set this to an
//! agave-validator RPC port (typically
//! `http://127.0.0.1:7211`) when the of1
//! backend is historical-only — otherwise
//! live-state methods would return stale or
//! "not found" answers. Unset = all methods
//! continue to forward to OF1_URL (= prior
//! behaviour, backward compat).
//! LIVE_RPC_TIMEOUT_MS per-call timeout for live RPC (default 10000)
//! GTFA_FULL_CONCURRENCY max parallel of1 `getTransaction` calls when
//! `transactionDetails: "full"` is requested
//! (default 20)
Expand Down Expand Up @@ -148,6 +165,11 @@ async fn main() -> anyhow::Result<()> {
slot_multiplex_urls: comma_list("SLOT_MULTIPLEX_URLS"),
slot_grpc_url: std::env::var("SLOT_GRPC_URL").ok().filter(|s| !s.is_empty()),
slot_udp_bind: std::env::var("SLOT_UDP_BIND").ok().filter(|s| !s.is_empty()),
live_rpc_url: std::env::var("LIVE_RPC_URL").ok().filter(|s| !s.is_empty()),
live_rpc_timeout: std::env::var("LIVE_RPC_TIMEOUT_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.map(Duration::from_millis),
yellowstone_endpoint: env_or("YELLOWSTONE_GRPC", "localhost:10000"),
metrics_api_url: std::env::var("RPC_METRICS_API_URL").ok().filter(|s| !s.is_empty()),
metrics_api_bearer: std::env::var("RPC_METRICS_API_BEARER").ok().filter(|s| !s.is_empty()),
Expand Down
2 changes: 2 additions & 0 deletions slv-plugins/rpc-gateway/src/ws/ws_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ mod tests {
slot_multiplex_urls: Vec::new(),
slot_grpc_url: None,
slot_udp_bind: None,
live_rpc_url: None,
live_rpc_timeout: None,
yellowstone_endpoint: "localhost:10000".into(),
metrics_api_url: None,
metrics_api_bearer: None,
Expand Down
17 changes: 17 additions & 0 deletions template/2026.5.5.1612/ansible/cmn/install_rpc_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@
# this address to --dest-ip-ports
# and a strict nftables allowlist
# on the bind port.
# rpc_gateway_live_rpc_url (unset) — live-state RPC backend
# (e.g. http://127.0.0.1:7211 for
# a co-located agave-validator).
# When set, every non-historical
# method is routed here instead of
# to OF1_URL. Required when OF1_URL
# points at a historical-only
# backend (e.g. yellowstone-faithful)
# to keep getSlot / getBlockHeight
# / sendTransaction etc. answering
# with live chain state.
# rpc_gateway_live_rpc_timeout_ms 10000 — per-call timeout
# for the live RPC client
# rpc_gateway_metrics_api_url (unset) — operator-supplied metrics
# API base URL. When set with
# *_bearer below, WS connection close
Expand Down Expand Up @@ -96,6 +109,8 @@
rg_slot_pubsub_url: "{{ rpc_gateway_slot_pubsub_url | default('') }}"
rg_slot_grpc_url: "{{ rpc_gateway_slot_grpc_url | default('') }}"
rg_slot_udp_bind: "{{ rpc_gateway_slot_udp_bind | default('') }}"
rg_live_rpc_url: "{{ rpc_gateway_live_rpc_url | default('') }}"
rg_live_rpc_timeout_ms: "{{ rpc_gateway_live_rpc_timeout_ms | default('') }}"
rg_metrics_api_url: "{{ rpc_gateway_metrics_api_url | default('') }}"
rg_metrics_api_bearer: "{{ rpc_gateway_metrics_api_bearer | default('') }}"
rg_metrics_upstream_ip: "{{ rpc_gateway_metrics_upstream_ip | default('') }}"
Expand Down Expand Up @@ -315,6 +330,8 @@
{% if rg_slot_pubsub_url %}Environment=SLOT_PUBSUB_URL={{ rg_slot_pubsub_url }}{% endif %}
{% if rg_slot_grpc_url %}Environment=SLOT_GRPC_URL={{ rg_slot_grpc_url }}{% endif %}
{% if rg_slot_udp_bind %}Environment=SLOT_UDP_BIND={{ rg_slot_udp_bind }}{% endif %}
{% if rg_live_rpc_url %}Environment=LIVE_RPC_URL={{ rg_live_rpc_url }}{% endif %}
{% if rg_live_rpc_timeout_ms %}Environment=LIVE_RPC_TIMEOUT_MS={{ rg_live_rpc_timeout_ms }}{% endif %}
{% if rg_metrics_api_url %}Environment=RPC_METRICS_API_URL={{ rg_metrics_api_url }}{% endif %}
{% if rg_metrics_api_bearer %}Environment=RPC_METRICS_API_BEARER={{ rg_metrics_api_bearer }}{% endif %}
{% if rg_metrics_upstream_ip %}Environment=RPC_METRICS_UPSTREAM_IP={{ rg_metrics_upstream_ip }}{% endif %}
Expand Down
Loading