Skip to content

Commit 86db63d

Browse files
committed
voice session timeout handling
1 parent 8738c8f commit 86db63d

File tree

1 file changed

+91
-8
lines changed

1 file changed

+91
-8
lines changed

ucapi/api.py

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ def __init__(self, loop: AbstractEventLoop):
9090
self._voice_ws_sessions: dict[Any, set[int]] = {}
9191
# Owner mapping: session_id -> websocket to facilitate cleanup
9292
self._voice_session_owner: dict[int, Any] = {}
93+
# Voice session timeout management (seconds)
94+
self._voice_session_timeout: int = 30
95+
self._voice_session_timeouts: dict[int, asyncio.Task] = {}
96+
# Track handler tasks per session (to avoid double-start and for observability)
97+
self._voice_handler_tasks: dict[int, asyncio.Task] = {}
9398

9499
# Setup event loop
95100
asyncio.set_event_loop(self._loop)
@@ -233,9 +238,14 @@ async def _handle_ws(self, websocket) -> None:
233238
except Exception: # pylint: disable=W0718
234239
session_ids = set()
235240
for sid in list(session_ids):
236-
session = self._voice_sessions.pop(sid, None)
237-
if session:
238-
session.end()
241+
try:
242+
await self._cleanup_voice_session(sid)
243+
except Exception: # pylint: disable=W0718
244+
_LOG.exception(
245+
"[%s] WS: Error during voice session cleanup for %s",
246+
websocket.remote_address,
247+
sid,
248+
)
239249

240250
self._clients.remove(websocket)
241251
_LOG.info("[%s] WS: Client removed", websocket.remote_address)
@@ -486,7 +496,8 @@ async def _on_remote_voice_begin(self, websocket, msg: RemoteVoiceBegin) -> None
486496
)
487497

488498
# Invoke handler in the background so the WS loop is not blocked
489-
self._loop.create_task(self._run_voice_handler(session))
499+
task = self._loop.create_task(self._run_voice_handler(session))
500+
self._voice_handler_tasks[session.session_id] = task
490501

491502
async def _on_remote_voice_data(self, websocket, msg: RemoteVoiceData) -> None:
492503
"""Handle a RemoteVoiceData protobuf message.
@@ -530,22 +541,41 @@ async def _on_remote_voice_end(self, _websocket, msg: RemoteVoiceEnd) -> None:
530541
return
531542
session_id = int(getattr(msg, "session_id", 0) or 0)
532543
session_id = 0 # FIXME(voice) until core is fixed
544+
await self._cleanup_voice_session(session_id)
545+
546+
async def _cleanup_voice_session(self, session_id: int) -> None:
547+
"""Cleanup internal state for a voice session.
548+
549+
- Cancel and remove any pending timeout task for the session.
550+
- End the session iterator if still open.
551+
- Remove bookkeeping: _voice_sessions, _voice_session_owner, _voice_ws_sessions.
552+
- Forget handler task reference (do not cancel it; handler should exit on end()).
553+
"""
554+
# Cancel timeout task if present
555+
t = self._voice_session_timeouts.pop(session_id, None)
556+
if t is not None and not t.done():
557+
t.cancel()
558+
559+
# End and remove session
533560
session = self._voice_sessions.pop(session_id, None)
534-
if session:
561+
if session is not None and not session.closed:
535562
session.end()
536-
# Cleanup ownership mappings
563+
564+
# Remove ownership mappings
537565
try:
538566
owner_ws = self._voice_session_owner.pop(session_id, None)
539567
if owner_ws is not None:
540568
owners = self._voice_ws_sessions.get(owner_ws)
541569
if owners is not None:
542570
owners.discard(session_id)
543571
if not owners:
544-
# Keep dict tidy
545572
self._voice_ws_sessions.pop(owner_ws, None)
546573
except Exception: # pylint: disable=W0718
547574
pass
548575

576+
# Drop handler task reference (don't cancel; allow graceful finish)
577+
self._voice_handler_tasks.pop(session_id, None)
578+
549579
def set_voice_stream_handler(self, handler: VoiceStreamHandler | None) -> None:
550580
"""Register or clear the voice stream handler.
551581
@@ -591,7 +621,58 @@ async def _run_voice_handler(self, session: VoiceSession) -> None:
591621
# Ensure iterator is unblocked and session is cleaned up
592622
if not session.closed:
593623
session.end()
594-
self._voice_sessions.pop(session.session_id, None)
624+
await self._cleanup_voice_session(session.session_id)
625+
626+
def _schedule_voice_timeout(self, session_id: int) -> None:
627+
"""Schedule the timeout task for a voice session.
628+
629+
Starts counting immediately at creation time. When the timeout expires and the
630+
session is still active, the handler is notified (invoked) if not already
631+
started, the session is ended, and cleanup is performed.
632+
"""
633+
# Cancel pre-existing task if any (defensive)
634+
existing = self._voice_session_timeouts.pop(session_id, None)
635+
if existing is not None and not existing.done():
636+
existing.cancel()
637+
638+
task = self._loop.create_task(self._voice_session_timeout_task(session_id))
639+
self._voice_session_timeouts[session_id] = task
640+
641+
async def _voice_session_timeout_task(self, session_id: int) -> None:
642+
"""Timeout watchdog for a voice session."""
643+
try:
644+
await asyncio.sleep(self._voice_session_timeout)
645+
except asyncio.CancelledError:
646+
return
647+
648+
# If still active after timeout; take action
649+
session = self._voice_sessions.get(session_id)
650+
if session is None:
651+
return
652+
653+
_LOG.warning(
654+
"Voice session %s timed out after %ss",
655+
session_id,
656+
self._voice_session_timeout,
657+
)
658+
659+
# If handler not started yet (e.g., no VoiceBegin received), notify it now
660+
if (
661+
session_id not in self._voice_handler_tasks
662+
and self._voice_handler is not None
663+
):
664+
try:
665+
task = self._loop.create_task(self._run_voice_handler(session))
666+
self._voice_handler_tasks[session_id] = task
667+
except Exception: # pylint: disable=W0718
668+
_LOG.exception(
669+
"Failed to start voice handler on timeout for session %s",
670+
session_id,
671+
)
672+
673+
# End and cleanup
674+
session.end()
675+
await self._cleanup_voice_session(session_id)
595676

596677
async def _handle_ws_request_msg(
597678
self, websocket, msg: str, req_id: int, msg_data: dict[str, Any] | None
@@ -792,6 +873,8 @@ async def _entity_command(
792873

793874
session = VoiceSession(session_id, entity_id, audio_cfg, loop=self._loop)
794875
self._voice_sessions[session_id] = session
876+
# Start timeout immediately on session creation
877+
self._schedule_voice_timeout(session_id)
795878

796879
result = await entity.command(
797880
cmd_id, msg_data["params"] if "params" in msg_data else None

0 commit comments

Comments
 (0)