Skip to content
Open
Changes from 12 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d6c5611
feat: Add requests for supported entity types, version and localization
kennymc-c Jan 31, 2026
df90062
Working handle of websocket requests from ucapi
albaintor Feb 1, 2026
b4de43b
Replaced exception with error trace
albaintor Feb 1, 2026
a139fed
Handle exception
albaintor Feb 1, 2026
fe8f47a
Fixed stacktrace error, all good now
albaintor Feb 2, 2026
c0b802b
Fixed blocking websocket requests : create task for each message to h…
albaintor Feb 3, 2026
e6e7cba
Removed response signatures
albaintor Feb 3, 2026
d0eae3c
Removed response signatures
albaintor Feb 3, 2026
4a9e175
Check after supported entity types
albaintor Feb 3, 2026
110d147
Moved extraction of supported entity types in request for available e…
Feb 4, 2026
e80291d
Linting
Feb 4, 2026
7a3e8a7
Requested changes
albaintor Feb 12, 2026
5e1ab8c
Linting and removed comment
albaintor Feb 13, 2026
4a413fc
Linting flake8
albaintor Feb 13, 2026
0caacfb
Merge branch 'main' into websocket_requests
zehnm Feb 18, 2026
335b9a3
fix: merge from main
zehnm Feb 18, 2026
55b4c45
Added media browsing request
albaintor Mar 4, 2026
5140403
Added missing methods and commands
albaintor Mar 8, 2026
6d0a3a5
Fixed search media response type
albaintor Mar 11, 2026
c1c4d9b
Merged new methods
albaintor Mar 13, 2026
489a789
Merge branch 'websocket_requests' of https://github.com/albaintor/int…
albaintor Mar 13, 2026
22b01ee
Added clients extraction (hack). To be improved
albaintor Mar 13, 2026
a4bbcf6
Create task is necessary to avoid blocking
albaintor Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 197 additions & 3 deletions ucapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,20 @@ def __init__(self, loop: AbstractEventLoop | None = None):
self._available_entities = Entities("available", self._loop)
self._configured_entities = Entities("configured", self._loop)

self._req_id = 1 # Request ID counter for outgoing requests

self._voice_handler: VoiceStreamHandler | None = None
self._voice_session_timeout: int = self.DEFAULT_VOICE_SESSION_TIMEOUT_S
# Active voice sessions
self._voice_sessions: dict[VoiceSessionKey, _VoiceSessionContext] = {}
# Enforce: at most one active session per entity_id (across all websockets)
self._voice_session_by_entity: dict[str, VoiceSessionKey] = {}

# One receiver per websocket (already in _handle_ws). Responses are dispatched to futures here.
self._ws_pending: dict[Any, dict[int, asyncio.Future]] = {}

self._supported_entity_types: list[str] | None = None

# Setup event loop
asyncio.set_event_loop(self._loop)

Expand Down Expand Up @@ -207,6 +214,8 @@ async def _start_web_socket_server(self, host: str, port: int) -> None:
async def _handle_ws(self, websocket) -> None:
try:
self._clients.add(websocket)
# Init per-websocket pending requests map + send lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cosmetic: "send lock" comment no longer correct

self._ws_pending[websocket] = {}
_LOG.info("WS: Client added: %s", websocket.remote_address)

# authenticate on connection
Expand All @@ -218,10 +227,12 @@ async def _handle_ws(self, websocket) -> None:
# Distinguish between text (str) and binary (bytes-like) messages
if isinstance(message, str):
# JSON text message
await self._process_ws_message(websocket, message)
asyncio.create_task(self._process_ws_message(websocket, message))
elif isinstance(message, (bytes, bytearray, memoryview)):
# Binary message (protobuf in future)
await self._process_ws_binary_message(websocket, bytes(message))
asyncio.create_task(
self._process_ws_binary_message(websocket, bytes(message))
)
Comment on lines -223 to +238
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are now tasks required to process the received messages?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the result of our tests, let's take the following example :

  1. We (integration) receive a request from the remote
  2. While processing the request, we need to send a request back to the remote and awaiting its response
  3. In that case the main task is blocked because it is waiting for the step 1 to return

Concrete example that I encountered :

  1. Remote -> Integration : get available entities
  2. Integration -> remote : get supported entity types
  3. remote -> integration : supported entity types
  4. integration -> remote : available entity types (filtered with supported entity types)

