Skip to content

Commit 38adb3d

Browse files
Implement task_done calls for FFI queue in various RTC components to ensure proper task management and resource cleanup.
1 parent 30ac4e9 commit 38adb3d

File tree

7 files changed

+65
-31
lines changed

7 files changed

+65
-31
lines changed

livekit-rtc/livekit/rtc/audio_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ async def capture_frame(self, frame: AudioFrame) -> None:
142142
cb: proto_ffi.FfiEvent = await queue.wait_for(
143143
lambda e: e.capture_audio_frame.async_id == resp.capture_audio_frame.async_id
144144
)
145+
queue.task_done()
145146
finally:
146147
FfiClient.instance.queue.unsubscribe(queue)
147148

livekit-rtc/livekit/rtc/audio_stream.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ def __init__(
108108
if noise_cancellation is not None:
109109
self._audio_filter_module = noise_cancellation.module_id
110110
self._audio_filter_options = noise_cancellation.options
111-
self._task = self._loop.create_task(self._run())
112-
self._task.add_done_callback(task_done_logger)
113111

114112
stream: Any = None
115113
if "participant" in kwargs:
@@ -120,6 +118,9 @@ def __init__(
120118
stream = self._create_owned_stream()
121119
self._ffi_handle = FfiHandle(stream.handle.id)
122120
self._info = stream.info
121+
122+
self._task = self._loop.create_task(self._run())
123+
self._task.add_done_callback(task_done_logger)
123124

124125
@classmethod
125126
def from_participant(
@@ -261,18 +262,29 @@ def _create_owned_stream_from_participant(
261262
return resp.audio_stream_from_participant.stream
262263

263264
async def _run(self):
265+
"""Run the audio stream.
266+
267+
This method is responsible for receiving audio frames from the audio stream and
268+
putting them into the queue. It also handles the EOS event and unsubscribes from
269+
the FFI queue.
270+
271+
It must be initialized after self._ffi_queue is subscribed.
272+
"""
264273
while True:
265-
event = await self._ffi_queue.wait_for(self._is_event)
266-
audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event
267-
268-
if audio_event.HasField("frame_received"):
269-
owned_buffer_info = audio_event.frame_received.frame
270-
frame = AudioFrame._from_owned_info(owned_buffer_info)
271-
event = AudioFrameEvent(frame)
272-
self._queue.put(event)
273-
elif audio_event.HasField("eos"):
274-
self._queue.put(None)
275-
break
274+
try:
275+
event = await self._ffi_queue.wait_for(self._is_event)
276+
audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event
277+
278+
if audio_event.HasField("frame_received"):
279+
owned_buffer_info = audio_event.frame_received.frame
280+
frame = AudioFrame._from_owned_info(owned_buffer_info)
281+
event = AudioFrameEvent(frame)
282+
self._queue.put(event)
283+
elif audio_event.HasField("eos"):
284+
self._queue.put(None)
285+
break
286+
finally:
287+
self._ffi_queue.task_done()
276288

277289
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
278290

livekit-rtc/livekit/rtc/data_stream.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ async def _send_header(self):
182182
cb: proto_ffi.FfiEvent = await queue.wait_for(
183183
lambda e: e.send_stream_header.async_id == resp.send_stream_header.async_id
184184
)
185+
queue.task_done()
185186
finally:
186187
FfiClient.instance.queue.unsubscribe(queue)
187188

@@ -206,6 +207,7 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk):
206207
cb: proto_ffi.FfiEvent = await queue.wait_for(
207208
lambda e: e.send_stream_chunk.async_id == resp.send_stream_chunk.async_id
208209
)
210+
queue.task_done()
209211
finally:
210212
FfiClient.instance.queue.unsubscribe(queue)
211213

@@ -227,6 +229,7 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer):
227229
cb: proto_ffi.FfiEvent = await queue.wait_for(
228230
lambda e: e.send_stream_trailer.async_id == resp.send_stream_trailer.async_id
229231
)
232+
queue.task_done()
230233
finally:
231234
FfiClient.instance.queue.unsubscribe(queue)
232235

livekit-rtc/livekit/rtc/participant.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ async def publish_data(
210210
cb: proto_ffi.FfiEvent = await queue.wait_for(
211211
lambda e: e.publish_data.async_id == resp.publish_data.async_id
212212
)
213+
queue.task_done()
213214
finally:
214215
FfiClient.instance.queue.unsubscribe(queue)
215216

@@ -238,6 +239,7 @@ async def publish_dtmf(self, *, code: int, digit: str) -> None:
238239
cb: proto_ffi.FfiEvent = await queue.wait_for(
239240
lambda e: e.publish_sip_dtmf.async_id == resp.publish_sip_dtmf.async_id
240241
)
242+
queue.task_done()
241243
finally:
242244
FfiClient.instance.queue.unsubscribe(queue)
243245

@@ -278,6 +280,7 @@ async def publish_transcription(self, transcription: Transcription) -> None:
278280
cb: proto_ffi.FfiEvent = await queue.wait_for(
279281
lambda e: e.publish_transcription.async_id == resp.publish_transcription.async_id
280282
)
283+
queue.task_done()
281284
finally:
282285
FfiClient.instance.queue.unsubscribe(queue)
283286

@@ -321,6 +324,7 @@ async def perform_rpc(
321324
cb = await queue.wait_for(
322325
lambda e: (e.perform_rpc.async_id == resp.perform_rpc.async_id)
323326
)
327+
queue.task_done()
324328
finally:
325329
FfiClient.instance.queue.unsubscribe(queue)
326330

@@ -494,6 +498,7 @@ async def set_metadata(self, metadata: str) -> None:
494498
await queue.wait_for(
495499
lambda e: e.set_local_metadata.async_id == resp.set_local_metadata.async_id
496500
)
501+
queue.task_done()
497502
finally:
498503
FfiClient.instance.queue.unsubscribe(queue)
499504

@@ -516,6 +521,7 @@ async def set_name(self, name: str) -> None:
516521
await queue.wait_for(
517522
lambda e: e.set_local_name.async_id == resp.set_local_name.async_id
518523
)
524+
queue.task_done()
519525
finally:
520526
FfiClient.instance.queue.unsubscribe(queue)
521527

@@ -546,6 +552,7 @@ async def set_attributes(self, attributes: dict[str, str]) -> None:
546552
await queue.wait_for(
547553
lambda e: e.set_local_attributes.async_id == resp.set_local_attributes.async_id
548554
)
555+
queue.task_done()
549556
finally:
550557
FfiClient.instance.queue.unsubscribe(queue)
551558

livekit-rtc/livekit/rtc/room.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ def on_participant_connected(participant):
386386
cb: proto_ffi.FfiEvent = await queue.wait_for(
387387
lambda e: e.connect.async_id == resp.connect.async_id
388388
)
389+
queue.task_done()
389390
finally:
390391
FfiClient.instance.queue.unsubscribe(queue)
391392

@@ -428,6 +429,7 @@ async def get_rtc_stats(self) -> RtcStats:
428429
cb: proto_ffi.FfiEvent = await queue.wait_for(
429430
lambda e: e.get_session_stats.async_id == resp.get_session_stats.async_id
430431
)
432+
queue.task_done()
431433
finally:
432434
FfiClient.instance.queue.unsubscribe(queue)
433435

@@ -475,6 +477,7 @@ async def disconnect(self) -> None:
475477
try:
476478
resp = FfiClient.instance.request(req)
477479
await queue.wait_for(lambda e: e.disconnect.async_id == resp.disconnect.async_id)
480+
queue.task_done()
478481
finally:
479482
FfiClient.instance.queue.unsubscribe(queue)
480483

livekit-rtc/livekit/rtc/track.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ async def get_stats(self) -> List[proto_stats.RtcStats]:
5858
cb: proto_ffi.FfiEvent = await queue.wait_for(
5959
lambda e: e.get_stats.async_id == resp.get_stats.async_id
6060
)
61+
queue.task_done()
6162
finally:
6263
FfiClient.instance.queue.unsubscribe(queue)
6364

livekit-rtc/livekit/rtc/video_stream.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,12 @@ def _create_owned_stream_from_participant(
122122
) -> Any:
123123
req = proto_ffi.FfiRequest()
124124
video_stream_from_participant = req.video_stream_from_participant
125-
video_stream_from_participant.participant_handle = participant._ffi_handle.handle
126-
video_stream_from_participant.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
125+
video_stream_from_participant.participant_handle = (
126+
participant._ffi_handle.handle
127+
)
128+
video_stream_from_participant.type = (
129+
proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
130+
)
127131
video_stream_from_participant.track_source = track_source
128132
video_stream_from_participant.normalize_stride = True
129133
if self._format is not None:
@@ -133,22 +137,25 @@ def _create_owned_stream_from_participant(
133137

134138
async def _run(self) -> None:
135139
while True:
136-
event = await self._ffi_queue.wait_for(self._is_event)
137-
video_event = event.video_stream_event
138-
139-
if video_event.HasField("frame_received"):
140-
owned_buffer_info = video_event.frame_received.buffer
141-
frame = VideoFrame._from_owned_info(owned_buffer_info)
142-
143-
event = VideoFrameEvent(
144-
frame=frame,
145-
timestamp_us=video_event.frame_received.timestamp_us,
146-
rotation=video_event.frame_received.rotation,
147-
)
148-
149-
self._queue.put(event)
150-
elif video_event.HasField("eos"):
151-
break
140+
try:
141+
event = await self._ffi_queue.wait_for(self._is_event)
142+
video_event = event.video_stream_event
143+
144+
if video_event.HasField("frame_received"):
145+
owned_buffer_info = video_event.frame_received.buffer
146+
frame = VideoFrame._from_owned_info(owned_buffer_info)
147+
148+
event = VideoFrameEvent(
149+
frame=frame,
150+
timestamp_us=video_event.frame_received.timestamp_us,
151+
rotation=video_event.frame_received.rotation,
152+
)
153+
154+
self._queue.put(event)
155+
elif video_event.HasField("eos"):
156+
break
157+
finally:
158+
self._ffi_queue.task_done()
152159

153160
FfiClient.instance.queue.unsubscribe(self._ffi_queue)
154161

0 commit comments

Comments
 (0)