Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions crates/base/test_cases/rate-limit-retry-after/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Worker: makes one outbound fetch to itself, catches RateLimitError, and
// returns { retryAfterMs } in JSON so the integration test can verify the field.
//
// Requests with "x-skip: 1" are terminal — they return immediately without
// making an outbound call, so the worker does not recurse indefinitely.
Deno.serve(async (req: Request) => {
const serverUrl = req.headers.get("x-test-server-url");
if (!serverUrl) {
return new Response(
JSON.stringify({ msg: "missing x-test-server-url header" }),
{ status: 400, headers: { "Content-Type": "application/json" } },
);
}

// Terminal hop: just acknowledge, no outbound call.
if (req.headers.get("x-skip") === "1") {
return new Response(JSON.stringify({ ok: true }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}

try {
await fetch(`${serverUrl}/rate-limit-retry-after`, {
headers: {
"x-test-server-url": serverUrl,
"x-skip": "1",
},
});
return new Response(JSON.stringify({ ok: true }), {
status: 200,
headers: { "Content-Type": "application/json" },
});
} catch (e) {
if (e instanceof Deno.errors.RateLimitError) {
return new Response(
JSON.stringify({ name: e.name, retryAfterMs: e.retryAfterMs }),
{ status: 429, headers: { "Content-Type": "application/json" } },
);
}
return new Response(
JSON.stringify({ msg: String(e) }),
{ status: 500, headers: { "Content-Type": "application/json" } },
);
}
});
77 changes: 77 additions & 0 deletions crates/base/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4361,6 +4361,83 @@ async fn test_request_trace_id_isolation() {
);
}

/// Verifies that `RateLimitError.retryAfterMs` is a positive number when the
/// global (untraced) budget is exhausted.
#[tokio::test]
#[serial]
async fn test_rate_limit_retry_after_ms() {
init_otel();

// The budget in rate-limit-main is 10. Send 11 requests; the last one
// should come back as 429 with a positive retryAfterMs value.
const BUDGET: usize = 10;

integration_test_with_server_flag!(
ServerFlags {
rate_limit_cleanup_interval_sec: 60,
request_wait_timeout_ms: Some(30_000),
..Default::default()
},
"./test_cases/rate-limit-main",
NON_SECURE_PORT,
"rate-limit-retry-after",
None,
None::<reqwest::RequestBuilder>,
None::<Tls>,
(
|(port, _url, _req_builder, _event_rx, _metric_src)| async move {
let client = reqwest::Client::new();
let url = format!("http://localhost:{}/rate-limit-retry-after", port);
let server_url = format!("http://localhost:{}", port);

// Exhaust the budget.
for _ in 0..BUDGET {
let resp = client
.get(&url)
.header("x-test-server-url", &server_url)
.send()
.await
.unwrap();
// Each of these may itself trigger an inner fetch that is counted
// against the budget; we only care about the final one below.
let _ = resp;
}

// This request should be rate-limited.
let resp = client
.get(&url)
.header("x-test-server-url", &server_url)
.send()
.await;

Some(resp)
},
|resp| async move {
let res = resp.unwrap();
assert_eq!(
res.status().as_u16(),
429,
"expected 429 from rate-limited request"
);
let body: serde_json::Value = res.json().await.unwrap();
assert_eq!(
body["name"].as_str().unwrap_or(""),
"RateLimitError",
"expected RateLimitError in body, got: {body}"
);
let retry_after_ms = body["retryAfterMs"]
.as_u64()
.expect("retryAfterMs should be a non-null number");
assert!(
retry_after_ms > 0,
"retryAfterMs should be positive, got {retry_after_ms}"
);
}
),
TerminationToken::new()
);
}

#[derive(Deserialize)]
struct ErrorResponsePayload {
msg: String,
Expand Down
15 changes: 10 additions & 5 deletions ext/node/polyfills/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,16 +497,21 @@ class ClientRequest extends OutgoingMessage {
const traceId = internals.getRequestTraceId?.();
const isTraced = traceId !== null && traceId !== undefined;
const rlKey = isTraced ? traceId : "";
const allowed = op_check_outbound_rate_limit(
// op returns u32::MAX when allowed, or retry_after_ms (< 0xFFFFFFFF) when denied.
const rlResult = op_check_outbound_rate_limit(
parsedUrl.href,
rlKey,
isTraced,
);
if (!allowed) {
if (rlResult !== 0xFFFFFFFF) {
const retryAfterMs = rlResult;
const retryHint = retryAfterMs > 0
? ` Retry after ${retryAfterMs}ms.`
: "";
const msg = isTraced
? `Rate limit exceeded for trace ${rlKey}`
: `Rate limit exceeded for function`;
throw new Deno.errors.RateLimitError(msg);
? `Rate limit exceeded for trace ${rlKey}.${retryHint}`
: `Rate limit exceeded for function.${retryHint}`;
throw new Deno.errors.RateLimitError(msg, retryAfterMs);
}

this._req = op_node_http_request(
Expand Down
17 changes: 16 additions & 1 deletion ext/runtime/js/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,22 @@ const DOMExceptionInvalidCharacterError = buildDomErrorClass(
"InvalidCharacterError",
);
const DOMExceptionDataError = buildDomErrorClass("DOMExceptionDataError");
const RateLimitError = buildErrorClass("RateLimitError");
const RateLimitError = (() => {
const cls = class RateLimitError extends Error {
constructor(msg, retryAfterMs) {
super(msg);
this.name = "RateLimitError";
// Number of milliseconds until the rate-limit window resets.
// May be 0 if the server could not determine the reset time.
this.retryAfterMs = typeof retryAfterMs === "number"
? retryAfterMs
: null;
}
};
cls.getName = () => "RateLimitError";
knownErrors["RateLimitError"] = cls;
return cls;
})();

function registerErrors() {
core.registerErrorClass("InvalidWorkerResponse", InvalidWorkerResponse);
Expand Down
11 changes: 8 additions & 3 deletions ext/runtime/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,16 +450,21 @@ pub fn op_bootstrap_unstable_args(_state: &mut OpState) -> Vec<String> {
}

#[op2(fast)]
/// Returns `u32::MAX` when the request is allowed, or the number of
/// milliseconds until the rate-limit window resets when denied (< u32::MAX).
pub fn op_check_outbound_rate_limit(
state: &mut OpState,
#[string] url: &str,
#[string] key: &str,
is_traced: bool,
) -> bool {
) -> u32 {
let Some(limiter) = state.try_borrow::<TraceRateLimiter>() else {
return true;
return u32::MAX;
};
limiter.check_and_increment(url, key, is_traced)
match limiter.check_and_increment(url, key, is_traced) {
Ok(()) => u32::MAX,
Err(retry_after_ms) => retry_after_ms.min(u32::MAX as u64 - 1) as u32,
}
}

deno_core::extension!(
Expand Down
18 changes: 12 additions & 6 deletions ext/runtime/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ impl SharedRateLimitTable {
});
}

/// Returns `Ok(())` when the request is allowed, or `Err(retry_after_ms)`
/// with the number of milliseconds until the window resets when denied.
pub fn check_and_increment(
&self,
key: &str,
budget: u32,
ttl: Duration,
) -> bool {
) -> Result<(), u64> {
let now = Instant::now();

let mut entry =
Expand Down Expand Up @@ -98,11 +100,13 @@ impl SharedRateLimitTable {
);

if !allowed {
return false;
let retry_after_ms =
entry.expires_at.saturating_duration_since(now).as_millis() as u64;
return Err(retry_after_ms);
}

entry.count += 1;
true
Ok(())
}
}

Expand Down Expand Up @@ -166,16 +170,18 @@ impl TraceRateLimiter {
})
}

/// Returns `Ok(())` when the request is allowed, or `Err(retry_after_ms)`
/// with the number of milliseconds until the window resets when denied.
pub fn check_and_increment(
&self,
url: &str,
key: &str,
is_traced: bool,
) -> bool {
) -> Result<(), u64> {
let rule = self.rules.iter().find(|r| r.matches.is_match(url));

let Some(rule) = rule else {
return true;
return Ok(());
};

if is_traced {
Expand All @@ -187,7 +193,7 @@ impl TraceRateLimiter {
// budget accumulates correctly across worker instances. Deny the request
// if the caller did not supply one.
let Some(fid) = self.global_key.as_deref() else {
return false;
return Err(0);
};
self
.table
Expand Down
9 changes: 8 additions & 1 deletion types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,13 @@ declare namespace Deno {
class WorkerAlreadyRetired extends Error {}

/** Thrown when an outbound HTTP request is blocked by the rate limiter. */
class RateLimitError extends Error {}
class RateLimitError extends Error {
/**
* Number of milliseconds until the rate-limit window resets.
* `null` if the reset time could not be determined.
*/
retryAfterMs: number | null;
constructor(message: string, retryAfterMs?: number);
}
}
}
13 changes: 8 additions & 5 deletions vendor/deno_fetch/26_fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -399,16 +399,19 @@ function fetch(input, init = { __proto__: null }) {
const traceId = internals.getRequestTraceId?.();
const isTraced = traceId !== null && traceId !== undefined;
const rlKey = isTraced ? traceId : "";
const allowed = op_check_outbound_rate_limit(
// op returns u32::MAX when allowed, or retry_after_ms (< 0xFFFFFFFF) when denied.
const rlResult = op_check_outbound_rate_limit(
requestObject.url,
rlKey,
isTraced,
);
if (!allowed) {
if (rlResult !== 0xFFFFFFFF) {
const retryAfterMs = rlResult;
const retryHint = retryAfterMs > 0 ? ` Retry after ${retryAfterMs}ms.` : "";
const msg = isTraced
? `Rate limit exceeded for trace ${rlKey}`
: `Rate limit exceeded for function`;
reject(new Deno.errors.RateLimitError(msg));
? `Rate limit exceeded for trace ${rlKey}.${retryHint}`
: `Rate limit exceeded for function.${retryHint}`;
reject(new Deno.errors.RateLimitError(msg, retryAfterMs));
return;
}

Expand Down