Conversation

@chenghao-mou
Member

@chenghao-mou chenghao-mou commented Jan 17, 2026

Going through detail.dev issue reports and putting some fixes together. Most of them are clean-ups or defensive fixes.

Summary by CodeRabbit

  • Bug Fixes

    • Detect closed channels before reporting capacity errors
    • Recreate HTTP sessions if a previous session was closed
    • Skip processing when speech recognition responses contain no results
  • Improvements

    • Register inference requests earlier and clean up on send failures to avoid leaked pending work
    • Actively consume metric event streams instead of ignoring them
    • Close server resources concurrently during shutdown
    • Cancel lingering non-interrupt tasks after first completion

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

coderabbitai bot commented Jan 17, 2026

📝 Walkthrough

Walkthrough

Small control-flow and cleanup adjustments across IPC request handling, STT adapters, channel send logic, HTTP session recreation, voice session teardown, speech-task cancellation, and a Google STT response guard.

Changes

  • IPC — Future registration & send error handling (livekit-agents/livekit/agents/ipc/inference_proc_executor.py, livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py): Register the request future in _active_requests before sending InferenceRequest; wrap the send in try/except to cancel and remove the future on send failures, then re-raise. Review future lifecycle and race conditions.
  • STT — Metrics stream consumption (livekit-agents/livekit/agents/stt/fallback_adapter.py, livekit-agents/livekit/agents/stt/stream_adapter.py): _metrics_monitor_task now asynchronously iterates over event_aiter (draining the stream) instead of being a no-op. Check for backpressure/latency impacts.
  • Channel send ordering (livekit-agents/livekit/agents/utils/aio/channel.py): send_nowait now checks for ChanClosed before checking fullness, making closed-state precedence explicit.
  • HTTP session recreation on closed session (livekit-agents/livekit/agents/utils/http_context.py): Recreate aiohttp.ClientSession if g_session is None or g_session.closed. Verify session lifecycle and connector reuse.
  • Voice — Concurrent MCP teardown (livekit-agents/livekit/agents/voice/agent_activity.py): _close_session now closes MCP servers concurrently via asyncio.gather(..., return_exceptions=True) after closing audio recognition. Confirm proper error aggregation/handling.
  • Speech handle — Cancel pending tasks (livekit-agents/livekit/agents/voice/speech_handle.py): wait_if_not_interrupted shields awaited futures, awaits FIRST_COMPLETED, then cancels remaining non-interrupt futures using utils.aio.cancel_and_wait. Check cancellation semantics and resource cleanup.
  • Google STT — Guard empty results (livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py): _streaming_recognize_response_to_speech_data returns early when resp.results is empty to skip downstream processing.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I hopped where futures raced the light,

I tucked them safe before their flight.
I sipped the streams, closed doors I swept,
Canceled tails so none were kept.
A cheerful hop — the code sleeps right.

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check (❓ Inconclusive): The title 'Defensive fixes' is overly vague and generic. While it loosely relates to the changeset (which contains multiple defensive/protective measures), it does not clearly communicate the primary changes or the specific focus of the PR. Resolution: consider a more specific title such as 'Add defensive checks and improve error handling in IPC and audio processing' that conveys the actual scope of changes.
✅ Passed checks (1 passed)
  • Description Check (✅ Passed): Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3b9b077 and 6ab6a1e.

📒 Files selected for processing (1)
  • livekit-agents/livekit/agents/voice/speech_handle.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings

Files:

  • livekit-agents/livekit/agents/voice/speech_handle.py
🧬 Code graph analysis (1)
livekit-agents/livekit/agents/voice/speech_handle.py (1)
livekit-agents/livekit/agents/utils/aio/utils.py (1)
  • cancel_and_wait (6-23)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: livekit-plugins-inworld
  • GitHub Check: livekit-plugins-openai
  • GitHub Check: livekit-plugins-deepgram
  • GitHub Check: livekit-plugins-cartesia
  • GitHub Check: unit-tests
  • GitHub Check: type-check (3.13)
  • GitHub Check: type-check (3.9)
🔇 Additional comments (1)
livekit-agents/livekit/agents/voice/speech_handle.py (1)

170-176: Shielding + targeted cancellation looks correct.
This keeps caller-managed futures from being cancelled while still terminating the internal gather on interrupt.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@chenghao-mou chenghao-mou requested a review from a team January 17, 2026 17:26

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@livekit-agents/livekit/agents/ipc/inference_proc_executor.py`:
- Around line 82-89: The code registers request_id/fut into
self._active_requests before awaiting channel.asend_message, but if
channel.asend_message raises the future remains leaked; wrap the await
channel.asend_message(...) call in a try/except (or try/finally) so that on any
exception you remove the entry from self._active_requests (e.g.
self._active_requests.pop(request_id, None)) and cancel the future
(fut.cancel()) before re-raising the exception; reference the existing symbols
request_id, fut, self._active_requests, and channel.asend_message/
proto.InferenceRequest when making the change.

In `@livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py`:
- Around line 132-138: The code registers a future in self._active_requests
before sending and may leave an orphaned entry if
self._client.send(InferenceRequest(...)) raises; update the block around
request_id/fut/_active_requests/_client.send so that you register request_id ->
fut, then perform the send inside a try/except where on any exception you remove
the entry from self._active_requests and cancel the fut (e.g., fut.cancel() or
fut.set_exception(...)) before re-raising the error; ensure this cleanup runs
for all send failures to avoid leaking orphaned futures.
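
For reference, a minimal sketch of the register-then-clean-up pattern both prompts describe; apart from `_active_requests` and `request_id`, the names here are hypothetical stand-ins for the real executor internals, not the actual implementation:

```python
import asyncio
from typing import Awaitable, Callable


class InferenceClientSketch:
    """Illustrative stand-in for the executor; names other than _active_requests are made up."""

    def __init__(self, send: Callable[[str, bytes], Awaitable[None]]) -> None:
        self._send = send  # async transport call that may raise on failure
        self._active_requests: dict = {}

    async def do_inference(self, request_id: str, payload: bytes) -> bytes:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        # Register before sending so an early response can never miss the lookup.
        self._active_requests[request_id] = fut
        try:
            await self._send(request_id, payload)
        except Exception:
            # Send failed: drop the orphaned entry and cancel the future, then re-raise.
            self._active_requests.pop(request_id, None)
            fut.cancel()
            raise
        return await fut
```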

In `@livekit-agents/livekit/agents/voice/speech_handle.py`:
- Around line 174-176: The pending set currently may include the class-level
self._interrupt_fut which must not be cancelled because that corrupts the
interrupted property; change the cancellation so only the speech-processing
gather future(s) are cancelled—filter out self._interrupt_fut from pending (or
directly identify the gather future created for the speech task) and call
utils.aio.cancel_and_wait only with the remaining futures; ensure
self._interrupt_fut is left untouched so self.interrupted continues to reflect
the real interrupt state.
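
The eventual shape (shield the caller's awaitables, wait for the first completion, cancel only the internal wrappers) can be sketched as follows; the signature and helper wiring are assumptions, and the cancel loop is an inline equivalent of utils.aio.cancel_and_wait:

```python
import asyncio
from typing import Awaitable, List


async def wait_if_not_interrupted_sketch(
    aws: List[Awaitable], interrupt_fut: "asyncio.Future[None]"
) -> None:
    # Shield caller-managed awaitables so cancelling our wrapper tasks never
    # propagates cancellation into them.
    wrappers = [asyncio.ensure_future(asyncio.shield(aw)) for aw in aws]
    _, pending = await asyncio.wait(
        [*wrappers, interrupt_fut], return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel only our own wrappers; leave interrupt_fut untouched so the
    # interrupted state keeps reflecting reality.
    if cancellable := [f for f in pending if f is not interrupt_fut]:
        for f in cancellable:
            f.cancel()
        await asyncio.gather(*cancellable, return_exceptions=True)
```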

In `@livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py`:
- Around line 779-781: process_stream currently indexes resp.results[0] before
calling _streaming_recognize_response_to_speech_data, which can raise IndexError
when resp.results is empty; add a guard in process_stream that checks if
resp.results is truthy (e.g., if not resp.results: continue/return/skip) before
accessing resp.results[0], and only call
_streaming_recognize_response_to_speech_data or perform further processing when
resp.results is non-empty so you never index into an empty list.
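
A sketch of the guard in question; the handler name and surrounding wiring are hypothetical, only the resp.results access mirrors the report:

```python
async def _on_recognize_response(resp) -> None:  # hypothetical handler name
    if not resp.results:
        # Streaming recognition can emit responses with no results; skip them
        # instead of indexing resp.results[0] and raising IndexError.
        return
    first_result = resp.results[0]
    # ... continue converting first_result into speech data as before
```
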
🧹 Nitpick comments (1)
livekit-agents/livekit/agents/voice/agent_activity.py (1)

713-716: Good concurrent cleanup pattern, but suppressed exceptions could hide errors.

The concurrent asyncio.gather with return_exceptions=True is a sound approach for parallelized cleanup. However, any exceptions during MCP server closure are silently discarded. Consider logging errors for observability, similar to the pattern used in _start_session (lines 512-517).

♻️ Optional: Log any cleanup errors
```diff
         if self.mcp_servers:
-            await asyncio.gather(
+            results = await asyncio.gather(
                 *(mcp_server.aclose() for mcp_server in self.mcp_servers), return_exceptions=True
             )
+            for mcp_server, result in zip(self.mcp_servers, results):
+                if isinstance(result, BaseException):
+                    logger.warning(
+                        f"failed to close MCP server {mcp_server}",
+                        exc_info=result,
+                    )
```
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d97b05 and 89a47a6.

📒 Files selected for processing (9)
  • livekit-agents/livekit/agents/ipc/inference_proc_executor.py
  • livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py
  • livekit-agents/livekit/agents/stt/fallback_adapter.py
  • livekit-agents/livekit/agents/stt/stream_adapter.py
  • livekit-agents/livekit/agents/utils/aio/channel.py
  • livekit-agents/livekit/agents/utils/http_context.py
  • livekit-agents/livekit/agents/voice/agent_activity.py
  • livekit-agents/livekit/agents/voice/speech_handle.py
  • livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings

Files:

  • livekit-agents/livekit/agents/voice/speech_handle.py
  • livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py
  • livekit-agents/livekit/agents/stt/fallback_adapter.py
  • livekit-agents/livekit/agents/voice/agent_activity.py
  • livekit-agents/livekit/agents/utils/http_context.py
  • livekit-agents/livekit/agents/ipc/job_proc_lazy_main.py
  • livekit-agents/livekit/agents/ipc/inference_proc_executor.py
  • livekit-agents/livekit/agents/utils/aio/channel.py
  • livekit-agents/livekit/agents/stt/stream_adapter.py
🧬 Code graph analysis (3)
livekit-agents/livekit/agents/voice/speech_handle.py (2)
livekit-agents/livekit/agents/utils/aio/wait_group.py (1)
  • wait (29-30)
livekit-agents/livekit/agents/utils/aio/utils.py (1)
  • cancel_and_wait (6-23)
livekit-agents/livekit/agents/voice/agent_activity.py (2)
livekit-agents/livekit/agents/voice/agent_session.py (1)
  • mcp_servers (386-387)
livekit-agents/livekit/agents/voice/agent.py (1)
  • mcp_servers (547-557)
livekit-agents/livekit/agents/utils/http_context.py (2)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • closed (152-153)
livekit-agents/livekit/agents/voice/transcription/synchronizer.py (1)
  • closed (163-164)
🔇 Additional comments (4)
livekit-agents/livekit/agents/stt/fallback_adapter.py (1)

435-437: LGTM — draining the metrics stream avoids backpressure.
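
For context, the draining pattern amounts to something like this sketch (a hypothetical standalone version; the real task lives inside the adapters):

```python
async def _metrics_monitor_task_sketch(event_aiter) -> None:
    # Consume every metrics event so the producer never blocks behind an
    # unread async iterator; events are simply discarded here.
    async for _event in event_aiter:
        pass
```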

livekit-agents/livekit/agents/utils/aio/channel.py (1)

90-96: LGTM — ChanClosed should take precedence.
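
The approved ordering, shown on a toy channel; ChanClosed matches the module under review, while ChanFull and the rest of this class are illustrative assumptions:

```python
import asyncio


class ChanClosed(Exception):
    pass


class ChanFull(Exception):
    pass


class ChanSketch:
    def __init__(self, maxsize: int = 1) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize)
        self._closed = False

    def send_nowait(self, item) -> None:
        # Closed-state check first: a closed-and-full channel should report
        # ChanClosed, not a misleading capacity error.
        if self._closed:
            raise ChanClosed()
        if self._queue.full():
            raise ChanFull()
        self._queue.put_nowait(item)
```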

livekit-agents/livekit/agents/stt/stream_adapter.py (1)

93-95: LGTM — draining the metrics stream is safer.

livekit-agents/livekit/agents/utils/http_context.py (1)

19-19: LGTM! Good defensive fix for handling closed sessions.

This correctly handles the edge case where a session may have been closed externally or due to errors. The short-circuit evaluation ensures g_session.closed is only accessed when g_session is not None.
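
Roughly what that one-line check looks like in isolation (a sketch only; the real http_context module manages its own connector, proxy settings, and cleanup hooks):

```python
from typing import Optional

import aiohttp

g_session: Optional[aiohttp.ClientSession] = None


def http_session() -> aiohttp.ClientSession:
    # Assumes this is called from inside a running event loop.
    global g_session
    # Short-circuit evaluation: `closed` is only read when g_session exists,
    # and a session closed externally (or after an error) gets replaced.
    if g_session is None or g_session.closed:
        g_session = aiohttp.ClientSession()
    return g_session
```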

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Member

@davidzhao davidzhao left a comment


these are solid fixes. lgtm

```diff
 if pending:
-    await utils.aio.cancel_and_wait(*pending)
+    if cancellable := [fut for fut in pending if fut is not self._interrupt_fut]:
+        await utils.aio.cancel_and_wait(*cancellable)
```
Contributor


I think we should let the caller manage the awaitables instead of cancelling them whenever the speech is interrupted?

Member Author


Good point. I think the intention was to cancel the gather only, not each awaitable. I will wrap them in shield instead.

