Skip to content

Commit c4c8d61

Browse files
committed
use ring buffer in keyframe buffer
1 parent f9d00c2 commit c4c8d61

File tree

1 file changed

+6
-33
lines changed

1 file changed

+6
-33
lines changed

pulsebeam/src/rtp/buffer.rs

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,21 @@
11
use crate::rtp::RtpPacket;
2-
use std::{cmp::Reverse, collections::BinaryHeap};
2+
use pulsebeam_runtime::collections::ring::RingBuffer;
33
use str0m::rtp::SeqNo;
44

55
const KEYFRAME_BUFFER_CAPACITY: usize = 256;
66

7-
#[derive(Debug, Eq, PartialEq)]
8-
struct OrderedRtpPacket(RtpPacket);
9-
10-
impl PartialOrd for OrderedRtpPacket {
11-
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
12-
Some(self.cmp(other))
13-
}
14-
}
15-
impl Ord for OrderedRtpPacket {
16-
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
17-
self.0.seq_no.cmp(&other.0.seq_no)
18-
}
19-
}
20-
217
/// A buffer to capture a complete keyframe from a new stream.
228
/// It waits for a packet with `is_keyframe_start` and then drains all
239
/// subsequent packets in-order.
2410
pub struct KeyframeBuffer {
25-
buffer: BinaryHeap<Reverse<OrderedRtpPacket>>,
11+
ring: RingBuffer<RtpPacket>,
2612
start_seq_no: Option<SeqNo>,
2713
}
2814

2915
impl KeyframeBuffer {
3016
pub fn new() -> Self {
3117
Self {
32-
buffer: BinaryHeap::with_capacity(KEYFRAME_BUFFER_CAPACITY),
18+
ring: RingBuffer::new(KEYFRAME_BUFFER_CAPACITY),
3319
start_seq_no: None,
3420
}
3521
}
@@ -44,32 +30,19 @@ impl KeyframeBuffer {
4430
if pkt.is_keyframe_start {
4531
let new_start_seq = pkt.seq_no;
4632
if self.start_seq_no.is_some() {
47-
self.buffer.retain(|item| item.0.0.seq_no >= new_start_seq);
33+
self.ring.advance_tail_to(*new_start_seq);
4834
}
4935
self.start_seq_no = Some(new_start_seq);
5036
}
5137

52-
// Only add the packet if it's not ancient relative to a keyframe we've already found.
53-
if let Some(start_seq) = self.start_seq_no
54-
&& pkt.seq_no < start_seq
55-
{
56-
return;
57-
}
58-
59-
if self.buffer.len() >= KEYFRAME_BUFFER_CAPACITY {
60-
tracing::warn!("keyframe buffer is full, dropping packets unexpectedly");
61-
return;
62-
}
63-
64-
self.buffer.push(Reverse(OrderedRtpPacket(pkt)));
38+
self.ring.insert(*pkt.seq_no, pkt);
6539
}
6640

6741
/// Pops the next packet in sequence if the buffer is ready.
6842
pub fn pop(&mut self) -> Option<RtpPacket> {
6943
let start_seq = self.start_seq_no?;
7044

71-
while let Some(item) = self.buffer.pop() {
72-
let pkt = item.0.0;
45+
while let Some(pkt) = self.ring.pop_front() {
7346
if pkt.seq_no < start_seq {
7447
continue;
7548
}

0 commit comments

Comments
 (0)