Without tasks, step 2 is blocked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be related to websocket's behaviour with asyncio, that there's a missing await asyncio.sleep(0) call: https://websockets.readthedocs.io/en/stable/faq/asyncio.html
But that's pure speculation right now, I'll try to reproduce this and also look into the asyncio task solution if that won't introduce other side effects.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that await asyncio.sleep(0) is kind of a hack to make the event loop look after other awaiting stuff.
Creating task to handle response seems to be the clean way to unlock the receive (main) task. But it may introduce some overhead I don't know : tasks in asyncio are not threads but this implies pushing data in the event loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no hack in an async context, depending what the client is doing. It's well documented in the websocket library, if the client is doing synchronous operations.

I have to dig deeper in the websocket library documentation about the async message callback. It's very well possible that the _handle_ws callback is awaited inside the websocket library, which means it should not be delayed for too long. Then it's clear why it doesn't work sending another WS message and waiting for the response inside the callback. The callback has to give control back, otherwise it blocks the event loop inside the websocket library. Additional await asyncio.sleep(0) won't solve it.
What I'm afraid of is that simply using an asyncio task for every received message might introduce other side effects. This is a major change in runtime behaviour.

else:
_LOG.warning(
"[%s] WS: Unsupported message type %s",
Expand Down Expand Up @@ -261,7 +272,11 @@ async def _handle_ws(self, websocket) -> None:
key[1],
ex,
)

# Cancel all pending requests for this websocket (client disconnected)
pending = self._ws_pending.pop(websocket, {})
for _, fut in pending.items():
if not fut.done():
fut.set_exception(ConnectionError("WebSocket disconnected"))
self._clients.remove(websocket)
_LOG.info("[%s] WS: Client removed", websocket.remote_address)
self._events.emit(uc.Events.CLIENT_DISCONNECTED)
Expand Down Expand Up @@ -412,6 +427,102 @@ async def _process_ws_message(self, websocket, message) -> None:
await self._handle_ws_request_msg(websocket, msg, req_id, msg_data)
elif kind == "event":
await self._handle_ws_event_msg(msg, msg_data)
elif kind == "resp":
# Response to a previously sent request
# Some implementations use "req_id", others use "id"
resp_id = data.get("req_id", data.get("id"))
if resp_id is None:
_LOG.warning(
"[%s] WS: Received resp without req_id/id: %s",
websocket.remote_address,
message,
)
return

pending = self._ws_pending.get(websocket)
if not pending:
_LOG.debug(
"[%s] WS: No pending map for resp_id=%s (late resp?)",
websocket.remote_address,
resp_id,
)
return
fut = pending.get(int(resp_id))
if fut is None:
_LOG.debug(
"[%s] WS: Unmatched resp_id=%s (not pending). msg=%s",
websocket.remote_address,
resp_id,
msg,
)
return

if not fut.done():
fut.set_result(data)

async def _ws_request(
self,
websocket,
msg: str,
msg_data: dict[str, Any] | None = None,
*,
timeout: float = 10.0,
) -> dict[str, Any]:
"""
Send a request over websocket and await the matching response.

- Uses a Future stored in self._ws_pending[websocket][req_id]
- Reader task (_handle_ws -> _process_ws_message) completes the future on 'resp'
- Raises TimeoutError on timeout
:param websocket: client connection
:param msg: event message name
:param msg_data: message data payload
:param timeout: timeout for message
"""

# Ensure per-socket structures exist (in case you call before _handle_ws init)
if websocket not in self._ws_pending:
self._ws_pending[websocket] = {}

# Allocate req_id safely
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cosmetic: "safely" no longer relevant

req_id = self._req_id
self._req_id += 1

fut = self._loop.create_future()
self._ws_pending[websocket][req_id] = fut

try:
payload: dict[str, Any] = {"kind": "req", "id": req_id, "msg": msg}
if msg_data is not None:
payload["msg_data"] = msg_data

if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug(
"[%s] ->: %s",
websocket.remote_address,
filter_log_msg_data(payload),
)
# Serialize sends to avoid interleaving issues (optional but recommended)
await websocket.send(json.dumps(payload))

# Await response
resp = await asyncio.wait_for(fut, timeout=timeout)
return resp

