feat: configurable video frame buffering with blackhole drain #231

Merged: aliev merged 7 commits into main from fix/configurable-video-buffering on Mar 30, 2026

Conversation

@aliev (Member) commented Mar 30, 2026

Why

aiortc's RTCRtpReceiver uses an unbounded asyncio.Queue for decoded video frames (aiortc#554). When nobody calls recv() on the video track, frames accumulate at ~400 MiB per 10 seconds. Voice-only agents that receive but don't consume video hit OOM within minutes.

Changes

  • Add drain_video_frames parameter (default False) propagated through ConnectionManager → PeerConnectionManager → SubscriberPeerConnection
  • When drain_video_frames=True, attach a MediaBlackhole to video proxy tracks to continuously consume and discard frames, preventing unbounded queue growth
  • Default behavior unchanged

How it works

Without drain (drain_video_frames=False, default):

SFU → aiortc decoder → RTCRtpReceiver._queue → (nobody reads) → OOM

With drain (drain_video_frames=True):

SFU → aiortc decoder → RTCRtpReceiver._queue → MediaRelay → proxy → MediaBlackhole → discarded
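The failure mode and the fix can be sketched with a stdlib-only stand-in (plain asyncio, no aiortc): an unbounded queue plays the role of RTCRtpReceiver._queue, and a looping consumer plays the role of MediaBlackhole. All names below are illustrative, not the SDK's actual code.

```python
import asyncio


async def main() -> int:
    # Unbounded queue, like RTCRtpReceiver._queue (aiortc#554 behavior).
    frames: asyncio.Queue = asyncio.Queue()

    async def decoder(n: int) -> None:
        # Stand-in for the aiortc decoder pushing decoded frames.
        for i in range(n):
            frames.put_nowait(("frame", i))
            await asyncio.sleep(0)  # yield so the drain task can run

    async def blackhole() -> None:
        # Stand-in for MediaBlackhole: receive and discard forever.
        while True:
            await frames.get()

    drain = asyncio.create_task(blackhole())
    await decoder(1_000)
    await asyncio.sleep(0.05)  # let the drain task catch up
    drain.cancel()
    return frames.qsize()


leftover = asyncio.run(main())
print(leftover)  # → 0: draining keeps the queue from growing
```

Without the `blackhole()` task, `frames.qsize()` would be 1000 here; in the real receiver, each queued entry is a decoded frame, which is why memory grows at hundreds of MiB per minute.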

Benchmark (local k8s, 30-second video call)

|          | drain_video_frames=False | drain_video_frames=True |
|----------|--------------------------|-------------------------|
| Peak RSS | 1654 MiB (+1404)         | 230 MiB (+10)           |

Usage

connection = await rtc.join(call, user_id, drain_video_frames=True)

Companion PR

Summary by CodeRabbit

  • New Features
    • Added an optional video-frame draining capability to subscriber peer connections, letting the system consume and discard incoming video frames to reduce buffering and resource buildup when enabled. This is configurable at connection setup and applied per-subscriber to improve stability under high incoming-video load.

aliev added 2 commits March 30, 2026 21:32
Pass video_buffered=False through ConnectionManager → PeerConnectionManager
→ SubscriberPeerConnection to relay.subscribe(buffered=False) for video
tracks. Default True preserves current behavior.

Without buffering, only the latest video frame is kept in memory instead
of an unbounded queue. This prevents OOM on voice-only agents that
subscribe to video but never consume frames (~400 MiB/10sec leak).
When video_buffered=False, attach a MediaBlackhole to the video proxy
track to continuously consume and discard frames. This prevents
unbounded queue growth in RTCRtpReceiver._queue (aiortc issue #554).

Without draining, unconsumed video frames accumulate at ~400 MiB/10sec
because aiortc's receiver queue has no size limit.
@coderabbitai bot commented Mar 30, 2026

Warning

Rate limit exceeded

@aliev has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 6 minutes and 36 seconds before requesting another review.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 48ab4d72-1466-4708-95f8-8a995f03383d

📥 Commits

Reviewing files that changed from the base of the PR and between 8239876 and d28776c.

📒 Files selected for processing (1)
  • getstream/video/rtc/pc.py
📝 Walkthrough

A new boolean parameter drain_video_frames was added and propagated from ConnectionManager → PeerConnectionManager → SubscriberPeerConnection. When enabled, subscriber handling creates relay proxies for incoming video tracks and starts a MediaBlackhole-backed async task to consume (drain) frames, with per-track lifecycle tracking.

Changes

  • Connection manager wiring (getstream/video/rtc/connection_manager.py): Added drain_video_frames: bool = False to ConnectionManager.__init__ and passed it into PeerConnectionManager.
  • Peer connection manager (getstream/video/rtc/peer_connection.py): Store drain_video_frames on PeerConnectionManager and pass it to SubscriberPeerConnection when setting up subscribers.
  • Subscriber peer connection / draining logic (getstream/video/rtc/pc.py): The SubscriberPeerConnection constructor gains drain_video_frames. On incoming video tracks, always create a proxy = relay.subscribe(tracked_track) and emit that proxy. If drain_video_frames is true, create a second drain subscription attached to a MediaBlackhole, start an async drain task, and retain references in _video_blackholes and _video_drain_tasks. Audio handling is unchanged.
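The per-track lifecycle tracking can be illustrated with a stdlib-only sketch. DrainRegistry and every name here are hypothetical stand-ins for the _video_drain_tasks dict and the track-ended handling described above, not the SDK code:

```python
import asyncio


class DrainRegistry:
    """Per-track drain-task tracking, keyed by track id (stand-in)."""

    def __init__(self) -> None:
        self.tasks: dict[str, asyncio.Task] = {}

    def start(self, track_id: str) -> None:
        async def drain() -> None:
            # Stand-in for the MediaBlackhole-backed drain loop.
            while True:
                await asyncio.sleep(3600)

        # A dict keyed by track id supports multiple concurrent video
        # tracks; a single attribute would silently leak the prior task.
        self.tasks[track_id] = asyncio.create_task(drain())

    async def stop(self, track_id: str) -> None:
        # Track-ended analogue: look up by id, cancel, remove the entry.
        task = self.tasks.pop(track_id, None)
        if task is not None:
            task.cancel()


async def main() -> list:
    reg = DrainRegistry()
    reg.start("video-1")
    reg.start("video-2")  # second track would have clobbered a single slot
    await reg.stop("video-1")
    return sorted(reg.tasks)


remaining = asyncio.run(main())
print(remaining)  # → ['video-2']
```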

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CM as ConnectionManager
    participant PCM as PeerConnectionManager
    participant SPC as SubscriberPeerConnection
    participant Relay as MediaRelay
    participant MB as MediaBlackhole
    CM->>PCM: initialize(drain_video_frames)
    PCM->>SPC: setup_subscriber(drain_video_frames)
    Note right of SPC: on("track", tracked_track)
    SPC->>Relay: subscribe(tracked_track) -> proxy
    SPC--)Client: emit("track_added", proxy)
    alt drain_video_frames == true and track is video
        SPC->>Relay: subscribe(tracked_track) -> drain_proxy
        SPC->>MB: attach(drain_proxy)
        SPC->>SPC: start async drain task (store in _video_drain_tasks)
    end
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 A flag hopped down the manager line,
It told each peer, "Let blackholes dine,"
Tracks get proxied, one to the stream, one to sleep,
Async paws munch frames while state they keep,
Tiny toggle, tidy flow — hop, and all is fine.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 0.00%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title check ✅ Passed: the title 'feat: configurable video frame buffering with blackhole drain' accurately describes the main change: adding a configurable drain_video_frames parameter that prevents memory growth by discarding unused video frames via a MediaBlackhole.



@coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@getstream/video/rtc/pc.py`:
- Around line 188-192: Current code overwrites single _video_blackhole and
_video_drain_task when multiple video tracks arrive, leaking the previous
MediaBlackhole/task and preventing deterministic cleanup; change the
implementation to store blackholes and drain tasks in maps keyed by track.id
(e.g., self._video_blackholes: Dict[track_id, MediaBlackhole] and
self._video_drain_tasks: Dict[track_id, asyncio.Task]) when creating them in the
track handler (where MediaBlackhole() and asyncio.create_task are called) and
update handle_track_ended to look up by track.id, cancel and await/cancel the
task, and remove both entries to ensure proper cleanup.
- Around line 184-194: The current code reuses the same RelayStreamTrack
subscription (proxy returned by relay.subscribe(tracked_track)) for both
downstream consumers and the MediaBlackhole, which makes them compete for
frames; change the blackhole logic to create a separate subscription for
draining by calling relay.subscribe(tracked_track) again (e.g., drain_proxy) and
add that separate track to MediaBlackhole, keep the original proxy for
emit("track_added", ...), and assign the blackhole and start task to
self._video_blackhole and self._video_drain_task as before so only the
drain_proxy is consumed by MediaBlackhole and downstream consumers get the
original proxy.
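The competition problem the second comment describes is easy to reproduce with plain asyncio: two readers sharing one queue each see only part of the stream, which is why the blackhole needs its own relay subscription. This is a stdlib stand-in, not the SDK code:

```python
import asyncio


async def main() -> tuple:
    # One subscription shared by two readers (the bug being flagged).
    shared: asyncio.Queue = asyncio.Queue()
    got_app: list = []    # frames seen by the downstream consumer
    got_drain: list = []  # frames swallowed by the blackhole

    async def reader(out: list) -> None:
        while True:
            out.append(await shared.get())

    t_app = asyncio.create_task(reader(got_app))
    t_drain = asyncio.create_task(reader(got_drain))
    for i in range(100):
        shared.put_nowait(i)     # a put wakes exactly one waiting reader
        await asyncio.sleep(0)
    await asyncio.sleep(0.01)
    t_app.cancel()
    t_drain.cancel()
    return len(got_app), len(got_drain)


seen_app, seen_drain = asyncio.run(main())
print(seen_app, seen_drain)  # together they total 100; neither sees the full stream
```

With a second, independent subscription per consumer (what the fix does via a separate relay.subscribe call), each reader would receive all 100 frames.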

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 480a4aa8-5710-4ea2-ba60-0d612588d906

📥 Commits

Reviewing files that changed from the base of the PR and between 4e3c87d and 7f5dc2e.

📒 Files selected for processing (3)
  • getstream/video/rtc/connection_manager.py
  • getstream/video/rtc/pc.py
  • getstream/video/rtc/peer_connection.py

…racks

- Create a dedicated relay subscription for blackhole drain so it
  doesn't compete with downstream consumers for frames
- Use dicts keyed by track ID instead of single references to support
  multiple concurrent video tracks
Comment on lines +193 to +194

```python
self._video_drain_tasks[track.id] = asyncio.create_task(
    blackhole.start()
```
@dangusev (Contributor) commented Mar 30, 2026


Let's use add_done_callback to clean up the tasks and stop the blackholes in case the track switches on/off.
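A stdlib sketch of the suggested pattern (hypothetical names; a trivial coroutine stands in for blackhole.start()): a done callback removes the registry entry whether the drain task completes normally or is cancelled when the track stops.

```python
import asyncio

drain_tasks: dict[str, asyncio.Task] = {}


def cleanup_for(track_id: str):
    def _done(task: asyncio.Task) -> None:
        # Runs whenever the drain task finishes or is cancelled:
        # drop the registry entry so tracks toggling on/off never leak tasks.
        drain_tasks.pop(track_id, None)
    return _done


async def main() -> None:
    async def drain() -> None:
        await asyncio.sleep(0)  # stand-in for blackhole.start()

    task = asyncio.create_task(drain())
    drain_tasks["video-1"] = task
    task.add_done_callback(cleanup_for("video-1"))
    await task
    await asyncio.sleep(0)  # give the done callback a chance to run


asyncio.run(main())
print(drain_tasks)  # → {}
```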
