Skip to content

Commit 5e308c3

Browse files
committed
fix(azure-stt): surface canceled events and await startup
1 parent 024c629 commit 5e308c3

File tree

3 files changed

+269
-1
lines changed

3 files changed

+269
-1
lines changed

changelog/3884.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Fixed Azure STT silent failures by handling recognizer canceled and session lifecycle events and surfacing cancellation errors through `ErrorFrame` propagation.

src/pipecat/services/azure/stt.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,12 @@ async def start(self, frame: StartFrame):
209209
)
210210
self._speech_recognizer.recognizing.connect(self._on_handle_recognizing)
211211
self._speech_recognizer.recognized.connect(self._on_handle_recognized)
212-
self._speech_recognizer.start_continuous_recognition_async()
212+
self._speech_recognizer.canceled.connect(self._on_handle_canceled)
213+
self._speech_recognizer.session_started.connect(self._on_handle_session_started)
214+
self._speech_recognizer.session_stopped.connect(self._on_handle_session_stopped)
215+
216+
start_future = self._speech_recognizer.start_continuous_recognition_async()
217+
await self.get_event_loop().run_in_executor(None, start_future.get)
213218
except Exception as e:
214219
await self.push_error(
215220
error_msg=f"Uncaught exception during initialization: {e}", exception=e
@@ -280,3 +285,33 @@ def _on_handle_recognizing(self, event):
280285
result=event,
281286
)
282287
asyncio.run_coroutine_threadsafe(self.push_frame(frame), self.get_event_loop())
288+
289+
def _on_handle_canceled(self, event):
290+
details = getattr(event, "cancellation_details", None)
291+
reason = getattr(details, "reason", "UNKNOWN")
292+
code = getattr(details, "code", "UNKNOWN")
293+
error_details = getattr(details, "error_details", "")
294+
295+
logger.error(
296+
"Azure STT recognition canceled: reason={}, code={}, details={}",
297+
reason,
298+
code,
299+
error_details,
300+
)
301+
302+
error_message = f"Azure STT recognition canceled: {code} - {error_details}"
303+
asyncio.run_coroutine_threadsafe(
304+
self.push_error(error_msg=error_message), self.get_event_loop()
305+
)
306+
307+
def _on_handle_session_started(self, event):
308+
logger.info(
309+
"Azure STT session started: session_id={}",
310+
getattr(event, "session_id", "unknown"),
311+
)
312+
313+
def _on_handle_session_stopped(self, event):
314+
logger.warning(
315+
"Azure STT session stopped: session_id={}",
316+
getattr(event, "session_id", "unknown"),
317+
)

tests/test_azure_stt.py

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
#
2+
# Copyright (c) 2024-2026, Daily
3+
#
4+
# SPDX-License-Identifier: BSD 2-Clause License
5+
#
6+
7+
"""Unit tests for Azure STT service error and startup handling."""
8+
9+
import importlib.util
10+
import pathlib
11+
import sys
12+
import types
13+
from unittest.mock import AsyncMock, MagicMock, patch
14+
15+
import pytest
16+
17+
18+
def _install_websockets_stub() -> None:
19+
"""Install minimal websockets stubs required by STT service imports."""
20+
21+
websockets_module = sys.modules.setdefault("websockets", types.ModuleType("websockets"))
22+
protocol_module = sys.modules.setdefault(
23+
"websockets.protocol", types.ModuleType("websockets.protocol")
24+
)
25+
exceptions_module = sys.modules.setdefault(
26+
"websockets.exceptions", types.ModuleType("websockets.exceptions")
27+
)
28+
29+
class State:
30+
OPEN = "OPEN"
31+
CLOSED = "CLOSED"
32+
33+
class ConnectionClosedError(Exception):
34+
pass
35+
36+
class ConnectionClosedOK(Exception):
37+
pass
38+
39+
class WebSocketClientProtocol:
40+
pass
41+
42+
protocol_module.State = State
43+
exceptions_module.ConnectionClosedError = ConnectionClosedError
44+
exceptions_module.ConnectionClosedOK = ConnectionClosedOK
45+
websockets_module.protocol = protocol_module
46+
websockets_module.exceptions = exceptions_module
47+
websockets_module.WebSocketClientProtocol = WebSocketClientProtocol
48+
49+
50+
def _install_azure_speech_stub() -> None:
51+
"""Install minimal Azure Speech SDK stubs for tests.
52+
53+
The Azure STT module imports Azure SDK symbols at module import time.
54+
These stubs allow tests to import the module without the optional dependency.
55+
"""
56+
57+
azure_module = sys.modules.setdefault("azure", types.ModuleType("azure"))
58+
cognitiveservices_module = sys.modules.setdefault(
59+
"azure.cognitiveservices", types.ModuleType("azure.cognitiveservices")
60+
)
61+
speech_module = sys.modules.setdefault(
62+
"azure.cognitiveservices.speech", types.ModuleType("azure.cognitiveservices.speech")
63+
)
64+
audio_module = sys.modules.setdefault(
65+
"azure.cognitiveservices.speech.audio", types.ModuleType("azure.cognitiveservices.speech.audio")
66+
)
67+
dialog_module = sys.modules.setdefault(
68+
"azure.cognitiveservices.speech.dialog", types.ModuleType("azure.cognitiveservices.speech.dialog")
69+
)
70+
71+
class ResultReason:
72+
RecognizedSpeech = "RecognizedSpeech"
73+
RecognizingSpeech = "RecognizingSpeech"
74+
75+
class SpeechConfig:
76+
def __init__(self, **kwargs):
77+
self.kwargs = kwargs
78+
self.endpoint_id = None
79+
80+
class _EventHook:
81+
def connect(self, _handler):
82+
return None
83+
84+
class SpeechRecognizer:
85+
def __init__(self, speech_config=None, audio_config=None):
86+
self.speech_config = speech_config
87+
self.audio_config = audio_config
88+
self.recognizing = _EventHook()
89+
self.recognized = _EventHook()
90+
self.canceled = _EventHook()
91+
self.session_started = _EventHook()
92+
self.session_stopped = _EventHook()
93+
94+
def start_continuous_recognition_async(self):
95+
future = MagicMock()
96+
future.get = MagicMock()
97+
return future
98+
99+
def stop_continuous_recognition_async(self):
100+
return None
101+
102+
class AudioStreamFormat:
103+
def __init__(self, samples_per_second=None, channels=None):
104+
self.samples_per_second = samples_per_second
105+
self.channels = channels
106+
107+
class PushAudioInputStream:
108+
def __init__(self, _stream_format):
109+
pass
110+
111+
def write(self, _audio):
112+
return None
113+
114+
def close(self):
115+
return None
116+
117+
class AudioConfig:
118+
def __init__(self, stream=None):
119+
self.stream = stream
120+
121+
speech_module.ResultReason = ResultReason
122+
speech_module.SpeechConfig = SpeechConfig
123+
speech_module.SpeechRecognizer = SpeechRecognizer
124+
audio_module.AudioStreamFormat = AudioStreamFormat
125+
audio_module.PushAudioInputStream = PushAudioInputStream
126+
dialog_module.AudioConfig = AudioConfig
127+
128+
azure_module.cognitiveservices = cognitiveservices_module
129+
cognitiveservices_module.speech = speech_module
130+
speech_module.audio = audio_module
131+
speech_module.dialog = dialog_module
132+
133+
134+
_install_azure_speech_stub()
135+
_install_websockets_stub()
136+
137+
azure_package_module = types.ModuleType("pipecat.services.azure")
138+
azure_package_module.__path__ = []
139+
common_module = types.ModuleType("pipecat.services.azure.common")
140+
common_module.language_to_azure_language = lambda _language: "en-US"
141+
142+
sys.modules["pipecat.services.azure"] = azure_package_module
143+
sys.modules["pipecat.services.azure.common"] = common_module
144+
145+
stt_file = pathlib.Path(__file__).resolve().parents[1] / "src/pipecat/services/azure/stt.py"
146+
spec = importlib.util.spec_from_file_location("pipecat.services.azure.stt", stt_file)
147+
stt_module = importlib.util.module_from_spec(spec)
148+
sys.modules["pipecat.services.azure.stt"] = stt_module
149+
assert spec and spec.loader
150+
spec.loader.exec_module(stt_module)
151+
152+
from pipecat.frames.frames import StartFrame
153+
154+
155+
@pytest.mark.asyncio
156+
async def test_start_connects_all_handlers_and_waits_for_future():
157+
"""Verify startup wires event handlers and blocks for recognizer startup."""
158+
159+
service = stt_module.AzureSTTService(api_key="test-key", region="eastus")
160+
161+
loop = MagicMock()
162+
loop.run_in_executor = AsyncMock()
163+
service.get_event_loop = MagicMock(return_value=loop)
164+
165+
recognizer = MagicMock()
166+
recognizer.recognizing = MagicMock()
167+
recognizer.recognized = MagicMock()
168+
recognizer.canceled = MagicMock()
169+
recognizer.session_started = MagicMock()
170+
recognizer.session_stopped = MagicMock()
171+
172+
start_future = MagicMock()
173+
recognizer.start_continuous_recognition_async.return_value = start_future
174+
175+
with patch.object(stt_module.STTService, "start", new=AsyncMock()):
176+
with patch.object(stt_module, "PushAudioInputStream") as mock_stream_cls:
177+
with patch.object(stt_module, "SpeechRecognizer", return_value=recognizer):
178+
await service.start(StartFrame())
179+
180+
mock_stream_cls.assert_called_once()
181+
recognizer.recognizing.connect.assert_called_once_with(service._on_handle_recognizing)
182+
recognizer.recognized.connect.assert_called_once_with(service._on_handle_recognized)
183+
recognizer.canceled.connect.assert_called_once_with(service._on_handle_canceled)
184+
recognizer.session_started.connect.assert_called_once_with(service._on_handle_session_started)
185+
recognizer.session_stopped.connect.assert_called_once_with(service._on_handle_session_stopped)
186+
loop.run_in_executor.assert_awaited_once_with(None, start_future.get)
187+
188+
189+
def test_canceled_handler_pushes_error_with_details():
190+
"""Verify canceled events are surfaced through push_error."""
191+
192+
service = stt_module.AzureSTTService(api_key="test-key", region="eastus")
193+
service.push_error = AsyncMock()
194+
service.get_event_loop = MagicMock(return_value=MagicMock())
195+
196+
canceled_event = MagicMock()
197+
canceled_event.cancellation_details.reason = "Error"
198+
canceled_event.cancellation_details.code = "AuthenticationFailure"
199+
canceled_event.cancellation_details.error_details = "401 Unauthorized"
200+
201+
with patch("pipecat.services.azure.stt.asyncio.run_coroutine_threadsafe") as run_threadsafe:
202+
service._on_handle_canceled(canceled_event)
203+
204+
service.push_error.assert_called_once_with(
205+
error_msg="Azure STT recognition canceled: AuthenticationFailure - 401 Unauthorized"
206+
)
207+
assert run_threadsafe.call_count == 1
208+
209+
pending_coroutine = run_threadsafe.call_args.args[0]
210+
pending_coroutine.close()
211+
212+
213+
def test_canceled_handler_uses_safe_defaults_when_details_missing():
214+
"""Verify canceled handler does not crash on missing SDK fields."""
215+
216+
service = stt_module.AzureSTTService(api_key="test-key", region="eastus")
217+
service.push_error = AsyncMock()
218+
service.get_event_loop = MagicMock(return_value=MagicMock())
219+
220+
canceled_event = MagicMock()
221+
canceled_event.cancellation_details = None
222+
223+
with patch("pipecat.services.azure.stt.asyncio.run_coroutine_threadsafe") as run_threadsafe:
224+
service._on_handle_canceled(canceled_event)
225+
226+
service.push_error.assert_called_once_with(
227+
error_msg="Azure STT recognition canceled: UNKNOWN - "
228+
)
229+
assert run_threadsafe.call_count == 1
230+
231+
pending_coroutine = run_threadsafe.call_args.args[0]
232+
pending_coroutine.close()

0 commit comments

Comments
 (0)