fix(live): prevent zombie WebSocket session after LiveRequestQueue.close() #5226

Open
TonyLee-AI wants to merge 2 commits into google:main from TonyLee-AI:fix/live-zombie-session-on-queue-close

TonyLee-AI commented Apr 9, 2026

Fixes #5228

Problem

Calling LiveRequestQueue.close() is the documented way to shut down a live
session from the client side. However, run_live()'s while True: reconnect
loop had no awareness of this intentional shutdown.

When close() is called, _send_to_model sends a WebSocket close frame
(code 1000). _receive_from_model then surfaces this as either an
APIError(1000) or a ConnectionClosed event. The loop's exception handlers
responded in one of two unintended ways:

| Condition | Old behaviour | Expected |
|---|---|---|
| close() called, with session-resumption handle | reconnects → zombie session | terminate cleanly |
| close() called, no session-resumption handle | raises spurious error | terminate cleanly |

The zombie session stays open until Google's server forces a timeout after ~2 hours,
at which point a 1006 abnormal closure error is logged to the application —
long after the user's call has ended.

Root cause

run_live()'s exception handlers could not distinguish between:

  • An intentional close initiated by LiveRequestQueue.close() (code 1000)
  • A genuine network drop or server-side close (code 1006 etc.)

Both the ConnectionClosed and APIError(1000) paths lacked this distinction.

Fix

live_request_queue.py — add is_closed property backed by a synchronous flag:

class LiveRequestQueue:

  def __init__(self):
    self._queue = asyncio.Queue()
    self._closed = False          # added

  @property
  def is_closed(self) -> bool:   # added
    """Returns True if close() has been called on this queue."""
    return self._closed

  def close(self):
    self._closed = True           # added (set synchronously before enqueue)
    self._queue.put_nowait(LiveRequest(close=True))

Setting the flag synchronously before any await guarantees it is already
True by the time any connection-close exception propagates back to
run_live()'s handlers — no asyncio scheduling race.
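
The ordering argument can be checked in isolation. The following is a minimal sketch, not the ADK class: `SketchQueue`, the `"CLOSE"` placeholder sentinel, and `observe_flag_when_sentinel_arrives` are hypothetical names used only for illustration. It assumes a single event loop, where a plain attribute write inside `close()` cannot be interleaved with another task.

```python
import asyncio


class SketchQueue:
  """Hypothetical stand-in for LiveRequestQueue; not the ADK class."""

  def __init__(self):
    self._queue = asyncio.Queue()
    self._closed = False

  @property
  def is_closed(self) -> bool:
    return self._closed

  def close(self):
    # Plain attribute write with no await: no other task can run between
    # this line and the enqueue below.
    self._closed = True
    self._queue.put_nowait("CLOSE")  # placeholder for LiveRequest(close=True)


async def observe_flag_when_sentinel_arrives() -> bool:
  queue = SketchQueue()

  async def consumer() -> bool:
    # Wakes up only after close() has enqueued the sentinel.
    await queue._queue.get()
    return queue.is_closed

  task = asyncio.create_task(consumer())
  await asyncio.sleep(0)  # let the consumer start and block on get()
  queue.close()
  return await task


flag_seen_by_consumer = asyncio.run(observe_flag_when_sentinel_arrives())
```

Any task that reacts to the sentinel, or to a connection close caused by it, can only run after `close()` has returned, so under these assumptions it should always observe the flag as already set.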

base_llm_flow.py — check is_closed before the reconnect logic in both exception handlers:

except (ConnectionClosed, ConnectionClosedOK) as e:
    # An intentional close via LiveRequestQueue.close() may surface as a
    # ConnectionClosed event.  Do not reconnect in that case.
    if invocation_context.live_request_queue.is_closed:   # added
        logger.info(
            'Live session for agent %s closed by client request.',
            invocation_context.agent.name,
        )
        return
    # If we have a session resumption handle, we attempt to reconnect.
    if invocation_context.live_session_resumption_handle:
        ...

except errors.APIError as e:
    # Error code 1000 indicates a normal (intentional) closure.  If the
    # client called LiveRequestQueue.close(), do not treat this as an error
    # and do not attempt to reconnect regardless of session handle state.
    if e.code == 1000 and invocation_context.live_request_queue.is_closed:  # added
        logger.info(
            'Live session for agent %s closed by client request.',
            invocation_context.agent.name,
        )
        return
    # Error codes 1000 and 1006 indicate a recoverable connection drop.
    if e.code in [1000, 1006]:
        ...

A trailing guard at the bottom of the loop body covers the edge case where the
receive generator exhausts without raising.
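
The control flow of the two handlers plus the trailing guard can be sketched as a small synchronous simulation. This is an illustrative model only: `FakeDrop`, `run_loop_sketch`, and the scripted `"drop"` / `"end"` events are hypothetical, and the real loop is an async `while True:` in `base_llm_flow.py` whose details are not reproduced here.

```python
class FakeDrop(Exception):
  """Hypothetical stand-in for ConnectionClosed or APIError(1000)."""


def run_loop_sketch(script, queue_closed, has_handle):
  """Simulates the guarded reconnect loop.

  script: one entry per connection attempt. "drop" means the receive side
  raises; "end" means it finishes without raising.
  Returns the number of connections opened before the loop stops.
  """
  connects = 0
  for event in script:
    connects += 1
    try:
      if event == "drop":
        raise FakeDrop()
    except FakeDrop:
      if queue_closed:
        return connects  # intentional close: stop, do not reconnect
      if has_handle:
        continue  # genuine drop with a handle: reconnect
      raise  # genuine drop without a handle: surface the error
    # Trailing guard: the receive side ended without raising.
    if queue_closed:
      return connects
  return connects  # script exhausted (the real loop has no such bound)
```

For example, `run_loop_sketch(["drop", "drop", "drop"], queue_closed=True, has_handle=True)` stops after one connection, while the same script with `queue_closed=False` opens all three.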

Behaviour after this fix

| Scenario | Before | After |
|---|---|---|
| close() called, no session handle | raises error | terminates cleanly |
| close() called, session handle present | reconnects (zombie) | terminates cleanly |
| Network drop, session handle present | reconnects | reconnects (unchanged) |
| Network drop, no session handle | raises | raises (unchanged) |

Logs

Before fix

Intentional close, session handle present → zombie reconnect:

INFO  base_llm_flow: Establishing live connection for agent: my_agent
INFO  base_llm_flow: Update session resumption handle: new_handle='abc123' ...
INFO  base_llm_flow: Connection lost (1000 ...), reconnecting with session handle.
INFO  base_llm_flow: Attempting to reconnect (Attempt 1)...
INFO  base_llm_flow: Establishing live connection for agent: my_agent
# zombie session open — Gemini server terminates it ~2 hours later:
ERROR base_llm_flow: APIError in live flow: 1006 None. abnormal closure [internal]

Intentional close, no session handle → spurious error:

INFO  base_llm_flow: Establishing live connection for agent: my_agent
ERROR base_llm_flow: APIError in live flow: 1000 None.
# normal client-initiated close logged as an error

After fix

Intentional close (APIError 1000 path) — terminates cleanly, no reconnect:

INFO  base_llm_flow: Establishing live connection for agent: my_agent
INFO  base_llm_flow: Update session resumption handle: new_handle='abc123' ...
INFO  base_llm_flow: Live session for agent my_agent closed by client request.

Intentional close (ConnectionClosed path) — terminates cleanly, no reconnect:

INFO  base_llm_flow: Establishing live connection for agent: my_agent
INFO  base_llm_flow: Update session resumption handle: new_handle='abc123' ...
INFO  base_llm_flow: Live session for agent my_agent closed by client request.

Genuine network drop with handle — reconnection still works (regression guard):

INFO  base_llm_flow: Establishing live connection for agent: my_agent
INFO  base_llm_flow: Update session resumption handle: new_handle='abc123' ...
INFO  base_llm_flow: Connection closed (...), reconnecting with session handle.
INFO  base_llm_flow: Attempting to reconnect (Attempt 1)...
INFO  base_llm_flow: Establishing live connection for agent: my_agent

Testing plan

  • Added is_closed property tests in tests/unittests/agents/test_live_request_queue.py:
    • test_is_closed_initially_false: property starts as False
    • test_is_closed_true_after_close: property becomes True after close()
    • test_is_closed_not_affected_by_other_sends: other sends do not set the flag
  • Added run_live zombie session tests in tests/unittests/flows/llm_flows/test_base_llm_flow.py:
    • test_run_live_no_reconnect_after_queue_close_api_error_1000: APIError(1000) after close() terminates without reconnecting
    • test_run_live_no_reconnect_after_queue_close_connection_closed: ConnectionClosed after close() terminates without reconnecting
    • test_run_live_still_reconnects_on_unintentional_drop_with_handle: genuine network drop (no close()) still reconnects (regression guard)
  • All tests pass:
$ pytest tests/unittests/flows/llm_flows/test_base_llm_flow.py \
         tests/unittests/agents/test_live_request_queue.py -v

tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_reconnects_on_connection_closed PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_reconnects_on_api_error PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_skips_send_history_on_resumption PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_live_session_resumption_go_away PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_no_reconnect_without_handle PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_reconnect_limit PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_reconnect_reset_attempt PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_no_reconnect_after_queue_close_api_error_1000 PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_no_reconnect_after_queue_close_connection_closed PASSED
tests/unittests/flows/llm_flows/test_base_llm_flow.py::test_run_live_still_reconnects_on_unintentional_drop_with_handle PASSED
tests/unittests/agents/test_live_request_queue.py::test_close_queue PASSED
tests/unittests/agents/test_live_request_queue.py::test_is_closed_initially_false PASSED
tests/unittests/agents/test_live_request_queue.py::test_is_closed_true_after_close PASSED
tests/unittests/agents/test_live_request_queue.py::test_is_closed_not_affected_by_other_sends PASSED
tests/unittests/agents/test_live_request_queue.py::test_send_content PASSED
tests/unittests/agents/test_live_request_queue.py::test_send_realtime PASSED
tests/unittests/agents/test_live_request_queue.py::test_send PASSED
tests/unittests/agents/test_live_request_queue.py::test_get PASSED

36 passed, 1 warning in 1.29s

Real-world impact — Google Cloud Support case

This bug was independently reported to Google Cloud Support (January 2026) by a user building a voice agent on Vertex AI. The observed symptom was:

"The operation was cancelled." (gRPC code 1) logged in Cloud Audit Logs every ~10 minutes, indefinitely — even after LiveRequestQueue.close() was called and the async for loop consuming run_live() had completed normally.

The ~10-minute interval matches Gemini Live's server-side idle timeout: the zombie session is terminated by the server, while True: immediately reconnects, and the cycle repeats forever.

Google Cloud Support confirmed (Pratik, Google Cloud Support):

"No, simply pushing a 'close' message or sentinel to the LiveRequestQueue is not sufficient to fully terminate the underlying bidirectional streaming connection (WebSockets). Doing so only signals to the agent logic on the server-side that your client has finished sending data. It does not close the network connection itself."

The support response recommended using the async with pattern and referenced Discussion #4156 as a community-identified workaround. This PR provides the proper fix at the framework level so that LiveRequestQueue.close() behaves as documented — terminating the session cleanly without requiring callers to manage task cancellation externally.
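
For context, caller-managed task cancellation in plain asyncio looks roughly like the sketch below. It is a generic illustration, not the ADK API and not necessarily the workaround described in Discussion #4156: `fake_event_stream` and `consume` are hypothetical stand-ins for a live event stream and its consumer.

```python
import asyncio
import contextlib


async def fake_event_stream():
  """Hypothetical stand-in for an event stream that never ends on its own."""
  n = 0
  while True:
    await asyncio.sleep(0.01)
    n += 1
    yield n


async def consume(received):
  # Stand-in for the caller's `async for` loop over the live stream.
  async for event in fake_event_stream():
    received.append(event)


async def main():
  received = []
  task = asyncio.create_task(consume(received))
  await asyncio.sleep(0.05)  # the session is in progress
  task.cancel()  # the caller has to shut the consumer down itself
  with contextlib.suppress(asyncio.CancelledError):
    await task
  return task.cancelled(), len(received)


was_cancelled, event_count = asyncio.run(main())
```

With the fix in this PR, callers should not need this extra bookkeeping just to end a session after `LiveRequestQueue.close()`.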

Related

  • Discussed in GitHub Discussion #4156 — "True model stop via BIDI streaming and LiveRequestQueue"
  • Google Cloud Support confirmed: "simply pushing a 'close' message or sentinel to the LiveRequestQueue is not sufficient to fully terminate the underlying bidirectional streaming connection"
  • This is distinct from PR #5007 ("fix: session resumption reconnection loop never iterates"), which fixed the opposite direction (the session resumption loop never iterating)


google-cla bot commented Apr 9, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

adk-bot added the "live" label ([Component] This issue is related to live, voice and video chat) on Apr 9, 2026
Collaborator

adk-bot commented Apr 9, 2026

Response from ADK Triaging Agent

Hello @TonyLee-AI, thank you for your contribution!

It looks like you haven't signed the Contributor License Agreement (CLA) yet. Please make sure to sign it so we can proceed with reviewing your PR.

Thanks!

…ose()

Calling `LiveRequestQueue.close()` is the documented way to shut down a live
session from the client side.  However, `run_live()`'s `while True:` reconnect
loop had no awareness of this intentional shutdown: when the resulting
APIError(1000) / ConnectionClosed event arrived it would either reconnect (if a
session-resumption handle was present) or raise a spurious error (if no handle
was present), in both cases creating a long-lived zombie WebSocket connection
that Gemini eventually terminates after ~2 hours with a 1006 error.

Fix
---
* Add `is_closed: bool` property to `LiveRequestQueue` backed by a simple
  boolean flag that is set synchronously in `close()` *before* the sentinel is
  enqueued.  The synchronous flag avoids any asyncio scheduling race: by the
  time any connection-close exception reaches `run_live()`'s handlers, the
  flag is already True.
* In `run_live()`, check `live_request_queue.is_closed` in both the
  `ConnectionClosed` and `APIError(1000)` exception handlers.  When the queue
  is closed, log an info message and `return` instead of reconnecting or
  raising.  A trailing guard at the bottom of the loop body covers the (less
  common) case where the receive generator returns normally without raising.

Behaviour after this fix
------------------------
| Scenario                                   | Before        | After     |
|--------------------------------------------|---------------|-----------|
| `close()` called, no session handle        | raises error  | terminates cleanly |
| `close()` called, session handle present   | reconnects    | terminates cleanly |
| Network drop, session handle present       | reconnects    | reconnects (unchanged) |
| Network drop, no session handle            | raises        | raises (unchanged) |

Tests
-----
* `test_is_closed_initially_false` — property starts False
* `test_is_closed_true_after_close` — property becomes True after close()
* `test_is_closed_not_affected_by_other_sends` — other sends don't set it
* `test_run_live_no_reconnect_after_queue_close_api_error_1000` — APIError(1000)
  after close() → terminates, connect called once
* `test_run_live_no_reconnect_after_queue_close_connection_closed` — same for
  ConnectionClosed variant
* `test_run_live_still_reconnects_on_unintentional_drop_with_handle` — genuine
  network drop without close() still reconnects (regression guard)

TonyLee-AI force-pushed the fix/live-zombie-session-on-queue-close branch from 6c277e7 to b2024e2 on April 9, 2026 09:30
Collaborator

adk-bot commented Apr 9, 2026

Response from ADK Triaging Agent

Hello @TonyLee-AI, thank you for your contribution!

I see this PR is a bug fix. As per our contribution guidelines, could you please create a GitHub issue for this bug and associate it with this PR? If an issue already exists, please link it.

This helps us with tracking. Thanks!

Labels

live [Component] This issue is related to live, voice and video chat

Development

Successfully merging this pull request may close these issues.

bug(live): zombie WebSocket session after LiveRequestQueue.close() — periodic 'operation cancelled' errors in Cloud Audit Logs

3 participants