except asyncio.TimeoutError as ex:
_LOG.error(
"[%s] Timeout waiting for response to %s (req_id=%s) %s",
websocket.remote_address,
msg,
req_id,
ex,
)
raise ex
finally:
# Cleanup pending future entry
pending = self._ws_pending.get(websocket)
if pending:
pending.pop(req_id, None)

async def _process_ws_binary_message(self, websocket, data: bytes) -> None:
"""Process a binary WebSocket message using protobuf IntegrationMessage.
Expand Down Expand Up @@ -677,6 +788,7 @@ async def _voice_session_timeout_task(self, key: VoiceSessionKey) -> None:
async def _handle_ws_request_msg(
self, websocket, msg: str, req_id: int, msg_data: dict[str, Any] | None
) -> None:
# pylint: disable=R0912
if msg == uc.WsMessages.GET_DRIVER_VERSION:
await self._send_ws_response(
websocket,
Expand All @@ -693,6 +805,15 @@ async def _handle_ws_request_msg(
)
elif msg == uc.WsMessages.GET_AVAILABLE_ENTITIES:
available_entities = self._available_entities.get_all()
if self._supported_entity_types is None:
# Request supported entity types from remote
await self._update_supported_entity_types(websocket)
if self._supported_entity_types:
available_entities = [
entity
for entity in available_entities
if entity.get("entity_type") in self._supported_entity_types
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main issue: sending another WS message inside the message callback from the websocket library, and waiting for a response.

It also looks hackish: why request the supported entity types here?
If entity types need to be restricted, it should already be known at this time.
If there's no way around it, then this would be the place to create an asyncio task to not block the websocket event loops. It's a much safer approach than putting every received message into a task.

await self._send_ws_response(
websocket,
req_id,
Expand Down Expand Up @@ -1156,6 +1277,79 @@ def remove_all_listeners(self, event: uc.Events | None) -> None:
"""
self._events.remove_all_listeners(event)

async def get_supported_entity_types(
self, websocket, *, timeout: float = 5.0
) -> list[str]:
Comment on lines +1426 to +1428
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From where does an integration driver get the websocket handle from? Same for the other new get_ methods below.
Accessing api._clients is not an option! Internal fields might change at any time.

An obvious option could be enhancing the emitted Events with a websocket parameter that a client could use if interested. That would even allow tracking multiple Remote connections in an external integration. Existing integrations would not be affected (as far as I understand the event emitting / Python parameter handling).
But I need to think about that a bit more.

"""Request supported entity types from client and return msg_data."""
resp = await self._ws_request(
websocket,
"get_supported_entity_types",
timeout=timeout,
)
if resp.get("msg") != "supported_entity_types":
_LOG.debug(
"[%s] Unexpected resp msg for get_supported_entity_types: %s",
websocket.remote_address,
resp.get("msg"),
)
return resp.get("msg_data", [])

async def _update_supported_entity_types(
self, websocket, *, timeout: float = 5.0
) -> None:
"""Update supported entity types by remote."""
await asyncio.sleep(0)
try:
self._supported_entity_types = await self.get_supported_entity_types(
websocket, timeout=timeout
)
_LOG.debug(
"[%s] Supported entity types %s",
websocket.remote_address,
self._supported_entity_types,
)
except Exception as ex: # pylint: disable=W0718
_LOG.error(
"[%s] Unable to retrieve entity types %s",
websocket.remote_address,
ex,
)

async def get_version(self, websocket, *, timeout: float = 5.0) -> dict[str, Any]:
"""Request client version and return msg_data."""
resp = await self._ws_request(
websocket,
"get_version",
timeout=timeout,
)
if resp.get("msg") != "version":
_LOG.debug(
"[%s] Unexpected resp msg for get_version: %s",
websocket.remote_address,
resp.get("msg"),
)

return resp.get("msg_data")

async def get_localization_cfg(
self, websocket, *, timeout: float = 5.0
) -> dict[str, Any]:
"""Request localization config and return msg_data."""
resp = await self._ws_request(
websocket,
"get_localization_cfg",
timeout=timeout,
)

if resp.get("msg") != "localization_cfg":
_LOG.debug(
"[%s] Unexpected resp msg for get_localization_cfg: %s",
websocket.remote_address,
resp.get("msg"),
)

return resp.get("msg_data")

##############
# Properties #
##############
Expand Down