Skip to content

Commit 834ff65

Browse files
committed
feat: sliding window bitrate and start paused
1 parent d02c5a3 commit 834ff65

File tree

4 files changed

+62
-55
lines changed

4 files changed

+62
-55
lines changed

pulsebeam/src/bitrate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl Default for BitrateControllerConfig {
2323
headroom_factor: 1.0,
2424
max_decay_factor: 0.95,
2525
emergency_drop_threshold: 0.50,
26-
required_up_samples: 5,
26+
required_up_samples: 1,
2727
quantization_step: Bitrate::kbps(10),
2828
}
2929
}

pulsebeam/src/participant/downstream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,23 @@ impl DownstreamAllocator {
3434
MediaKind::Audio => self.audio.add_track(track),
3535
MediaKind::Video => self.video.add_track(track),
3636
}
37+
self.update_allocations();
3738
}
3839

3940
pub fn remove_track(&mut self, track: &TrackReceiver) {
4041
match track.meta.kind {
4142
MediaKind::Audio => self.audio.remove_track(&track.meta.id),
4243
MediaKind::Video => self.video.remove_track(&track.meta.id),
4344
}
45+
self.update_allocations();
4446
}
4547

4648
pub fn add_slot(&mut self, mid: Mid, kind: MediaKind) {
4749
match kind {
4850
MediaKind::Audio => self.audio.add_slot(mid),
4951
MediaKind::Video => self.video.add_slot(mid),
5052
}
53+
self.update_allocations();
5154
}
5255

5356
/// Handle BWE and compute both current and desired bitrate in one pass.

pulsebeam/src/participant/downstream/video.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl VideoAllocator {
9797
};
9898

9999
next_track.1.assigned_mid.replace(slot.mid);
100-
slot.switch_to(next_track.1.track.lowest_quality().clone());
100+
slot.assign_to(next_track.1.track.lowest_quality().clone());
101101
}
102102
}
103103

