Skip to content

Commit 9307219

Browse files
committed
handle stale keyframe
1 parent 2c8386d commit 9307219

File tree

3 files changed

+79
-24
lines changed

3 files changed

+79
-24
lines changed

pulsebeam-runtime/src/collections/ring.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ impl<T: Clone> RingBuffer<T> {
3030
self.tail
3131
}
3232

33+
/// Returns the number of sequence number slots between the tail and the head.
34+
/// This represents the span of the buffer, including any empty (lost) packets.
35+
pub fn len(&self) -> u64 {
36+
if !self.initialized {
37+
return 0;
38+
}
39+
self.head.wrapping_sub(self.tail)
40+
}
41+
3342
fn initialize(&mut self, seq: u64) {
3443
self.head = seq.wrapping_add(1);
3544
self.tail = seq;
@@ -479,4 +488,33 @@ mod tests {
479488
assert_eq!(buffer.tail(), 10);
480489
assert_eq!(buffer.head(), 13);
481490
}
491+
492+
#[test]
493+
fn test_len() {
494+
let mut buffer = RingBuffer::<i32>::new(10);
495+
assert_eq!(buffer.len(), 0);
496+
497+
buffer.insert(100, 1); // tail=100, head=101
498+
assert_eq!(buffer.len(), 1);
499+
500+
buffer.insert(101, 2); // tail=100, head=102
501+
assert_eq!(buffer.len(), 2);
502+
503+
buffer.insert(105, 6); // tail=100, head=106
504+
assert_eq!(buffer.len(), 6); // Length is the span, not the count of items
505+
506+
let _ = buffer.pop_front(); // tail=101, head=106
507+
assert_eq!(buffer.len(), 5);
508+
509+
// Test wrap-around
510+
let mut buffer = RingBuffer::<i32>::new(10);
511+
let start_seq = u64::MAX - 1;
512+
buffer.insert(start_seq, 1);
513+
buffer.insert(start_seq.wrapping_add(1), 2); // u64::MAX
514+
buffer.insert(start_seq.wrapping_add(2), 3); // 0
515+
516+
assert_eq!(buffer.tail(), start_seq);
517+
assert_eq!(buffer.head(), start_seq.wrapping_add(3));
518+
assert_eq!(buffer.len(), 3);
519+
}
482520
}

pulsebeam/src/rtp/buffer.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::rtp::RtpPacket;
22
use pulsebeam_runtime::collections::ring::RingBuffer;
33
use str0m::rtp::SeqNo;
4+
use tokio::time::Instant;
45

56
const KEYFRAME_BUFFER_CAPACITY: usize = 256;
67

