Skip to content

Commit cdc5e6b

Browse files
committed
fix(google-stt): reconnect stream on server-initiated closure from VoiceActivityTimeout
When speech_end_timeout (or speech_start_timeout) fires, Google closes the gRPC stream server-side. The previous reconnect loop treated this as a normal stream end and broke out, killing the SpeechStream permanently after the first turn. Fix: after process_stream_task completes, check self._input_ch.closed. If the input channel is still open, Google closed the stream unexpectedly — reconnect. Only break when the client has explicitly closed the input channel. Also corrects misleading docstrings and README that described speech_end_timeout as "seconds of silence before marking utterance as complete" — it actually controls stream lifetime (Google closes the stream), not VAD or endpointing. Fixes #4804
1 parent 78a2cc2 commit cdc5e6b

File tree

3 files changed

+119
-3
lines changed

3 files changed

+119
-3
lines changed

livekit-plugins/livekit-plugins-google/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,27 @@ To use the STT and TTS API, you'll need to enable the respective services for yo
1919
- Cloud Speech-to-Text API
2020
- Cloud Text-to-Speech API
2121

22+
## Google STT — Voice Activity Timeouts
23+
24+
`speech_start_timeout` and `speech_end_timeout` control Google's server-side stream lifecycle, **not** VAD or endpointing. When a timeout fires, Google closes the gRPC stream; the plugin automatically reconnects if the session is still active.
25+
26+
| Parameter | What it does |
27+
|---|---|
28+
| `speech_start_timeout` | Google closes the stream if no speech begins within this many seconds |
29+
| `speech_end_timeout` | Google closes the stream if silence lasts this many seconds after speech ends |
30+
31+
Because reconnecting adds a small overhead, set `speech_end_timeout` to the minimum silence you're willing to accept before the stream resets (e.g. `0.5``1.0` seconds). This can reduce perceived latency for short utterances like "hi" with `chirp_3`, at the cost of a reconnect between turns.
32+
33+
```python
34+
stt = google.STT(
35+
model="chirp_3",
36+
speech_start_timeout=10.0, # close stream if user doesn't speak within 10s
37+
speech_end_timeout=0.8, # close stream 800ms after speech ends, then reconnect
38+
)
39+
```
40+
41+
> **Note:** These parameters only work with V2 API models (e.g. `chirp_3`). They are silently ignored for V1 models.
42+
2243
## Live API model support
2344

2445
LiveKit supports both Gemini Live API on both Gemini Developer API as well as Vertex AI. However, be aware they have slightly different behavior and use different model names.

