Skip to content

Commit a524032

Browse files
committed
Fixes
1 parent 98ecaa9 commit a524032

File tree

4 files changed

+145
-65
lines changed

4 files changed

+145
-65
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ The participant who implements the method and will receive its calls must first
141141

142142
```python
143143
@room.local_participant.rpc_method("greet")
144-
async def handle_greet(request_id: str, caller_identity: str, payload: str, response_timeout_ms: int):
144+
async def handle_greet(request_id: str, caller_identity: str, payload: str, response_timeout: float):
145145
print(f"Received greeting from {caller_identity}: {payload}")
146146
return f"Hello, {caller_identity}!"
147147
```
148148

149-
In addition to the payload, your handler will also receive `response_timeout_ms`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.
149+
In addition to the payload, your handler will also receive `response_timeout`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.
150150

151151
#### Performing an RPC request
152152

@@ -164,7 +164,7 @@ except Exception as e:
164164
print(f"RPC call failed: {e}")
165165
```
166166

167-
You may find it useful to adjust the `response_timeout_ms` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.
167+
You may find it useful to adjust the `response_timeout` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.
168168

169169
#### Errors
170170

examples/rpc.py

Lines changed: 109 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import asyncio
55
from dotenv import load_dotenv
6+
import time
67

78
load_dotenv(dotenv_path=".env.local", override=False)
89
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
@@ -15,44 +16,74 @@
1516

1617

1718
async def main():
18-
room_name = f"rpc-test-{os.urandom(4).hex()}"
19-
20-
print(f"Connecting participants to room: {room_name}")
21-
22-
callers_room, greeters_room, math_genius_room = await asyncio.gather(
23-
connect_participant("caller", room_name),
24-
connect_participant("greeter", room_name),
25-
connect_participant("math-genius", room_name),
26-
)
27-
28-
register_receiver_methods(greeters_room, math_genius_room)
29-
19+
rooms = [] # Keep track of all rooms for cleanup
3020
try:
31-
print("\n\nRunning greeting example...")
32-
await asyncio.gather(perform_greeting(callers_room))
33-
except Exception as error:
34-
print("Error:", error)
21+
room_name = f"rpc-test-{os.urandom(4).hex()}"
22+
print(f"Connecting participants to room: {room_name}")
3523

36-
try:
37-
print("\n\nRunning error handling example...")
38-
await perform_divide(callers_room)
39-
except Exception as error:
40-
print("Error:", error)
41-
42-
try:
43-
print("\n\nRunning math example...")
44-
await perform_square_root(callers_room)
45-
await asyncio.sleep(2)
46-
await perform_quantum_hypergeometric_series(callers_room)
47-
except Exception as error:
48-
print("Error:", error)
49-
50-
print("\n\nParticipants done, disconnecting...")
51-
await callers_room.disconnect()
52-
await greeters_room.disconnect()
53-
await math_genius_room.disconnect()
54-
55-
print("Participants disconnected. Example completed.")
24+
callers_room, greeters_room, math_genius_room = await asyncio.gather(
25+
connect_participant("caller", room_name),
26+
connect_participant("greeter", room_name),
27+
connect_participant("math-genius", room_name),
28+
)
29+
rooms = [callers_room, greeters_room, math_genius_room]
30+
31+
register_receiver_methods(greeters_room, math_genius_room)
32+
33+
try:
34+
print("\n\nRunning greeting example...")
35+
await asyncio.gather(perform_greeting(callers_room))
36+
except Exception as error:
37+
print("Error:", error)
38+
39+
try:
40+
print("\n\nRunning error handling example...")
41+
await perform_divide(callers_room)
42+
except Exception as error:
43+
print("Error:", error)
44+
45+
try:
46+
print("\n\nRunning math example...")
47+
await perform_square_root(callers_room)
48+
await asyncio.sleep(2)
49+
await perform_quantum_hypergeometric_series(callers_room)
50+
except Exception as error:
51+
print("Error:", error)
52+
53+
try:
54+
print("\n\nRunning long calculation with timeout...")
55+
await asyncio.create_task(perform_long_calculation(callers_room))
56+
except Exception as error:
57+
print("Error:", error)
58+
59+
try:
60+
print("\n\nRunning long calculation with disconnect...")
61+
# Start the long calculation
62+
long_calc_task = asyncio.create_task(perform_long_calculation(callers_room))
63+
# Wait a bit then disconnect the math genius
64+
await asyncio.sleep(5)
65+
print("\nDisconnecting math genius early...")
66+
await math_genius_room.disconnect()
67+
# Wait for the calculation to fail
68+
await long_calc_task
69+
except Exception as error:
70+
print("Error:", error)
71+
72+
print("\n\nParticipants done, disconnecting remaining participants...")
73+
await callers_room.disconnect()
74+
await greeters_room.disconnect()
75+
76+
print("Participants disconnected. Example completed.")
77+
78+
except KeyboardInterrupt:
79+
print("\nReceived interrupt signal, cleaning up...")
80+
except Exception as e:
81+
print(f"Unexpected error: {e}")
82+
finally:
83+
# Clean up all rooms
84+
print("Disconnecting all participants...")
85+
await asyncio.gather(*(room.disconnect() for room in rooms), return_exceptions=True)
86+
print("Cleanup complete")
5687

5788

5889
def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
@@ -61,7 +92,7 @@ async def arrival_method(
6192
request_id: str,
6293
caller_identity: str,
6394
payload: str,
64-
response_timeout_ms: int,
95+
response_timeout: float,
6596
):
6697
print(f'[Greeter] Oh {caller_identity} arrived and said "{payload}"')
6798
await asyncio.sleep(2)
@@ -72,12 +103,12 @@ async def square_root_method(
72103
request_id: str,
73104
caller_identity: str,
74105
payload: str,
75-
response_timeout_ms: int,
106+
response_timeout: float,
76107
):
77108
json_data = json.loads(payload)
78109
number = json_data["number"]
79110
print(
80-
f"[Math Genius] I guess {caller_identity} wants the square root of {number}. I've only got {response_timeout_ms / 1000} seconds to respond but I think I can pull it off."
111+
f"[Math Genius] I guess {caller_identity} wants the square root of {number}. I've only got {response_timeout} seconds to respond but I think I can pull it off."
81112
)
82113

83114
print("[Math Genius] *doing math*…")
@@ -92,7 +123,7 @@ def divide_method(
92123
request_id: str,
93124
caller_identity: str,
94125
payload: str,
95-
response_timeout_ms: int,
126+
response_timeout: float,
96127
):
97128
json_data = json.loads(payload)
98129
dividend = json_data["dividend"]
@@ -104,6 +135,18 @@ def divide_method(
104135
result = dividend / divisor
105136
return json.dumps({"result": result})
106137

138+
@math_genius_room.local_participant.rpc_method("long-calculation")
139+
async def long_calculation_method(
140+
request_id: str,
141+
caller_identity: str,
142+
payload: str,
143+
response_timeout: float,
144+
):
145+
print(f"[Math Genius] Starting a very long calculation for {caller_identity}")
146+
print(f"[Math Genius] This will take 30 seconds even though you're only giving me {response_timeout} seconds")
147+
await asyncio.sleep(30)
148+
return json.dumps({"result": "Calculation complete!"})
149+
107150

108151
async def perform_greeting(room: rtc.Room):
109152
print("[Caller] Letting the greeter know that I've arrived")
@@ -168,6 +211,28 @@ async def perform_divide(room: rtc.Room):
168211
print(f"[Caller] RPC call failed with unexpected error: {error}")
169212

170213

214+
async def perform_long_calculation(room: rtc.Room):
215+
print("[Caller] Giving the math genius 10s to complete a long calculation")
216+
try:
217+
response = await room.local_participant.perform_rpc(
218+
"math-genius",
219+
"long-calculation",
220+
json.dumps({}),
221+
response_timeout=10
222+
)
223+
parsed_response = json.loads(response)
224+
print(f"[Caller] Result: {parsed_response['result']}")
225+
except rtc.RpcError as error:
226+
if error.code == rtc.RpcError.ErrorCode.RESPONSE_TIMEOUT:
227+
print("[Caller] Math genius took too long to respond")
228+
elif error.code == rtc.RpcError.ErrorCode.RECIPIENT_DISCONNECTED:
229+
print("[Caller] Math genius disconnected before response was received")
230+
else:
231+
print(f"[Caller] Unexpected RPC error: {error}")
232+
except Exception as error:
233+
print(f"[Caller] Unexpected error: {error}")
234+
235+
171236
def create_token(identity: str, room_name: str):
172237
token = (
173238
api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
@@ -216,4 +281,7 @@ def _on_participant_connected(participant: rtc.RemoteParticipant):
216281

217282

218283
if __name__ == "__main__":
219-
asyncio.run(main())
284+
try:
285+
asyncio.run(main())
286+
except KeyboardInterrupt:
287+
print("\nProgram terminated by user")

livekit-rtc/livekit/rtc/participant.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ async def perform_rpc(
248248
destination_identity: str,
249249
method: str,
250250
payload: str,
251-
response_timeout_ms: Optional[int] = None,
251+
response_timeout: Optional[float] = None,
252252
) -> str:
253253
"""
254254
Initiate an RPC call to a remote participant.
@@ -257,7 +257,7 @@ async def perform_rpc(
257257
destination_identity (str): The `identity` of the destination participant
258258
method (str): The method name to call
259259
payload (str): The method payload
260-
response_timeout_ms (Optional[int]): Timeout for receiving a response after initial connection
260+
response_timeout (Optional[float]): Timeout for receiving a response after initial connection
261261
262262
Returns:
263263
str: The response payload
@@ -270,8 +270,8 @@ async def perform_rpc(
270270
req.perform_rpc.destination_identity = destination_identity
271271
req.perform_rpc.method = method
272272
req.perform_rpc.payload = payload
273-
if response_timeout_ms is not None:
274-
req.perform_rpc.response_timeout_ms = response_timeout_ms
273+
if response_timeout is not None:
274+
req.perform_rpc.response_timeout_ms = int(response_timeout * 1000)
275275

276276
queue = FfiClient.instance.queue.subscribe()
277277
try:
@@ -307,7 +307,7 @@ def register_rpc_method(
307307
RpcError: On failure. Details in `message`.
308308
309309
Example:
310-
async def greet_handler(request_id: str, caller_identity: str, payload: str, response_timeout_ms: int) -> str:
310+
async def greet_handler(request_id: str, caller_identity: str, payload: str, response_timeout: float) -> str:
311311
print(f"Received greeting from {caller_identity}: {payload}")
312312
return f"Hello, {caller_identity}!"
313313
@@ -317,10 +317,10 @@ async def greet_handler(request_id: str, caller_identity: str, payload: str, res
317317
- `request_id`: A unique identifier for this RPC request
318318
- `caller_identity`: The identity of the RemoteParticipant who initiated the RPC call
319319
- `payload`: The data sent by the caller (as a string)
320-
- `response_timeout_ms`: The maximum time available to return a response
320+
- `response_timeout`: The maximum time available to return a response
321321
322322
The handler should return a string or a coroutine that resolves to a string.
323-
If unable to respond within `response_timeout_ms`, the request will result in an error on the caller's side.
323+
If unable to respond within `response_timeout`, the request will result in an error on the caller's side.
324324
325325
You may raise errors of type `RpcError` with a string `message` in the handler,
326326
and they will be received on the caller's side with the message intact.
@@ -343,7 +343,7 @@ def rpc_method(self, method: str):
343343
344344
Example:
345345
@local_participant.rpc_method("greet")
346-
async def greet_handler(request_id: str, caller_identity: str, payload: str, response_timeout_ms: int) -> str:
346+
async def greet_handler(request_id: str, caller_identity: str, payload: str, response_timeout: float) -> str:
347347
print(f"Received greeting from {caller_identity}: {payload}")
348348
return f"Hello, {caller_identity}!"
349349
@@ -379,7 +379,7 @@ async def _handle_rpc_method_invocation(
379379
request_id: str,
380380
caller_identity: str,
381381
payload: str,
382-
response_timeout_ms: int,
382+
response_timeout: float,
383383
) -> None:
384384
response_error: Optional[RpcError] = None
385385
response_payload: Optional[str] = None
@@ -391,12 +391,24 @@ async def _handle_rpc_method_invocation(
391391
else:
392392
try:
393393
if asyncio.iscoroutinefunction(handler):
394-
response_payload = await handler(
395-
request_id, caller_identity, payload, response_timeout_ms
396-
)
394+
async def run_handler():
395+
try:
396+
return await handler(
397+
request_id, caller_identity, payload, response_timeout
398+
)
399+
except asyncio.CancelledError:
400+
# This will be caught by the outer try-except if it's due to timeout
401+
raise
402+
403+
try:
404+
response_payload = await asyncio.wait_for(run_handler(), timeout=response_timeout)
405+
except asyncio.TimeoutError:
406+
raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT)
407+
except asyncio.CancelledError:
408+
raise RpcError._built_in(RpcError.ErrorCode.RECIPIENT_DISCONNECTED)
397409
else:
398410
response_payload = handler(
399-
request_id, caller_identity, payload, response_timeout_ms
411+
request_id, caller_identity, payload, response_timeout
400412
)
401413
except RpcError as error:
402414
response_error = error
@@ -589,3 +601,5 @@ def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
589601

590602
def __repr__(self) -> str:
591603
return f"rtc.RemoteParticipant(sid={self.sid}, identity={self.identity}, name={self.name})"
604+
605+

livekit-rtc/livekit/rtc/room.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,14 @@ async def disconnect(self) -> None:
400400
"""Disconnects from the room."""
401401
if not self.isconnected():
402402
return
403+
404+
if self._rpc_invocation_tasks:
405+
for task in self._rpc_invocation_tasks:
406+
task.cancel()
407+
await asyncio.gather(*self._rpc_invocation_tasks, return_exceptions=True)
403408

404409
req = proto_ffi.FfiRequest()
405410
req.disconnect.room_handle = self._ffi_handle.handle # type: ignore
406-
407411
queue = FfiClient.instance.queue.subscribe()
408412
try:
409413
resp = FfiClient.instance.request(req)
@@ -412,12 +416,6 @@ async def disconnect(self) -> None:
412416
)
413417
finally:
414418
FfiClient.instance.queue.unsubscribe(queue)
415-
416-
if self._rpc_invocation_tasks:
417-
for task in self._rpc_invocation_tasks:
418-
task.cancel()
419-
await asyncio.gather(*self._rpc_invocation_tasks, return_exceptions=True)
420-
421419
await self._task
422420
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
423421

@@ -457,7 +455,7 @@ def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent):
457455
rpc_invocation.request_id,
458456
rpc_invocation.caller_identity,
459457
rpc_invocation.payload,
460-
rpc_invocation.response_timeout_ms,
458+
rpc_invocation.response_timeout_ms / 1000.0,
461459
)
462460
)
463461
self._rpc_invocation_tasks.add(task)

0 commit comments

Comments
 (0)