@@ -32,7 +33,7 @@ pub struct KeyframeBuffer {
3233
ring: RingBuffer<RtpPacket>,
3334
state: State,
3435
/// The sequence number of the keyframe that we are currently buffering or draining.
35-
keyframe_start_seq: Option<SeqNo>,
36+
keyframe_segment: Option<(SeqNo, Instant)>,
3637
/// The next sequence number we expect to pop. This is used to handle out-of-order
3738
/// packets and to detect and skip gaps.
3839
next_pop_seq: Option<SeqNo>,
@@ -49,14 +50,19 @@ impl KeyframeBuffer {
4950
Self {
5051
ring: RingBuffer::new(KEYFRAME_BUFFER_CAPACITY),
5152
state: State::Waiting,
52-
keyframe_start_seq: None,
53+
keyframe_segment: None,
5354
next_pop_seq: None,
5455
}
5556
}
5657

5758
/// Returns `true` if a keyframe has been received and the buffer is ready to be drained.
58-
pub fn is_ready(&self) -> bool {
59-
self.state == State::Buffering || self.state == State::Draining
59+
pub fn is_ready(&self, target_playout: Instant) -> bool {
60+
self.state == State::Buffering
61+
&& self
62+
.keyframe_segment
63+
.map(|s| s.1 >= target_playout)
64+
.unwrap_or_default()
65+
|| self.state == State::Draining
6066
}
6167

6268
/// Adds an RTP packet to the buffer, applying state-specific logic.
@@ -65,12 +71,15 @@ impl KeyframeBuffer {
6571
// A new keyframe has arrived. We must handle it based on our current state.
6672
match self.state {
6773
State::Waiting | State::Buffering => {
68-
// If we were waiting for a keyframe, or buffering an old one that
69-
// the consumer hasn't started reading yet, we reset to this new keyframe.
70-
self.ring.advance_tail_to(*pkt.seq_no); // Evict any older packets.
71-
self.keyframe_start_seq = Some(pkt.seq_no);
72-
self.next_pop_seq = Some(pkt.seq_no);
73-
self.state = State::Buffering;
74+
if *pkt.seq_no > self.ring.head() {
75+
// If we were waiting for a keyframe, or buffering an old one that
76+
// the consumer hasn't started reading yet, we reset to this new keyframe.
77+
self.ring.advance_tail_to(*pkt.seq_no); // Evict any older packets.
78+
self.keyframe_segment
79+
.replace((pkt.seq_no, pkt.playout_time));
80+
self.next_pop_seq = Some(pkt.seq_no);
81+
self.state = State::Buffering;
82+
}
7483
}
7584
State::Draining => {
7685
// We are already draining a keyframe. We will buffer this new one,
@@ -113,8 +122,8 @@ impl KeyframeBuffer {
113122
let mut next_seq = self.next_pop_seq?;
114123

115124
// Search for the next available packet, starting from `next_pop_seq`.
116-
// We limit the search to the buffer's capacity to prevent infinite loops.
117-
for _ in 0..KEYFRAME_BUFFER_CAPACITY {
125+
// We limit the search to the buffer's len to prevent infinite loops.
126+
for _ in 0..self.ring.len() {
118127
if let Some(pkt) = self.ring.remove(*next_seq) {
119128
// Found the next packet. Update our expected sequence and return it.
120129
self.next_pop_seq = Some(next_seq.wrapping_add(1).into());

pulsebeam/src/rtp/switcher.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use str0m::media::Frequency;
2+
use tokio::time::Instant;
23

34
use crate::rtp::RtpPacket;
45
use crate::rtp::buffer::KeyframeBuffer;
@@ -16,6 +17,8 @@ pub struct Switcher {
1617
/// The state for the *new* stream we are switching to.
1718
/// This is `Some` only when a switch is in progress.
1819
staging: Option<KeyframeBuffer>,
20+
21+
latest_playout: Instant,
1922
}
2023

2124
impl Switcher {
@@ -24,49 +27,54 @@ impl Switcher {
2427
timeline: Timeline::new(clock_rate),
2528
pending: None,
2629
staging: None,
30+
latest_playout: Instant::now(),
2731
}
2832
}
2933

3034
/// Pushes a packet from the **old/current** stream.
3135
/// This is typically used to forward the stream that is already playing out.
3236
pub fn push(&mut self, pkt: RtpPacket) {
33-
// rewrite active stream here so we don't race with the new stream
34-
let pkt = self.timeline.rewrite(pkt);
3537
self.pending.replace(pkt);
3638
}
3739

3840
/// Pushes a packet for the **new** stream we are preparing to switch to.
3941
/// The first call to this method will initiate the switching process.
4042
pub fn stage(&mut self, pkt: RtpPacket) {
4143
let staging = self.staging.get_or_insert_default();
42-
43-
if pkt.is_keyframe_start {
44-
self.timeline.rebase(&pkt);
45-
}
4644
staging.push(pkt);
4745
}
4846

4947
/// Returns true if the new stream has received a keyframe and is ready to be popped.
5048
pub fn is_ready(&self) -> bool {
51-
self.staging.as_ref().map(|s| s.is_ready()).unwrap_or(false)
49+
self.staging
50+
.as_ref()
51+
.map(|s| s.is_ready(self.latest_playout))
52+
.unwrap_or(false)
5253
}
5354

5455
/// Pops the next available packet, prioritizing the old stream to ensure a smooth drain.
5556
pub fn pop(&mut self) -> Option<RtpPacket> {
5657
// --- Priority 1: Drain the pending packet from the OLD stream. ---
5758
if let Some(pending_pkt) = self.pending.take() {
58-
return Some(pending_pkt);
59+
if pending_pkt.playout_time > self.latest_playout {
60+
self.latest_playout = pending_pkt.playout_time;
61+
}
62+
return Some(self.timeline.rewrite(pending_pkt));
63+
}
64+
65+
if !self.is_ready() {
66+
return None;
5967
}
6068

61-
// TODO: deal with older frames here.
6269
// --- Priority 2: Pop packets from the NEW stream if a switch is in progress. ---
6370
if let Some(staging) = &mut self.staging {
6471
if let Some(staged_pkt) = staging.pop() {
72+
if staged_pkt.is_keyframe_start {
73+
self.timeline.rebase(&staged_pkt);
74+
}
6575
return Some(self.timeline.rewrite(staged_pkt));
6676
} else {
67-
if staging.is_ready() {
68-
self.staging = None;
69-
}
77+
self.staging = None;
7078
return None;
7179
}
7280
}

0 commit comments

Comments
 (0)