@@ -301,7 +301,7 @@ impl VideoAllocator {
301301
for (idx, plan) in committed.drain(..).enumerate() {
302302
let slot = &mut self.slots[idx];
303303
if plan.paused {
304-
slot.pause(plan.receiver);
304+
slot.assign_to(plan.receiver);
305305
} else {
306306
slot.switch_to(plan.receiver);
307307
}
@@ -310,16 +310,16 @@ impl VideoAllocator {
310310
let total_allocated = Bitrate::from(total_allocated.max(MIN_BANDWIDTH));
311311
let total_desired = Bitrate::from(total_desired.max(MIN_BANDWIDTH));
312312

313-
if self.ticks >= 30 {
314-
tracing::debug!(
315-
available = %available_bandwidth,
316-
budget = %Bitrate::from(budget),
317-
allocated = %total_allocated,
318-
desired = %total_desired,
319-
"allocation summary"
320-
);
321-
self.ticks = 0;
322-
}
313+
// if self.ticks >= 30 {
314+
tracing::debug!(
315+
available = %available_bandwidth,
316+
budget = %Bitrate::from(budget),
317+
allocated = %total_allocated,
318+
desired = %total_desired,
319+
"allocation summary"
320+
);
321+
self.ticks = 0;
322+
// }
323323
self.ticks += 1;
324324

325325
Some((total_allocated, total_desired))
@@ -453,6 +453,11 @@ impl Slot {
453453
}
454454
}
455455

456+
// similar to switch_to but it will start as paused
457+
pub fn assign_to(&mut self, receiver: SimulcastReceiver) {
458+
self.transition_to(SlotState::Paused { active: receiver });
459+
}
460+
456461
pub fn switch_to(&mut self, mut receiver: SimulcastReceiver) {
457462
if let Some(current) = self.target_receiver()
458463
&& current.rid == receiver.rid
@@ -527,12 +532,6 @@ impl Slot {
527532
self.transition_to(SlotState::Idle);
528533
}
529534

530-
pub fn pause(&mut self, current_receiver: SimulcastReceiver) {
531-
self.transition_to(SlotState::Paused {
532-
active: current_receiver,
533-
});
534-
}
535-
536535
fn request_keyframe(&self, kind: KeyframeRequestKind) {
537536
match self.state() {
538537
SlotState::Resuming { staging } => {

pulsebeam/src/rtp/monitor.rs

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -332,80 +332,85 @@ impl StreamMonitor {
332332
}
333333
}
334334

335+
#[derive(Debug)]
336+
struct Snapshot {
337+
ts: Instant,
338+
bytes: usize,
339+
}
340+
335341
#[derive(Debug)]
336342
pub struct BitrateEstimate {
337343
controller: BitrateController,
338344
last_update: Instant,
339345
accumulated_bytes: usize,
340346

341-
history: VecDeque<f64>,
342-
scratch: Vec<f64>,
343-
window_size: usize,
347+
history: VecDeque<Snapshot>,
348+
window_sum: usize,
349+
max_window_duration: Duration,
344350
}
345351

346352
impl BitrateEstimate {
347353
pub fn new(now: Instant) -> Self {
348354
let config = BitrateControllerConfig {
349355
min_bitrate: Bitrate::kbps(10),
350356
max_bitrate: Bitrate::mbps(5),
351-
default_bitrate: Bitrate::kbps(100),
352-
headroom_factor: 1.0,
353-
required_up_samples: 1,
354-
quantization_step: Bitrate::kbps(10),
357+
default_bitrate: Bitrate::kbps(300),
355358
..Default::default()
356359
};
357360

358-
let controller = BitrateController::new(config);
359-
let window_size = 35;
360-
361361
Self {
362+
controller: BitrateController::new(config),
362363
last_update: now,
363364
accumulated_bytes: 0,
364-
history: VecDeque::with_capacity(window_size),
365-
scratch: Vec::with_capacity(window_size),
366-
window_size,
367-
controller,
365+
history: VecDeque::new(),
366+
window_sum: 0,
367+
max_window_duration: Duration::from_secs(1),
368368
}
369369
}
370370

371371
pub fn record(&mut self, packet_len: usize) {
372-
self.accumulated_bytes = self.accumulated_bytes.saturating_add(packet_len);
372+
self.accumulated_bytes += packet_len;
373373
}
374374

375375
pub fn poll(&mut self, now: Instant) {
376376
let elapsed = now.saturating_duration_since(self.last_update);
377-
378377
if elapsed < Duration::from_millis(200) {
379378
return;
380379
}
381380

382-
let elapsed_secs = elapsed.as_secs_f64();
383-
if elapsed_secs == 0.0 {
384-
return;
385-
}
386-
387-
let raw_bps = (self.accumulated_bytes as f64 * 8.0) / elapsed_secs;
381+
let snapshot = Snapshot {
382+
ts: now,
383+
bytes: self.accumulated_bytes,
384+
};
385+
self.window_sum += snapshot.bytes;
386+
self.history.push_back(snapshot);
388387

389-
if self.history.len() >= self.window_size {
390-
self.history.pop_front();
388+
while let Some(front) = self.history.front() {
389+
if now.saturating_duration_since(front.ts) > self.max_window_duration {
390+
let removed = self.history.pop_front().unwrap();
391+
self.window_sum = self.window_sum.saturating_sub(removed.bytes);
392+
} else {
393+
break;
394+
}
391395
}
392-
self.history.push_back(raw_bps);
393396

394-
self.scratch.clear();
395-
self.scratch.extend(self.history.iter());
397+
let actual_window_duration = if let Some(front) = self.history.front() {
398+
now.saturating_duration_since(front.ts)
399+
} else {
400+
Duration::ZERO
401+
};
396402

397-
let robust_bps = if self.scratch.is_empty() {
398-
raw_bps
403+
// Safety: ensure we don't divide by zero at the very start
404+
let valid_duration = actual_window_duration.max(elapsed).as_secs_f64();
405+
406+
// sliding window average
407+
let bps = if valid_duration > 0.001 {
408+
(self.window_sum as f64 * 8.0) / valid_duration
399409
} else {
400-
let target_idx = (self.scratch.len() as f64 * 0.5) as usize;
401-
let target_idx = target_idx.min(self.scratch.len().saturating_sub(1));
402-
let (_, &mut val, _) = self
403-
.scratch
404-
.select_nth_unstable_by(target_idx, |a, b| a.total_cmp(b));
405-
val
410+
0.0
406411
};
407412

408-
self.controller.update(Bitrate::from(robust_bps));
413+
self.controller.update(Bitrate::from(bps));
409414
self.last_update = now;
410415
self.accumulated_bytes = 0;
411416
}

0 commit comments

Comments
 (0)