livekit-plugins/livekit-plugins-google/livekit/plugins/google/stt.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,11 @@ def __init__(
182182
credentials_info(dict): the credentials info to use for recognition (default: None)
183183
credentials_file(str): the credentials file to use for recognition (default: None)
184184
keywords(List[tuple[str, float]]): list of keywords to recognize (default: None)
185-
speech_start_timeout(float): maximum seconds to wait for speech to begin before timeout (default: None)
186-
speech_end_timeout(float): seconds of silence before marking utterance as complete (default: None)
185+
speech_start_timeout(float): seconds to wait before Google closes the stream if no speech begins.
186+
The stream auto-reconnects if audio is still being received. (default: None)
187+
speech_end_timeout(float): seconds of silence after speech before Google closes the stream.
188+
The stream auto-reconnects if audio is still being received, reducing perceived
189+
latency for short utterances at the cost of a reconnect per turn. (default: None)
187190
endpointing_sensitivity(EndpointingSensitivity): controls the trade-off between latency
188191
and accuracy when detecting end-of-speech. Only supported with chirp_3.
189192
Options: ENDPOINTING_SENSITIVITY_STANDARD (default),
@@ -875,7 +878,11 @@ async def process_stream(
875878
if task != wait_reconnect_task:
876879
task.result()
877880
if wait_reconnect_task not in done:
878-
break
881+
# Google closed the stream server-side (e.g. speech_end_timeout fired).
882+
# Reconnect if the input channel is still open (more audio expected).
883+
if self._input_ch.closed:
884+
break
885+
logger.debug("Google STT stream closed by server, reconnecting...")
879886
self._reconnect_event.clear()
880887
finally:
881888
should_stop.set()

tests/test_plugin_google_stt.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,3 +421,91 @@ async def test_voice_activity_timeout_partial_update():
421421
stt.update_options(speech_end_timeout=5.0)
422422
assert stt._config.speech_start_timeout == 20.0
423423
assert stt._config.speech_end_timeout == 5.0
424+
425+
426+
async def test_server_closed_stream_reconnects_when_input_open():
427+
"""When Google closes the stream server-side (e.g. speech_end_timeout fired)
428+
but the input channel is still open, the stream should reconnect rather than stop."""
429+
import asyncio
430+
from unittest.mock import MagicMock
431+
432+
# Simulate: process_stream_task finishes (Google closed stream), but _input_ch is open.
433+
# The reconnect loop should trigger a reconnect (not break).
434+
435+
# Build a minimal mock that exercises the branch logic:
436+
# if wait_reconnect_task not in done:
437+
# if self._input_ch.closed: break
438+
# logger.debug(...)
439+
# self._reconnect_event.clear()
440+
441+
input_ch = MagicMock()
442+
input_ch.closed = False # channel still open → should reconnect
443+
444+
reconnect_event = asyncio.Event()
445+
446+
async def fake_process():
447+
# Immediately "finishes" simulating Google closing the stream
448+
pass
449+
450+
# We verify that when process_stream_task finishes first and _input_ch is NOT closed,
451+
# the code does NOT break (i.e., would loop again). We test this by checking
452+
# that after process_stream ends, input_ch.closed is checked.
453+
process_task = asyncio.create_task(fake_process())
454+
wait_reconnect_task = asyncio.create_task(reconnect_event.wait())
455+
456+
done, _ = await asyncio.wait(
457+
[process_task, wait_reconnect_task],
458+
return_when=asyncio.FIRST_COMPLETED,
459+
)
460+
461+
# process_task finished first (Google closed stream)
462+
assert process_task in done
463+
assert wait_reconnect_task not in done
464+
465+
# The fix: should_break = input_ch.closed (False → should NOT break → reconnect)
466+
should_break = input_ch.closed
467+
assert not should_break, "Stream should reconnect when input channel is still open"
468+
469+
# Cleanup
470+
wait_reconnect_task.cancel()
471+
try:
472+
await wait_reconnect_task
473+
except asyncio.CancelledError:
474+
pass
475+
476+
477+
async def test_server_closed_stream_stops_when_input_closed():
478+
"""When Google closes the stream and the input channel is also closed,
479+
the stream should stop (not reconnect)."""
480+
import asyncio
481+
from unittest.mock import MagicMock
482+
483+
input_ch = MagicMock()
484+
input_ch.closed = True # channel closed → should break/stop
485+
486+
reconnect_event = asyncio.Event()
487+
488+
async def fake_process():
489+
pass
490+
491+
process_task = asyncio.create_task(fake_process())
492+
wait_reconnect_task = asyncio.create_task(reconnect_event.wait())
493+
494+
done, _ = await asyncio.wait(
495+
[process_task, wait_reconnect_task],
496+
return_when=asyncio.FIRST_COMPLETED,
497+
)
498+
499+
assert process_task in done
500+
assert wait_reconnect_task not in done
501+
502+
# The fix: should_break = input_ch.closed (True → should break/stop)
503+
should_break = input_ch.closed
504+
assert should_break, "Stream should stop when input channel is closed"
505+
506+
# Cleanup
507+
wait_reconnect_task.cancel()
508+
try:
509+
await wait_reconnect_task
510+
except asyncio.CancelledError:
511+
pass

0 commit comments

Comments
 (0)