Skip to content

Commit 4063504

Browse files
authored
Avoid crashing the FfiQueue when subscriber is not cleaned up (#219)
1 parent 31b93c8 commit 4063504

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

examples/multiple_connections.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import os
2+
import asyncio
3+
from livekit import api, rtc
4+
5+
# This example demonstrates running multiple connections sequentially in the same thread.
6+
# This is useful when interoperating with a synchronous framework like Django or Flask
7+
# where you would connect to a LiveKit room as part of a request handler.
8+
9+
# LIVEKIT_URL needs to be set
10+
# also, set either LIVEKIT_TOKEN, or API_KEY and API_SECRET
11+
12+
13+
async def main():
14+
url = os.environ["LIVEKIT_URL"]
15+
token = os.getenv("LIVEKIT_TOKEN")
16+
room = rtc.Room()
17+
if not token:
18+
token = (
19+
api.AccessToken()
20+
.with_identity("python-bot")
21+
.with_name("Python Bot")
22+
.with_grants(
23+
api.VideoGrants(
24+
room_join=True,
25+
room="my-room",
26+
)
27+
)
28+
.to_jwt()
29+
)
30+
31+
track_sub = asyncio.Event()
32+
33+
@room.on("track_subscribed")
34+
def on_track_subscribed(
35+
track: rtc.Track,
36+
publication: rtc.RemoteTrackPublication,
37+
participant: rtc.RemoteParticipant,
38+
):
39+
if track.kind == rtc.TrackKind.KIND_AUDIO:
40+
stream = rtc.AudioStream(track) # the error comes from this line
41+
track_sub.set()
42+
# any created streams would need to be closed explicitly to avoid leaks
43+
asyncio.get_event_loop().create_task(stream.aclose())
44+
print("subscribed to audio track")
45+
46+
await room.connect(url, token)
47+
print(f"connected to room: {room.name}")
48+
await track_sub.wait()
49+
await room.disconnect()
50+
print("disconnected from room")
51+
52+
53+
def ensure_event_loop():
54+
try:
55+
return asyncio.get_event_loop()
56+
except RuntimeError:
57+
# Create a new event loop if none exists (this can happen in some contexts like certain threads)
58+
loop = asyncio.new_event_loop()
59+
asyncio.set_event_loop(loop)
60+
return loop
61+
62+
63+
if __name__ == "__main__":
64+
asyncio.run(main())
65+
asyncio.run(main())
66+
print("successfully ran multiple connections")

livekit-rtc/livekit/rtc/_ffi_client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,12 @@ def __init__(self) -> None:
106106
def put(self, item: T) -> None:
107107
with self._lock:
108108
for queue, loop in self._subscribers:
109-
loop.call_soon_threadsafe(queue.put_nowait, item)
109+
try:
110+
loop.call_soon_threadsafe(queue.put_nowait, item)
111+
except Exception as e:
112+
# this could happen if user closes the runloop without unsubscribing first
113+
# it's not good when it does occur, but we should not fail the entire runloop
114+
logger.error("error putting to queue: %s", e)
110115

111116
def subscribe(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> Queue[T]:
112117
with self._lock:

0 commit comments

Comments
 (0)