|
3 | 3 | from typing import Any, Optional |
4 | 4 |
|
5 | 5 | import aiortc |
6 | | -from aiortc.contrib.media import MediaRelay |
| 6 | +from aiortc.contrib.media import MediaBlackhole, MediaRelay |
7 | 7 | from aiortc.mediastreams import MediaStreamTrack |
8 | 8 | from aiortc.rtcrtpparameters import RTCRtpCodecCapability |
9 | 9 | from aiortc.rtcrtpsender import RTCRtpSender |
@@ -131,15 +131,19 @@ def __init__( |
131 | 131 | self, |
132 | 132 | connection, |
133 | 133 | configuration: aiortc.RTCConfiguration, |
| 134 | + drain_video_frames: bool = False, |
134 | 135 | ) -> None: |
135 | 136 | logger.info( |
136 | 137 | f"creating subscriber peer connection with configuration: {configuration}" |
137 | 138 | ) |
138 | 139 | super().__init__(configuration) |
139 | 140 | self.connection = connection |
| 141 | + self._drain_video_frames = drain_video_frames |
140 | 142 |
|
141 | 143 | self.track_map = {} # track_id -> (MediaRelay, original_track) |
142 | 144 | self.video_frame_trackers = {} # track_id -> VideoFrameTracker |
| 145 | + self._video_blackholes: dict[str, MediaBlackhole] = {} |
| 146 | + self._video_drain_tasks: dict[str, asyncio.Task] = {} |
143 | 147 |
|
144 | 148 | @self.on("track") |
145 | 149 | async def on_track(track: aiortc.mediastreams.MediaStreamTrack): |
@@ -177,7 +181,20 @@ def _emit_pcm(pcm: PcmData): |
177 | 181 | handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm) |
178 | 182 | asyncio.create_task(handler.start()) |
179 | 183 |
|
180 | | - self.emit("track_added", relay.subscribe(tracked_track), user) |
| 184 | + proxy = relay.subscribe(tracked_track) |
| 185 | + |
| 186 | + # Drain unconsumed video frames to prevent unbounded queue growth |
| 187 | + # in RTCRtpReceiver (aiortc issue #554) |
| 188 | + if track.kind == "video" and self._drain_video_frames: |
| 189 | + drain_proxy = relay.subscribe(tracked_track) |
| 190 | + blackhole = MediaBlackhole() |
| 191 | + blackhole.addTrack(drain_proxy) |
| 192 | + self._video_blackholes[track.id] = blackhole |
| 193 | + self._video_drain_tasks[track.id] = asyncio.create_task( |
| 194 | + blackhole.start() |
| 195 | + ) |
| 196 | + |
| 197 | + self.emit("track_added", proxy, user) |
181 | 198 |
|
182 | 199 | @self.on("icegatheringstatechange") |
183 | 200 | def on_icegatheringstatechange(): |
|
0 commit comments