Skip to content

Filter AIMessage state updates from streaming output#1863

Open
MylesShannon wants to merge 1 commit intoNVIDIA:developfrom
MylesShannon:fix/stream-duplicate-message
Open

Filter AIMessage state updates from streaming output#1863
MylesShannon wants to merge 1 commit intoNVIDIA:developfrom
MylesShannon:fix/stream-duplicate-message

Conversation

@MylesShannon
Copy link
Copy Markdown
Contributor

@MylesShannon MylesShannon commented Apr 11, 2026

Description

LangGraph's stream_mode="messages" emits both AIMessageChunk (incremental tokens) and AIMessage (final state update) from the agent node. The _stream_fn was accepting both via isinstance(msg, (AIMessage, AIMessageChunk)), causing the full accumulated response to be emitted as a final chunk after all the individual tokens had already been streamed. Clients saw the complete response duplicated at the end of the SSE stream.

Filter to only AIMessageChunk so the state update is excluded.

Adds a regression test that confirms AIMessage objects are emitted by the graph stream (the duplicate source) and that filtering to AIMessageChunk excludes them.

By Submitting this PR I confirm:

  • I am familiar with the Contributing Guidelines.
  • We require that all contributors "sign-off" on their commits. This certifies that the contribution is your original work, or you have rights to submit it under the same license, or a compatible license.
    • Any contribution which contains commits that are not Signed-Off will not be accepted.
  • When the PR is ready for review, new or existing tests cover these changes.
  • When the PR is ready for review, the documentation is up to date with these changes.

Summary by CodeRabbit

  • Bug Fixes
    • Fixed streaming behavior to prevent duplicate content in streamed responses during agent message processing.

LangGraph's stream_mode="messages" emits both AIMessageChunk (incremental
tokens) and AIMessage (final state update) from the agent node. The
_stream_fn was accepting both via isinstance(msg, (AIMessage, AIMessageChunk)),
causing the full accumulated response to be emitted as a final chunk after
all the individual tokens had already been streamed. Clients saw the
complete response duplicated at the end of the SSE stream.

Filter to only AIMessageChunk so the state update is excluded.

Adds a regression test that confirms AIMessage objects are emitted by the
graph stream (the duplicate source) and that filtering to AIMessageChunk
excludes them.

Signed-off-by: Myles Shannon <mshannon@nvidia.com>
@MylesShannon MylesShannon requested a review from a team as a code owner April 11, 2026 17:38
@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot bot commented Apr 11, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 11, 2026

Walkthrough

Modified the streaming filter in the tool-calling agent to accept only AIMessageChunk instances instead of both AIMessage and AIMessageChunk, preventing non-chunk messages from producing streamed deltas. Added a regression test to verify prior assistant replies do not appear in streamed chunk responses.

Changes

Cohort / File(s) Summary
Streaming Filter Update
packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/register.py
Removed AIMessage import and modified the streaming filter to accept only AIMessageChunk instances, preventing non-chunk AIMessage messages from being processed during graph.astream() calls.
Regression Test
packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py
Added test_stream_fn_no_duplicate_content() async test that validates streamed AIMessageChunk instances do not duplicate prior assistant message content, while full responses retain complete message history.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Filter AIMessage state updates from streaming output' is concise (52 chars), descriptive, uses imperative mood, and accurately summarizes the main change: restricting the streaming filter to exclude AIMessage objects.
Docstring Coverage ✅ Passed Docstring coverage is 80.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py (1)

370-370: Add return type annotation on the new async test.

Please annotate the new test with -> None for consistency with repository typing conventions.

As per coding guidelines, "Python methods should use type hints for all parameters and return values (except for return values of None, in that situation no return type hint is needed)."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py` at line 370,
The new async test function test_stream_fn_no_duplicate_content lacks a return
type annotation; update its signature to include the return type None (i.e.,
change to "async def test_stream_fn_no_duplicate_content(...) -> None:") to
match the repository typing conventions and ensure consistency with other tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py`:
- Around line 394-400: The aggregation may include non-string msg.content values
which will cause TypeError when calling "".join for chunk_response and
full_response; update the collection logic around AIMessageChunk/msg.content so
that chunk_contents and full_contents only contain strings (e.g., convert values
with str(msg.content) or filter/transform non-string entries) before performing
"".join, referencing the variables chunk_contents, full_contents,
AIMessageChunk, msg.content, chunk_response and full_response to locate and
modify the code.

---

Nitpick comments:
In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py`:
- Line 370: The new async test function test_stream_fn_no_duplicate_content
lacks a return type annotation; update its signature to include the return type
None (i.e., change to "async def test_stream_fn_no_duplicate_content(...) ->
None:") to match the repository typing conventions and ensure consistency with
other tests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 7a7c81f9-8d55-442f-8e57-099d31da3ddd

📥 Commits

Reviewing files that changed from the base of the PR and between ee7ab31 and ad84d8b.

📒 Files selected for processing (2)
  • packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/register.py
  • packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py

Comment on lines +394 to +400
if isinstance(msg, AIMessageChunk) and msg.content:
chunk_contents.append(msg.content)
if hasattr(msg, "content") and msg.content:
full_contents.append(msg.content)

chunk_response = "".join(chunk_contents)
full_response = "".join(full_contents)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Guard aggregated contents to strings before join.

chunk_contents / full_contents can receive non-string msg.content, which can raise TypeError at Line 399 or Line 400 during "".join(...).

Proposed fix
-        if isinstance(msg, AIMessageChunk) and msg.content:
+        if isinstance(msg, AIMessageChunk) and isinstance(msg.content, str) and msg.content:
             chunk_contents.append(msg.content)
-        if hasattr(msg, "content") and msg.content:
+        if hasattr(msg, "content") and isinstance(msg.content, str) and msg.content:
             full_contents.append(msg.content)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py` around lines
394 - 400, The aggregation may include non-string msg.content values which will
cause TypeError when calling "".join for chunk_response and full_response;
update the collection logic around AIMessageChunk/msg.content so that
chunk_contents and full_contents only contain strings (e.g., convert values with
str(msg.content) or filter/transform non-string entries) before performing
"".join, referencing the variables chunk_contents, full_contents,
AIMessageChunk, msg.content, chunk_response and full_response to locate and
modify the code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant