Skip to content

Commit a449a09

Browse files
committed
signal voice session termination reason to client
- For abnormal terminations (timeout, remote abort, local abort, error), raise specific exceptions from the iterator. - expose metadata about the end (reason, error) via properties for post- loop inspection. - Exception hierarchy: - `VoiceSessionClosed(Exception)` base with optional fields (reason, error). - `VoiceSessionTimeout(VoiceSessionClosed)` - `VoiceSessionRemoteEnd(VoiceSessionClosed)` - `VoiceSessionLocalEnd(VoiceSessionClosed)` - `VoiceSessionError(VoiceSessionClosed)` - End reason type: - `VoiceEndReason` Enum: `NORMAL`, `TIMEOUT`, `REMOTE`, `LOCAL`, `ERROR`
1 parent 86db63d commit a449a09

File tree

3 files changed

+154
-48
lines changed

3 files changed

+154
-48
lines changed

examples/voice.py

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@
66
from typing import Any
77

88
import ucapi
9-
from ucapi import AssistantEvent, AssistantEventType, VoiceAssistant
9+
from ucapi import (
10+
AssistantError,
11+
AssistantErrorCode,
12+
AssistantEvent,
13+
AssistantEventType,
14+
VoiceAssistant,
15+
)
1016
from ucapi.api_definitions import AssistantSttResponse, AssistantTextResponse
1117
from ucapi.voice_assistant import Attributes as VAAttr
1218
from ucapi.voice_assistant import (
@@ -18,6 +24,7 @@
1824
SampleFormat,
1925
VoiceAssistantEntityOptions,
2026
)
27+
from ucapi.voice_stream import VoiceEndReason, VoiceSessionClosed
2128

2229
loop = asyncio.new_event_loop()
2330
api = ucapi.IntegrationAPI(loop)
@@ -83,34 +90,57 @@ async def on_voice_session(session):
8390
global session_id
8491

8592
total = 0
86-
async for frame in session: # frame is bytes
87-
total += len(frame)
88-
# feed frame into your voice assistant / LLM here
89-
print(f"Got {len(frame)} bytes of audio data")
90-
print(f"Voice stream ended: session={session.session_id}, bytes={total}")
93+
try:
94+
async for frame in session: # frame is bytes
95+
total += len(frame)
96+
# feed frame into your voice assistant / LLM here
97+
print(f"Got {len(frame)} bytes of audio data")
98+
print(f"Voice stream ended: session={session.session_id}, bytes={total}")
99+
100+
event = AssistantEvent(
101+
type=AssistantEventType.STT_RESPONSE,
102+
entity_id=session.entity_id,
103+
session_id=session_id,
104+
data=AssistantSttResponse(
105+
text="I'm just a demo and I don't know what you said."
106+
),
107+
)
108+
await api.broadcast_assistant_event(event)
91109

92-
event = AssistantEvent(
93-
type=AssistantEventType.STT_RESPONSE,
94-
entity_id=session.entity_id,
95-
session_id=session_id,
96-
data=AssistantSttResponse(
97-
text="I'm just a demo and I don't know what you said."
98-
),
99-
)
100-
await api.broadcast_assistant_event(event)
110+
await sleep(1)
111+
event = AssistantEvent(
112+
type=AssistantEventType.TEXT_RESPONSE,
113+
entity_id=session.entity_id,
114+
session_id=session_id,
115+
data=AssistantTextResponse(
116+
success=True, text=f"You have sent {total} bytes of audio data"
117+
),
118+
)
119+
await api.broadcast_assistant_event(event)
101120

102-
await sleep(1)
103-
event = AssistantEvent(
104-
type=AssistantEventType.TEXT_RESPONSE,
105-
entity_id=session.entity_id,
106-
session_id=session_id,
107-
data=AssistantTextResponse(
108-
success=True, text=f"You have sent {total} bytes of audio data"
109-
),
110-
)
111-
await api.broadcast_assistant_event(event)
121+
await sleep(1)
122+
except VoiceSessionClosed as ex:
123+
print(
124+
f"Voice stream session {session.session_id} closed (bytes={total})! Reason: {ex.reason}, exception: {ex.error}"
125+
)
126+
if ex.reason == VoiceEndReason.REMOTE:
127+
return # Remote disconnected
128+
event = AssistantEvent(
129+
type=AssistantEventType.ERROR,
130+
entity_id=session.entity_id,
131+
session_id=session_id,
132+
data=AssistantError(
133+
code=(
134+
AssistantErrorCode.TIMEOUT
135+
if ex.reason == VoiceEndReason.TIMEOUT
136+
else AssistantErrorCode.UNEXPECTED_ERROR
137+
),
138+
message=f"Reason: {ex.reason}, exception: {ex.error}",
139+
),
140+
)
141+
await api.broadcast_assistant_event(event)
112142

113-
await sleep(1)
143+
# final event
114144
event = AssistantEvent(
115145
type=AssistantEventType.FINISHED,
116146
entity_id=session.entity_id,

ucapi/api.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
AudioConfiguration,
4747
)
4848
from .voice_assistant import Commands as VaCommands
49-
from .voice_stream import VoiceSession, VoiceStreamHandler
49+
from .voice_stream import VoiceEndReason, VoiceSession, VoiceStreamHandler
5050

5151
try:
5252
from ._version import version as __version__
@@ -559,7 +559,7 @@ async def _cleanup_voice_session(self, session_id: int) -> None:
559559
# End and remove session
560560
session = self._voice_sessions.pop(session_id, None)
561561
if session is not None and not session.closed:
562-
session.end()
562+
session.end() # normal close
563563

564564
# Remove ownership mappings
565565
try:
@@ -609,18 +609,20 @@ async def _run_voice_handler(self, session: VoiceSession) -> None:
609609
handler = self._voice_handler
610610
if handler is None:
611611
# Handler cleared after session start; just end the session
612-
session.end()
612+
session.end(VoiceEndReason.LOCAL)
613613
return
614614
try:
615615
result = handler(session)
616616
if asyncio.iscoroutine(result):
617617
await result
618-
except Exception: # pylint: disable=W0718
618+
if not session.closed:
619+
session.end()
620+
except Exception as ex: # pylint: disable=W0718
619621
_LOG.exception("Voice handler failed for session %s", session.session_id)
622+
if not session.closed:
623+
session.end(VoiceEndReason.ERROR, ex)
620624
finally:
621625
# Ensure iterator is unblocked and session is cleaned up
622-
if not session.closed:
623-
session.end()
624626
await self._cleanup_voice_session(session.session_id)
625627

626628
def _schedule_voice_timeout(self, session_id: int) -> None:
@@ -671,7 +673,7 @@ async def _voice_session_timeout_task(self, session_id: int) -> None:
671673
)
672674

673675
# End and cleanup
674-
session.end()
676+
session.end(VoiceEndReason.TIMEOUT)
675677
await self._cleanup_voice_session(session_id)
676678

677679
async def _handle_ws_request_msg(

ucapi/voice_stream.py

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import asyncio
1414
import logging
1515
from asyncio import AbstractEventLoop
16+
from enum import Enum, auto
1617
from typing import AsyncIterator, Awaitable, Callable, Optional
1718

1819
from ucapi.voice_assistant import AudioConfiguration
@@ -28,6 +29,53 @@
2829
"""
2930

3031

32+
class VoiceEndReason(Enum):
33+
"""Reasons for ending a voice session."""
34+
35+
NORMAL = auto()
36+
"""Normal session end."""
37+
TIMEOUT = auto()
38+
"""Closed due to timeout."""
39+
REMOTE = auto()
40+
"""Closed remotely, for example by the client disconnecting."""
41+
LOCAL = auto()
42+
"""Closed locally in integration, for example by a new session request."""
43+
ERROR = auto()
44+
"""Closed due to an error."""
45+
46+
47+
class VoiceSessionClosed(Exception):
48+
"""Base class for voice session-related exceptions."""
49+
50+
def __init__(self, reason: VoiceEndReason, error: Exception | None = None):
51+
"""Create a voice session-related exception instance."""
52+
super().__init__(str(reason))
53+
self.reason = reason
54+
self.error = error
55+
56+
57+
class VoiceSessionTimeout(VoiceSessionClosed):
58+
"""Raised when a voice session times out."""
59+
60+
61+
class VoiceSessionRemoteEnd(VoiceSessionClosed):
62+
"""Raised when a voice session ends remotely."""
63+
64+
65+
class VoiceSessionLocalEnd(VoiceSessionClosed):
66+
"""Raised when a voice session ends locally."""
67+
68+
69+
class VoiceSessionError(VoiceSessionClosed):
70+
"""Raised when a voice session ends due to an error."""
71+
72+
73+
class _EndSignal:
74+
def __init__(self, reason: VoiceEndReason, error: Exception | None = None):
75+
self.reason = reason
76+
self.error = error
77+
78+
3179
class VoiceSession:
3280
"""Represents a single remote voice capture session.
3381
@@ -50,9 +98,11 @@ def __init__(
5098
self.entity_id = entity_id
5199
self.config = config
52100
self._loop = loop
53-
self._q: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=max_queue)
101+
self._q: asyncio.Queue[bytes | _EndSignal] = asyncio.Queue(maxsize=max_queue)
54102
self._closed = False
55103
self._drops_logged = 0
104+
self.ended_by: VoiceEndReason | None = None
105+
self.end_error: Exception | None = None
56106

57107
def __aiter__(self) -> AsyncIterator[bytes]:
58108
"""Return an async iterator over audio frames."""
@@ -68,13 +118,27 @@ async def frames(self) -> AsyncIterator[bytes]:
68118
69119
:yield: Bytes : The next frame of data retrieved from the queue.
70120
:rtype: AsyncIterator[bytes]
121+
:raises VoiceSessionTimeout: If the session times out.
122+
:raises VoiceSessionRemoteEnd: If the session ends remotely.
123+
:raises VoiceSessionLocalEnd: If the session ends locally.
124+
:raises VoiceSessionError: If the session ends due to an error.
71125
"""
72126
while True:
73127
item = await self._q.get()
74-
if item is None:
128+
if isinstance(item, _EndSignal):
75129
self._closed = True
76-
return
77-
yield item
130+
self.ended_by = item.reason
131+
self.end_error = item.error
132+
if item.reason is VoiceEndReason.NORMAL:
133+
return # StopAsyncIteration
134+
if item.reason is VoiceEndReason.TIMEOUT:
135+
raise VoiceSessionTimeout(item.reason)
136+
if item.reason is VoiceEndReason.REMOTE:
137+
raise VoiceSessionRemoteEnd(item.reason)
138+
if item.reason is VoiceEndReason.LOCAL:
139+
raise VoiceSessionLocalEnd(item.reason)
140+
raise VoiceSessionError(item.reason, item.error)
141+
yield item # bytes
78142

79143
def feed(self, chunk: bytes) -> None:
80144
"""Feed an audio chunk into the session.
@@ -85,6 +149,8 @@ def feed(self, chunk: bytes) -> None:
85149
:param chunk: Audio data bytes.
86150
"""
87151
try:
152+
if self._closed:
153+
return
88154
self._q.put_nowait(chunk)
89155
except asyncio.QueueFull:
90156
# Drop newest if consumer lags; throttle debug logging.
@@ -95,23 +161,31 @@ def feed(self, chunk: bytes) -> None:
95161
)
96162
self._drops_logged += 1
97163

98-
def end(self) -> None:
164+
def end(
165+
self,
166+
reason: VoiceEndReason = VoiceEndReason.NORMAL,
167+
error: Exception | None = None,
168+
) -> None:
99169
"""
100170
Signal the end of the voice session.
101171
102-
Enqueues a sentinel (None) to stop the consumer iterator.
172+
Enqueues a sentinel (_EndSignal) to stop the consumer iterator.
103173
If the queue is full, attempts to make space for the sentinel.
104174
"""
105-
if not self._closed:
175+
if self._closed:
176+
return
177+
try:
178+
self._q.put_nowait(_EndSignal(reason, error))
179+
except asyncio.QueueFull:
180+
# Clear one and try to enqueue sentinel
106181
try:
107-
self._q.put_nowait(None)
108-
except asyncio.QueueFull:
109-
# Clear one and try to enqueue sentinel
110-
try:
111-
_ = self._q.get_nowait()
112-
self._q.put_nowait(None)
113-
except Exception: # pylint: disable=W0718
114-
self._closed = True
182+
_ = self._q.get_nowait()
183+
self._q.put_nowait(_EndSignal(reason, error))
184+
except Exception: # pylint: disable=W0718
185+
# _should_ not happen, but just in case
186+
pass
187+
finally:
188+
self._closed = True
115189

116190
@property
117191
def closed(self) -> bool:

0 commit comments

Comments
 (0)