Skip to content

Commit f56a450

Browse files
committed
fix: drain video frames with MediaBlackhole when video_buffered=False
When video_buffered=False, attach a MediaBlackhole to the video proxy track to continuously consume and discard frames. This prevents unbounded queue growth in RTCRtpReceiver._queue (aiortc issue #554). Without draining, unconsumed video frames accumulate at ~400 MiB/10sec because aiortc's receiver queue has no size limit.
1 parent d85f1f9 commit f56a450

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

getstream/video/rtc/pc.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Any, Optional
44

55
import aiortc
6-
from aiortc.contrib.media import MediaRelay
6+
from aiortc.contrib.media import MediaBlackhole, MediaRelay
77
from aiortc.mediastreams import MediaStreamTrack
88
from aiortc.rtcrtpparameters import RTCRtpCodecCapability
99
from aiortc.rtcrtpsender import RTCRtpSender
@@ -142,6 +142,7 @@ def __init__(
142142

143143
self.track_map = {} # track_id -> (MediaRelay, original_track)
144144
self.video_frame_trackers = {} # track_id -> VideoFrameTracker
145+
self._video_blackhole: Optional[MediaBlackhole] = None
145146

146147
@self.on("track")
147148
async def on_track(track: aiortc.mediastreams.MediaStreamTrack):
@@ -180,7 +181,17 @@ def _emit_pcm(pcm: PcmData):
180181
asyncio.create_task(handler.start())
181182

182183
buffered = self._video_buffered if track.kind == "video" else True
183-
self.emit("track_added", relay.subscribe(tracked_track, buffered=buffered), user)
184+
proxy = relay.subscribe(tracked_track, buffered=buffered)
185+
186+
# Drain unconsumed video frames to prevent unbounded queue growth
187+
# in RTCRtpReceiver (aiortc issue #554)
188+
if track.kind == "video" and not self._video_buffered:
189+
blackhole = MediaBlackhole()
190+
blackhole.addTrack(proxy)
191+
asyncio.create_task(blackhole.start())
192+
self._video_blackhole = blackhole
193+
194+
self.emit("track_added", proxy, user)
184195

185196
@self.on("icegatheringstatechange")
186197
def on_icegatheringstatechange():

0 commit comments

Comments
 (0)