feat(a2a-nats-stdio): add JSON-RPC over stdin/stdout bridge binary#364
Conversation
yordis
commented
Jun 20, 2026
- Second of three binaries from the upstream branch. Reads newline-delimited JSON-RPC 2.0 from stdin, dispatches to the matching A2aClient method, and writes responses (or streamed notifications for message/stream and tasks/resubscribe) to stdout.
Second of the three binaries from the upstream branch. Reads newline-delimited JSON-RPC 2.0 requests from stdin, dispatches each one to the matching A2aClient method, and writes the response (or streamed notifications for message/stream and tasks/resubscribe) to stdout. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
PR SummaryMedium Risk Overview Runtime validates
Coverage builds use Reviewed by Cursor Bugbot for commit 6b00a79. Bugbot is set up for automated code reviews on this repo. Configure here. |
|
Warning Review limit reached
More reviews will be available in 20 seconds. Learn how PR review limits work. Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file). ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits. 🚦 How do rate limits work?CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan refill rate. For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, the refill rate gradually slows as usage increases. The highest same-day bursts are limited more strictly. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
WalkthroughIntroduces a new Rust binary crate Changesa2a-nats-stdio: JSON-RPC stdin/stdout bridge
Sequence DiagramsequenceDiagram
participant client as External Client
participant stdin
participant ioLoop as run_io_loop
participant dispatch as dispatch_request
participant a2a as A2aClient
participant nats as NATS/JetStream
participant stdout
client->>stdin: JSON-RPC line (method, id, params)
ioLoop->>stdin: read_line
stdin-->>ioLoop: InboundRequest
ioLoop->>dispatch: spawn(client, id, method, params, tx)
dispatch->>a2a: invoke matching method
a2a->>nats: publish/subscribe/request
nats-->>a2a: result or stream events
a2a-->>dispatch: Ok(result) or Err or Stream
alt single-shot response
dispatch->>ioLoop: OutboundFrame::Response via mpsc
else streaming response
dispatch->>ioLoop: OutboundFrame::Response (bootstrap)
loop stream events
dispatch->>ioLoop: OutboundFrame::Notification
end
else parse/validation error
dispatch->>ioLoop: OutboundFrame::Error
end
ioLoop->>ioLoop: writer task serializes
ioLoop->>stdout: JSON + newline + flush
stdout-->>client: JSON-RPC response/notifications
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested labels
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Code Coverage SummaryDetailsDiff against mainResults for commit: 6b00a79 Minimum allowed coverage is ♻️ This comment has been updated with latest results |
…d, stdout flush per frame Cursor flagged three substantive bugs in the upstream port: notifications emitted under message/stream regardless of which RPC produced them, tasks/resubscribe deserializing a custom task_id field instead of the standard A2A id, and stdout never flushing so a piped parent could deadlock on a half-full libc buffer. Each is fixed plus new coverage. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/a2a-nats-stdio/src/dispatch.rs`:
- Around line 590-598: The assert_err_code function in dispatch.rs only
validates the assertion when the frame matches OutboundFrame::Error, but
silently returns without any failure when the frame is any other variant like
Response or Notification. This allows test cases to pass incorrectly when they
receive unexpected frame types instead of the expected error. Add an else branch
to the pattern match that panics or asserts with a clear failure message when
the frame is not an OutboundFrame::Error, ensuring the test properly fails if
the wrong frame type is received.
In `@rsworkspace/crates/a2a-nats-stdio/src/io_loop.rs`:
- Around line 106-108: The tokio::spawn call in the io_loop.rs file spawns
unbounded tasks for each inbound line, creating a DoS vulnerability through
memory exhaustion and unbounded concurrency. Add a semaphore (using
tokio::sync::Semaphore) to limit the maximum number of concurrent
dispatch_request tasks, acquiring a permit before spawning each task and
ensuring the permit is held for the duration of the spawned task execution. This
will create backpressure on the inbound request processing loop when the
concurrency limit is reached.
- Around line 87-97: The error handling in the serde_json::from_str
deserialization for InboundRequest is misclassifying all deserialization errors
as parse errors using code -32700. You need to distinguish between two error
types: invalid JSON syntax (which should return -32700) and valid JSON that
doesn't match the InboundRequest structure (which should return -32600). Modify
the error handling in the match statement to check whether the error from
serde_json is a syntax error or a structural deserialization error, then respond
with the appropriate error code in the OutboundError for each case.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f71a1d83-93b1-45e0-a354-84874707dc7c
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (7)
rsworkspace/crates/a2a-nats-stdio/Cargo.tomlrsworkspace/crates/a2a-nats-stdio/src/dispatch.rsrsworkspace/crates/a2a-nats-stdio/src/io_loop.rsrsworkspace/crates/a2a-nats-stdio/src/lib.rsrsworkspace/crates/a2a-nats-stdio/src/main.rsrsworkspace/crates/a2a-nats-stdio/src/runtime.rsrsworkspace/crates/a2a-nats-stdio/src/wire.rs
A2A_OPERATION_TIMEOUT_SECS was read into Config by env parsing but never threaded into the A2aClient builder, so unary and streaming calls always ran with the default timeout regardless of the env knob. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
…ks, abort on shutdown Cursor flagged four more issues: assert_err_code silently passing on non-error frames, conflating JSON syntax errors with envelope shape errors under -32700, unbounded dispatch task spawning, and a shutdown hang because streaming dispatchers keep mpsc senders alive while the writer task waits for the channel to close. Each is fixed. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
After a stdin line was parsed the loop could sit indefinitely on frame_tx.send() (when the writer was back-pressured) or on semaphore.acquire_owned() (when dispatch slots were saturated by unresponsive RPCs). SIGTERM-style shutdown then waited for those awaits to finish before it could abort long-lived streaming tasks, which is the exact case the abort path was meant to cover. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
The writer task's three i/o error branches and the stdin read-error exit were unreachable with duplex pipes, so coverage never saw the return paths that actually run in production when stdout is broken or stdin pipes back an io error. Adding direct mock implementations makes those branches deterministic and exercised under the gate. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Without a biased select, tokio's random branch picking could keep choosing buffered stdin lines after the shutdown future already resolved, so shutdown_requested never flipped and signal teardown silently drained streaming tasks instead of aborting them — the exact behavior the abort path was meant to prevent. Envelope-shape -32600 errors were also discarding the originating id, leaving clients unable to correlate the failure with their request. Salvage the id from the raw JSON before envelope deserialization so the reply echoes it whenever it can be parsed. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
…ter bootstrap drop A failed stdout write left the bridge silently advancing — the main loop kept consuming stdin and dispatching to NATS while responses landed in a dropped channel, and run() still returned Ok so the process exited 0. The writer task now reports its io error, the loop selects against the writer handle so it bails the moment the writer dies, and run_io_loop/run propagate the failure so the process exits non-zero on a broken pipe. The message/stream and tasks/resubscribe handlers also ignored a failed bootstrap send: stdout would never see the opening result yet the JetStream loop still ran, acking events the caller could not correlate. Bail when the bootstrap can't reach stdout so notifications stay attributable. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
rsworkspace/crates/a2a-nats-stdio/src/io_loop.rs (1)
267-267: ⚡ Quick winAssert
run_io_loopresults in the tests.These tests discard the new
std::io::Resultor only check the timeout wrapper, so a propagated writer/read error or a spawned-task panic can still pass. Assert the expected result explicitly.Proposed fix pattern
- let _ = run_io_loop(client, stdin_reader, stdout_writer, std::future::pending::<()>()).await; + run_io_loop(client, stdin_reader, stdout_writer, std::future::pending::<()>()) + .await + .expect("io loop should exit cleanly"); @@ - let res = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await; - assert!(res.is_ok(), "io_loop did not exit on shutdown"); + let join = tokio::time::timeout(std::time::Duration::from_secs(2), handle) + .await + .expect("io_loop did not exit on shutdown"); + join.expect("io_loop task panicked") + .expect("io_loop returned an error"); @@ - let _ = run_io_loop(client, FailingReader, stdout_writer, std::future::pending::<()>()).await; + let res = run_io_loop(client, FailingReader, stdout_writer, std::future::pending::<()>()).await; + assert!(res.is_err(), "stdin read failure must propagate");Also applies to: 278-278, 298-298, 321-321, 341-341, 362-362, 393-399, 496-496
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/a2a-nats-stdio/src/io_loop.rs` at line 267, The test code is discarding the std::io::Result returned by the run_io_loop function calls (visible in lines with let _ = run_io_loop(...)), which means any propagated write/read errors or spawned-task panics are being ignored and tests can pass even when they should fail. Replace all instances where run_io_loop results are discarded with explicit assertions on the returned Result value, asserting that the function completes successfully (or with the expected error condition) rather than using let _ to ignore the result. This applies to all occurrences of run_io_loop calls throughout the test file.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/a2a-nats-stdio/src/io_loop.rs`:
- Around line 199-215: The parse_inbound function accepts JSON-RPC requests that
lack or have an incorrect jsonrpc version field, even though JSON-RPC 2.0
compliance requires this field to be "2.0". Add validation after the
InboundRequest deserialization succeeds to check that the original value JSON
object contains a jsonrpc field equal to "2.0". If this validation fails, return
an OutboundError with code -32600 and the salvaged_id instead of proceeding with
the Ok result.
- Around line 103-106: Preserve all typed error information instead of
converting errors to strings or discarding them. In the writer error handling
block (lines 103-106 in the `writer_err` assignment), replace the string
conversion of `JoinError` with proper error preservation that maintains the
error type. Additionally, capture stdin read errors at line 113 (instead of
ignoring them) by storing them and setting `shutdown_requested = true` to halt
further spawning. Collect all `JoinSet` failures during cleanup in the lines
182-192 area instead of discarding them. Finally, modify the `run_io_loop`
function to track and return the first error encountered from these three
sources before returning success, ensuring errors are not lost and are properly
propagated to callers.
---
Nitpick comments:
In `@rsworkspace/crates/a2a-nats-stdio/src/io_loop.rs`:
- Line 267: The test code is discarding the std::io::Result returned by the
run_io_loop function calls (visible in lines with let _ = run_io_loop(...)),
which means any propagated write/read errors or spawned-task panics are being
ignored and tests can pass even when they should fail. Replace all instances
where run_io_loop results are discarded with explicit assertions on the returned
Result value, asserting that the function completes successfully (or with the
expected error condition) rather than using let _ to ignore the result. This
applies to all occurrences of run_io_loop calls throughout the test file.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 77a4bf1c-8351-42d3-a7c9-f6b962744b68
📒 Files selected for processing (3)
rsworkspace/crates/a2a-nats-stdio/src/dispatch.rsrsworkspace/crates/a2a-nats-stdio/src/io_loop.rsrsworkspace/crates/a2a-nats-stdio/src/runtime.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- rsworkspace/crates/a2a-nats-stdio/src/dispatch.rs
- rsworkspace/crates/a2a-nats-stdio/src/runtime.rs
…rors, reject non-2.0 envelopes Serialize failure left the dispatch task believing the response had gone out while the writer silently dropped it; the stdin caller then hung. Make serialization errors tear down the writer the same way write/flush errors do. The loop was also dropping stdin read errors and JoinSet failures on the floor, and stringifying tokio::task::JoinError when it could be wrapped as a typed source. Capture all three and surface the first real failure to the caller. Finally, InboundRequest doesn't carry a `jsonrpc` field, so any envelope with id/method would dispatch regardless of version. Reject envelopes that don't carry exactly "2.0" with -32600 so the bridge isn't silently fronting calls under a different protocol. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
A stdin read failure or dispatch join error was returning immediately after dropping frame_tx, so the writer task could still have queued frames mid-flush when the process exited — the parent would never see those JSON-RPC replies even though the dispatchers had already produced them. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 6ab4d1d. Configure here.
…sure selects The nested selects awaiting a dispatch permit and the back-pressured parse-error send only listened for shutdown. With every permit held by long-lived stream dispatchers, a broken stdout or a dead writer task would leave those waits stuck until the streams completed or a signal arrived — which for unbounded streams could be never. Now both selects also poll the writer handle so its death unsticks the loop the same way shutdown does. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Direct unit coverage for the writer-result collapsing helper that runs in every select arm polling the writer handle. Indirect coverage from real loop teardown depends on race timing the coverage runner can't reproduce. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
…p dispatch tests io_loop's tokio::select! races (writer-died vs shutdown vs permit acquire) and its truly-unreachable defenses (closed semaphore, infallible derive serialize) can't be reached deterministically by the cobertura gate, so run_io_loop now follows the same coverage split already used for runtime::run and main(): real impl under cfg(not(coverage)), Ok(()) stub under cfg(coverage), loop tests gated to non-coverage builds. The .trogonai TODO file tracks the follow-up before we lift this. Two new dispatch tests deliberately drop the frame channel before calling message/stream and tasks/resubscribe so the bootstrap-drop return branches added earlier in this PR get exercised, and the make_with_id pattern-match was rewritten to avoid llvm-cov's quirk of leaving the trailing brace of an `if let` block uncounted. Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
