diff --git a/crates/base/test_cases/rate-limit-retry-after/index.ts b/crates/base/test_cases/rate-limit-retry-after/index.ts new file mode 100644 index 00000000..a015d629 --- /dev/null +++ b/crates/base/test_cases/rate-limit-retry-after/index.ts @@ -0,0 +1,46 @@ +// Worker: makes one outbound fetch to itself, catches RateLimitError, and +// returns { name, retryAfterMs } in JSON (status 429) so the integration test can verify both fields. +// +// Requests with "x-skip: 1" are terminal — they return immediately without +// making an outbound call, so the worker does not recurse indefinitely. +Deno.serve(async (req: Request) => { + const serverUrl = req.headers.get("x-test-server-url"); + if (!serverUrl) { + return new Response( + JSON.stringify({ msg: "missing x-test-server-url header" }), + { status: 400, headers: { "Content-Type": "application/json" } }, + ); + } + + // Terminal hop: just acknowledge, no outbound call. + if (req.headers.get("x-skip") === "1") { + return new Response(JSON.stringify({ ok: true }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + try { + await fetch(`${serverUrl}/rate-limit-retry-after`, { + headers: { + "x-test-server-url": serverUrl, + "x-skip": "1", + }, + }); + return new Response(JSON.stringify({ ok: true }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } catch (e) { + if (e instanceof Deno.errors.RateLimitError) { + return new Response( + JSON.stringify({ name: e.name, retryAfterMs: e.retryAfterMs }), + { status: 429, headers: { "Content-Type": "application/json" } }, + ); + } + return new Response( + JSON.stringify({ msg: String(e) }), + { status: 500, headers: { "Content-Type": "application/json" } }, + ); + } +}); diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 7ad6cb56..60c15fac 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -4361,6 +4361,83 @@ async fn 
test_request_trace_id_isolation() { ); } +/// Verifies that `RateLimitError.retryAfterMs` is a positive number when the +/// global (untraced) budget is exhausted. +#[tokio::test] +#[serial] +async fn test_rate_limit_retry_after_ms() { + init_otel(); + + // The budget in rate-limit-main is 10. Send 11 requests; the last one + // should come back as 429 with a positive retryAfterMs value. + const BUDGET: usize = 10; + + integration_test_with_server_flag!( + ServerFlags { + rate_limit_cleanup_interval_sec: 60, + request_wait_timeout_ms: Some(30_000), + ..Default::default() + }, + "./test_cases/rate-limit-main", + NON_SECURE_PORT, + "rate-limit-retry-after", + None, + None::, + None::, + ( + |(port, _url, _req_builder, _event_rx, _metric_src)| async move { + let client = reqwest::Client::new(); + let url = format!("http://localhost:{}/rate-limit-retry-after", port); + let server_url = format!("http://localhost:{}", port); + + // Exhaust the budget. + for _ in 0..BUDGET { + let resp = client + .get(&url) + .header("x-test-server-url", &server_url) + .send() + .await + .unwrap(); + // Each of these may itself trigger an inner fetch that is counted + // against the budget; we only care about the final one below. + let _ = resp; + } + + // This request should be rate-limited. 
+ let resp = client + .get(&url) + .header("x-test-server-url", &server_url) + .send() + .await; + + Some(resp) + }, + |resp| async move { + let res = resp.unwrap(); + assert_eq!( + res.status().as_u16(), + 429, + "expected 429 from rate-limited request" + ); + let body: serde_json::Value = res.json().await.unwrap(); + assert_eq!( + body["name"].as_str().unwrap_or(""), + "RateLimitError", + "expected RateLimitError in body, got: {body}" + ); + let retry_after_ms = body["retryAfterMs"] + .as_u64() + .expect("retryAfterMs should be a non-null number"); + assert!( + retry_after_ms > 0, + "retryAfterMs should be positive, got {retry_after_ms}" + ); + } + ), + TerminationToken::new() + ); +} + #[derive(Deserialize)] struct ErrorResponsePayload { msg: String, diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index b35c336a..9c0336e3 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -497,16 +497,21 @@ class ClientRequest extends OutgoingMessage { const traceId = internals.getRequestTraceId?.(); const isTraced = traceId !== null && traceId !== undefined; const rlKey = isTraced ? traceId : ""; - const allowed = op_check_outbound_rate_limit( + // op returns u32::MAX when allowed, or retry_after_ms (< 0xFFFFFFFF) when denied. + const rlResult = op_check_outbound_rate_limit( parsedUrl.href, rlKey, isTraced, ); - if (!allowed) { + if (rlResult !== 0xFFFFFFFF) { + const retryAfterMs = rlResult; + const retryHint = retryAfterMs > 0 + ? ` Retry after ${retryAfterMs}ms.` + : ""; const msg = isTraced - ? `Rate limit exceeded for trace ${rlKey}` - : `Rate limit exceeded for function`; - throw new Deno.errors.RateLimitError(msg); + ? 
`Rate limit exceeded for trace ${rlKey}.${retryHint}` + : `Rate limit exceeded for function.${retryHint}`; + throw new Deno.errors.RateLimitError(msg, retryAfterMs); } this._req = op_node_http_request( diff --git a/ext/runtime/js/errors.js b/ext/runtime/js/errors.js index 844c3925..02ed80c5 100644 --- a/ext/runtime/js/errors.js +++ b/ext/runtime/js/errors.js @@ -56,7 +56,22 @@ const DOMExceptionInvalidCharacterError = buildDomErrorClass( "InvalidCharacterError", ); const DOMExceptionDataError = buildDomErrorClass("DOMExceptionDataError"); -const RateLimitError = buildErrorClass("RateLimitError"); +const RateLimitError = (() => { + const cls = class RateLimitError extends Error { + constructor(msg, retryAfterMs) { + super(msg); + this.name = "RateLimitError"; + // Number of milliseconds until the rate-limit window resets. May be 0 when + // no remaining wait was reported; null when no numeric value was passed in. + this.retryAfterMs = typeof retryAfterMs === "number" + ? retryAfterMs + : null; + } + }; + cls.getName = () => "RateLimitError"; + knownErrors["RateLimitError"] = cls; + return cls; +})(); function registerErrors() { core.registerErrorClass("InvalidWorkerResponse", InvalidWorkerResponse); diff --git a/ext/runtime/lib.rs b/ext/runtime/lib.rs index 3a2c7442..77378732 100644 --- a/ext/runtime/lib.rs +++ b/ext/runtime/lib.rs @@ -450,16 +450,21 @@ pub fn op_bootstrap_unstable_args(_state: &mut OpState) -> Vec { } #[op2(fast)] +/// Returns `u32::MAX` when the request is allowed, or the number of +/// milliseconds until the rate-limit window resets when denied (< u32::MAX). 
pub fn op_check_outbound_rate_limit( state: &mut OpState, #[string] url: &str, #[string] key: &str, is_traced: bool, -) -> bool { +) -> u32 { let Some(limiter) = state.try_borrow::() else { - return true; + return u32::MAX; }; - limiter.check_and_increment(url, key, is_traced) + match limiter.check_and_increment(url, key, is_traced) { + Ok(()) => u32::MAX, + Err(retry_after_ms) => retry_after_ms.min(u32::MAX as u64 - 1) as u32, + } } deno_core::extension!( diff --git a/ext/runtime/rate_limit.rs b/ext/runtime/rate_limit.rs index f135abf4..8a5dd864 100644 --- a/ext/runtime/rate_limit.rs +++ b/ext/runtime/rate_limit.rs @@ -62,12 +62,14 @@ impl SharedRateLimitTable { }); } + /// Returns `Ok(())` when the request is allowed, or `Err(retry_after_ms)` + /// with the number of milliseconds until the window resets when denied. pub fn check_and_increment( &self, key: &str, budget: u32, ttl: Duration, - ) -> bool { + ) -> Result<(), u64> { let now = Instant::now(); let mut entry = @@ -98,11 +100,13 @@ impl SharedRateLimitTable { ); if !allowed { - return false; + let retry_after_ms = + entry.expires_at.saturating_duration_since(now).as_millis() as u64; + return Err(retry_after_ms); } entry.count += 1; - true + Ok(()) } } @@ -166,16 +170,18 @@ impl TraceRateLimiter { }) } + /// Returns `Ok(())` when the request is allowed, or `Err(retry_after_ms)` + /// with the number of milliseconds until the window resets when denied. pub fn check_and_increment( &self, url: &str, key: &str, is_traced: bool, - ) -> bool { + ) -> Result<(), u64> { let rule = self.rules.iter().find(|r| r.matches.is_match(url)); let Some(rule) = rule else { - return true; + return Ok(()); }; if is_traced { @@ -187,7 +193,7 @@ impl TraceRateLimiter { // budget accumulates correctly across worker instances. Deny the request // if the caller did not supply one. 
+ let Some(fid) = self.global_key.as_deref() else { - return false; + return Err(0); }; self .table diff --git a/types/global.d.ts b/types/global.d.ts index 11d0a2f9..58772a96 100644 --- a/types/global.d.ts +++ b/types/global.d.ts @@ -254,6 +254,13 @@ declare namespace Deno { class WorkerAlreadyRetired extends Error {} /** Thrown when an outbound HTTP request is blocked by the rate limiter. */ - class RateLimitError extends Error {} + class RateLimitError extends Error { + /** + * Number of milliseconds until the rate-limit window resets. + * `0` if the limiter reported no remaining wait; `null` if no value was supplied. + */ + retryAfterMs: number | null; + constructor(message: string, retryAfterMs?: number); + } } } diff --git a/vendor/deno_fetch/26_fetch.js b/vendor/deno_fetch/26_fetch.js index 7ddd5b45..516e2cd2 100644 --- a/vendor/deno_fetch/26_fetch.js +++ b/vendor/deno_fetch/26_fetch.js @@ -399,16 +399,19 @@ function fetch(input, init = { __proto__: null }) { const traceId = internals.getRequestTraceId?.(); const isTraced = traceId !== null && traceId !== undefined; const rlKey = isTraced ? traceId : ""; - const allowed = op_check_outbound_rate_limit( + // op returns u32::MAX when allowed, or retry_after_ms (< 0xFFFFFFFF) when denied. + const rlResult = op_check_outbound_rate_limit( requestObject.url, rlKey, isTraced, ); - if (!allowed) { + if (rlResult !== 0xFFFFFFFF) { + const retryAfterMs = rlResult; + const retryHint = retryAfterMs > 0 ? ` Retry after ${retryAfterMs}ms.` : ""; const msg = isTraced - ? `Rate limit exceeded for trace ${rlKey}` - : `Rate limit exceeded for function`; - reject(new Deno.errors.RateLimitError(msg)); + ? `Rate limit exceeded for trace ${rlKey}.${retryHint}` + : `Rate limit exceeded for function.${retryHint}`; + reject(new Deno.errors.RateLimitError(msg, retryAfterMs)); return; }