Skip to content

Commit 4addefb

Browse files
committed
feat: synchronized playback
1 parent c4c8d61 commit 4addefb

File tree

6 files changed

+333
-309
lines changed

6 files changed

+333
-309
lines changed

pulsebeam-runtime/src/collections/ring.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,30 @@ impl<T: Clone> RingBuffer<T> {
8282
slid
8383
}
8484

85+
/// Removes and returns an item at a specific sequence number, if it exists.
86+
/// This does not affect the head or tail pointers. It is useful for cherry-picking
87+
/// items from the buffer without advancing the read cursor (`tail`).
88+
pub fn remove(&mut self, seq: u64) -> Option<T> {
89+
if !self.initialized {
90+
return None;
91+
}
92+
93+
// A sequence number `s` is within the valid range `[tail, head)` if and only if
94+
// the distance from the tail to `s` is less than the distance from the tail to the head.
95+
// This check correctly handles the u64 wrap-around.
96+
let distance_from_tail = seq.wrapping_sub(self.tail);
97+
let buffer_length = self.head.wrapping_sub(self.tail);
98+
99+
if distance_from_tail >= buffer_length {
100+
// The sequence number is outside the valid range (either too old or too new).
101+
return None;
102+
}
103+
104+
// The sequence number is within our valid range, so we can calculate the index.
105+
let index = (seq % self.capacity()) as usize;
106+
self.buffer[index].take()
107+
}
108+
85109
/// Takes the item at the current tail, advancing the tail if successful.
86110
pub fn pop_front(&mut self) -> Option<T> {
87111
if self.tail == self.head {
@@ -358,4 +382,50 @@ mod tests {
358382

359383
assert_eq!(buffer.pop_front(), None);
360384
}
385+
386+
#[test]
387+
fn test_remove() {
388+
let mut buffer = RingBuffer::new(10);
389+
buffer.insert(10, 100); // tail=10, head=11
390+
buffer.insert(12, 120); // tail=10, head=13
391+
392+
// Remove an existing item
393+
assert_eq!(buffer.remove(10), Some(100));
394+
// The buffer should now contain a None at that position
395+
assert!(buffer.buffer[(10 % 10) as usize].is_none());
396+
// Pointers should be unchanged
397+
assert_eq!(buffer.tail(), 10);
398+
assert_eq!(buffer.head(), 13);
399+
400+
// Try to remove it again
401+
assert_eq!(buffer.remove(10), None);
402+
403+
// Remove another existing item
404+
assert_eq!(buffer.remove(12), Some(120));
405+
assert!(buffer.buffer[(12 % 10) as usize].is_none());
406+
407+
// Try to remove an item that was never there (a gap)
408+
assert_eq!(buffer.remove(11), None);
409+
410+
// Try to remove an item older than tail
411+
assert_eq!(buffer.remove(9), None);
412+
413+
// Try to remove an item that is at or newer than head
414+
assert_eq!(buffer.remove(13), None);
415+
assert_eq!(buffer.remove(14), None);
416+
417+
// Test remove on wrap around
418+
let mut buffer = RingBuffer::new(10);
419+
let start_seq = u64::MAX - 2;
420+
buffer.insert(start_seq, 1);
421+
buffer.insert(start_seq.wrapping_add(1), 2);
422+
buffer.insert(start_seq.wrapping_add(3), 4); // seq=0
423+
424+
assert_eq!(buffer.tail(), start_seq);
425+
assert_eq!(buffer.head(), start_seq.wrapping_add(4)); // seq=1
426+
427+
assert_eq!(buffer.remove(start_seq.wrapping_add(3)), Some(4));
428+
assert_eq!(buffer.remove(start_seq.wrapping_add(2)), None);
429+
assert_eq!(buffer.remove(start_seq), Some(1));
430+
}
361431
}

pulsebeam/src/participant/downstream.rs

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -446,8 +446,8 @@ impl DownstreamAllocator {
446446
) -> TrackStream {
447447
async_stream::stream! {
448448
let mut reader = TrackReader::new(track, control_rx);
449-
while let Some(item) = reader.next_packet().await {
450-
yield item;
449+
while let Some(pkt) = reader.next_packet().await {
450+
yield (reader.id.clone(), pkt);
451451
}
452452
}
453453
.boxed()
@@ -467,8 +467,13 @@ impl Stream for DownstreamAllocator {
467467
enum TrackReaderState {
468468
/// The stream is paused and waiting for a new configuration.
469469
Paused,
470+
Resuming {
471+
active_index: usize,
472+
},
470473
/// Actively streaming from a single, stable layer.
471-
Streaming { active_index: usize },
474+
Streaming {
475+
active_index: usize,
476+
},
472477
/// Attempting to switch to a new layer (`active_index`) while forwarding packets
473478
/// from the old layer (`fallback_index`) until the new one becomes active.
474479
Transitioning {
@@ -483,9 +488,9 @@ pub struct TrackReader {
483488
track: TrackReceiver,
484489
control_rx: watch::Receiver<StreamConfig>,
485490

491+
switcher: Switcher,
486492
state: TrackReaderState,
487493
config: StreamConfig,
488-
switcher: Switcher,
489494
}
490495

491496
impl TrackReader {
@@ -495,26 +500,25 @@ impl TrackReader {
495500
MediaKind::Audio => rtp::AUDIO_FREQUENCY,
496501
MediaKind::Video => rtp::VIDEO_FREQUENCY,
497502
};
498-
let switcher = Switcher::new(clock_rate);
499503
let mut this = Self {
500504
id: track.meta.id.clone(),
501505
track,
502506
control_rx,
507+
switcher: Switcher::new(clock_rate),
503508
state: TrackReaderState::Paused,
504509
config,
505-
switcher,
506510
};
507511
// seed initial state
508512
this.update_state(config);
509513
this
510514
}
511515

512516
/// Poll for the next RTP packet, handling fallback and layer switching.
513-
pub async fn next_packet(&mut self) -> Option<TrackStreamItem> {
517+
pub async fn next_packet(&mut self) -> Option<RtpPacket> {
514518
loop {
515519
self.maybe_update_state();
516520
if let Some(pkt) = self.switcher.pop() {
517-
return Some((self.id.clone(), pkt));
521+
return Some(pkt);
518522
}
519523

520524
match self.state {
@@ -524,6 +528,37 @@ impl TrackReader {
524528
return None;
525529
}
526530
}
531+
TrackReaderState::Resuming { active_index } => {
532+
let receiver = &mut self.track.simulcast[active_index];
533+
tokio::select! {
534+
biased;
535+
536+
res = self.control_rx.changed() => {
537+
if res.is_err() { return None; }
538+
continue; // Re-evaluate state on next loop iteration
539+
},
540+
541+
res = receiver.channel.recv() => {
542+
match res {
543+
Ok(pkt) => {
544+
self.switcher.stage(pkt.value.clone());
545+
},
546+
Err(spmc::RecvError::Lagged(n)) => {
547+
tracing::warn!(track_id = %self.id, "Receiver lagged {n}, requesting keyframe");
548+
receiver.request_keyframe(str0m::media::KeyframeRequestKind::Pli);
549+
}
550+
Err(spmc::RecvError::Closed) => {
551+
tracing::warn!(track_id = %self.id, "Channel closed, ending stream");
552+
return None;
553+
}
554+
}
555+
}
556+
}
557+
558+
if self.switcher.is_ready() {
559+
self.state = TrackReaderState::Streaming { active_index };
560+
}
561+
}
527562
TrackReaderState::Streaming { active_index } => {
528563
let receiver = &mut self.track.simulcast[active_index];
529564
tokio::select! {
@@ -573,7 +608,7 @@ impl TrackReader {
573608
res = active_receiver.channel.recv() => {
574609
match res {
575610
Ok(pkt) => {
576-
self.switcher.push(pkt.value.clone());
611+
self.switcher.stage(pkt.value.clone());
577612
}
578613
Err(spmc::RecvError::Lagged(n)) => {
579614
tracing::warn!(track_id = %self.id, "New active stream lagged {n}, completing switch anyway");
@@ -606,7 +641,7 @@ impl TrackReader {
606641
}
607642
}
608643

609-
if self.switcher.is_stable() {
644+
if self.switcher.is_ready() {
610645
self.state = TrackReaderState::Streaming { active_index };
611646
}
612647
}
@@ -640,8 +675,16 @@ impl TrackReader {
640675
// Determine the next state based on the current state and the new config.
641676
let next_state = match self.state {
642677
TrackReaderState::Paused => new_active_index
643-
.map(|idx| TrackReaderState::Streaming { active_index: idx })
678+
.map(|idx| TrackReaderState::Resuming { active_index: idx })
644679
.unwrap_or(TrackReaderState::Paused),
680+
TrackReaderState::Resuming { active_index } => match new_active_index {
681+
// TODO: double check if this is safe to resume to a different index here
682+
Some(new_idx) if new_idx != active_index => {
683+
TrackReaderState::Resuming { active_index }
684+
}
685+
Some(_) => self.state,
686+
None => TrackReaderState::Paused,
687+
},
645688
TrackReaderState::Streaming { active_index } => {
646689
match new_active_index {
647690
Some(new_idx) if new_idx != active_index => TrackReaderState::Transitioning {
@@ -670,30 +713,27 @@ impl TrackReader {
670713
if self.state != next_state {
671714
tracing::debug!(track_id = %self.id, "TrackReader state changed from {:?} to {:?}", self.state, next_state);
672715

673-
match (self.state, next_state) {
674-
(
675-
_,
676-
TrackReaderState::Transitioning {
677-
active_index,
678-
fallback_index,
679-
},
680-
) => {
681-
let fallback_receiver_rid = { self.track.simulcast[fallback_index].rid };
716+
match next_state {
717+
TrackReaderState::Resuming { active_index } => {
682718
let active_receiver = &mut self.track.simulcast[active_index];
683719
active_receiver.channel.reset();
684720
active_receiver.request_keyframe(KeyframeRequestKind::Fir);
685721
tracing::info!(track_id = %self.id,
686-
"switch simulcast layer: {:?} -> {:?}",
687-
fallback_receiver_rid, active_receiver.rid
722+
"resuming simulcast layer: {:?}",
723+
active_receiver.rid
688724
);
689725
}
690-
(TrackReaderState::Paused, TrackReaderState::Streaming { active_index }) => {
726+
TrackReaderState::Transitioning {
727+
active_index,
728+
fallback_index,
729+
} => {
730+
let fallback_receiver_rid = { self.track.simulcast[fallback_index].rid };
691731
let active_receiver = &mut self.track.simulcast[active_index];
692732
active_receiver.channel.reset();
693733
active_receiver.request_keyframe(KeyframeRequestKind::Fir);
694734
tracing::info!(track_id = %self.id,
695-
"resuming simulcast layer: {:?}",
696-
active_receiver.rid
735+
"switch simulcast layer: {:?} -> {:?}",
736+
fallback_receiver_rid, active_receiver.rid
697737
);
698738
}
699739
_ => {}

0 commit comments

Comments
 (0)