Filter AIMessage state updates from streaming output#1863
Filter AIMessage state updates from streaming output#1863MylesShannon wants to merge 1 commit intoNVIDIA:developfrom
Conversation
LangGraph's stream_mode="messages" emits both AIMessageChunk (incremental tokens) and AIMessage (final state update) from the agent node. The _stream_fn was accepting both via isinstance(msg, (AIMessage, AIMessageChunk)), causing the full accumulated response to be emitted as a final chunk after all the individual tokens had already been streamed. Clients saw the complete response duplicated at the end of the SSE stream. Filter to only AIMessageChunk so the state update is excluded. Adds a regression test that confirms AIMessage objects are emitted by the graph stream (the duplicate source) and that filtering to AIMessageChunk excludes them. Signed-off-by: Myles Shannon <mshannon@nvidia.com>
WalkthroughModified the streaming filter in the tool-calling agent to accept only Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~12 minutes 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py (1)
370-370: Add return type annotation on the new async test.Please annotate the new test with
-> Nonefor consistency with repository typing conventions.As per coding guidelines, "Python methods should use type hints for all parameters and return values (except for return values of
None, in that situation no return type hint is needed)."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py` at line 370, The new async test function test_stream_fn_no_duplicate_content lacks a return type annotation; update its signature to include the return type None (i.e., change to "async def test_stream_fn_no_duplicate_content(...) -> None:") to match the repository typing conventions and ensure consistency with other tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py`:
- Around line 394-400: The aggregation may include non-string msg.content values
which will cause TypeError when calling "".join for chunk_response and
full_response; update the collection logic around AIMessageChunk/msg.content so
that chunk_contents and full_contents only contain strings (e.g., convert values
with str(msg.content) or filter/transform non-string entries) before performing
"".join, referencing the variables chunk_contents, full_contents,
AIMessageChunk, msg.content, chunk_response and full_response to locate and
modify the code.
---
Nitpick comments:
In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py`:
- Line 370: The new async test function test_stream_fn_no_duplicate_content
lacks a return type annotation; update its signature to include the return type
None (i.e., change to "async def test_stream_fn_no_duplicate_content(...) ->
None:") to match the repository typing conventions and ensure consistency with
other tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 7a7c81f9-8d55-442f-8e57-099d31da3ddd
📒 Files selected for processing (2)
packages/nvidia_nat_langchain/src/nat/plugins/langchain/agent/tool_calling_agent/register.pypackages/nvidia_nat_langchain/tests/agent/test_tool_calling.py
| if isinstance(msg, AIMessageChunk) and msg.content: | ||
| chunk_contents.append(msg.content) | ||
| if hasattr(msg, "content") and msg.content: | ||
| full_contents.append(msg.content) | ||
|
|
||
| chunk_response = "".join(chunk_contents) | ||
| full_response = "".join(full_contents) |
There was a problem hiding this comment.
Guard aggregated contents to strings before join.
chunk_contents / full_contents can receive non-string msg.content, which can raise TypeError at Line 399 or Line 400 during "".join(...).
Proposed fix
- if isinstance(msg, AIMessageChunk) and msg.content:
+ if isinstance(msg, AIMessageChunk) and isinstance(msg.content, str) and msg.content:
chunk_contents.append(msg.content)
- if hasattr(msg, "content") and msg.content:
+ if hasattr(msg, "content") and isinstance(msg.content, str) and msg.content:
full_contents.append(msg.content)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/nvidia_nat_langchain/tests/agent/test_tool_calling.py` around lines
394 - 400, The aggregation may include non-string msg.content values which will
cause TypeError when calling "".join for chunk_response and full_response;
update the collection logic around AIMessageChunk/msg.content so that
chunk_contents and full_contents only contain strings (e.g., convert values with
str(msg.content) or filter/transform non-string entries) before performing
"".join, referencing the variables chunk_contents, full_contents,
AIMessageChunk, msg.content, chunk_response and full_response to locate and
modify the code.
Description
LangGraph's stream_mode="messages" emits both AIMessageChunk (incremental tokens) and AIMessage (final state update) from the agent node. The _stream_fn was accepting both via isinstance(msg, (AIMessage, AIMessageChunk)), causing the full accumulated response to be emitted as a final chunk after all the individual tokens had already been streamed. Clients saw the complete response duplicated at the end of the SSE stream.
Filter to only AIMessageChunk so the state update is excluded.
Adds a regression test that confirms AIMessage objects are emitted by the graph stream (the duplicate source) and that filtering to AIMessageChunk excludes them.
By Submitting this PR I confirm:
Summary by CodeRabbit