Skip to content

Commit 6c05d00

Browse files
committed
feat: adaptive inactivity
1 parent 1f84928 commit 6c05d00

File tree

2 files changed

+48
-26
lines changed

2 files changed

+48
-26
lines changed

pulsebeam/src/participant/core.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,20 @@ impl ParticipantCore {
199199
self.rtc.bwe().set_current_bitrate(current);
200200
self.rtc.bwe().set_desired_bitrate(desired);
201201
}
202-
Event::StreamPaused(e) => {
203-
let Some(track) = self.upstream_allocator.get_track_mut(&e.mid) else {
204-
return;
205-
};
206-
let Some(layer) = track.by_rid_mut(&e.rid) else {
207-
return;
208-
};
209-
layer.monitor.set_manual_pause(e.paused);
202+
// rtp monitor handles this
203+
Event::StreamPaused(_) => {
204+
// if e.paused {
205+
// return;
206+
// }
207+
//
208+
// let Some(track) = self.upstream_allocator.get_track_mut(&e.mid) else {
209+
// return;
210+
// };
211+
// let Some(layer) = track.by_rid_mut(&e.rid) else {
212+
// return;
213+
// };
214+
//
215+
// layer.monitor.set_manual_pause(e.paused);
210216
}
211217
e => {
212218
tracing::warn!("unhandled event: {e:?}");

pulsebeam/src/rtp/monitor.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use tokio::time::Instant;
1010

1111
use crate::rtp::PacketTiming;
1212

13-
/// Defines the wall-clock duration without packets after which a stream is considered inactive.
14-
const INACTIVE_TIMEOUT: Duration = Duration::from_millis(500);
13+
const INACTIVE_TIMEOUT_MULTIPLIER: u32 = 15;
1514
const DELTA_DELTA_WINDOW_SIZE: usize = 128;
1615

1716
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -60,7 +59,6 @@ pub struct StreamMonitor {
6059
shared_state: StreamState,
6160

6261
stream_id: String,
63-
manual_pause: bool,
6462

6563
delta_delta: DeltaDeltaState,
6664
last_packet_at: Instant,
@@ -75,7 +73,6 @@ impl StreamMonitor {
7573
Self {
7674
stream_id,
7775
shared_state,
78-
manual_pause: true,
7976
last_packet_at: now,
8077
delta_delta: DeltaDeltaState::new(DELTA_DELTA_WINDOW_SIZE),
8178
bwe: BitrateEstimate::new(now),
@@ -91,8 +88,15 @@ impl StreamMonitor {
9188
}
9289

9390
pub fn poll(&mut self, now: Instant) {
91+
self.bwe.poll(now);
92+
self.shared_state
93+
.bitrate_bps
94+
.store(self.bwe.estimate_bps() as u64, Ordering::Relaxed);
95+
96+
let metrics: RawMetrics = (&self.delta_delta).into();
9497
let was_inactive = self.shared_state.is_inactive();
95-
let is_inactive = self.determine_inactive_state(now);
98+
let is_inactive = self
99+
.determine_inactive_state(now, metrics.frame_duration * INACTIVE_TIMEOUT_MULTIPLIER);
96100
if is_inactive && !was_inactive {
97101
self.reset(now);
98102
}
@@ -103,14 +107,7 @@ impl StreamMonitor {
103107
return;
104108
}
105109

106-
self.bwe.poll(now);
107-
self.shared_state
108-
.bitrate_bps
109-
.store(self.bwe.estimate_bps() as u64, Ordering::Relaxed);
110-
111-
let metrics: RawMetrics = (&self.delta_delta).into();
112110
let quality_score = metrics.calculate_jitter_score();
113-
114111
let new_quality = metrics.quality_hysteresis(quality_score, self.current_quality);
115112

116113
if new_quality != self.current_quality {
@@ -145,12 +142,8 @@ impl StreamMonitor {
145142
self.shared_state.bitrate_bps.store(0, Ordering::Relaxed);
146143
}
147144

148-
pub fn set_manual_pause(&mut self, paused: bool) {
149-
self.manual_pause = paused;
150-
}
151-
152-
fn determine_inactive_state(&self, now: Instant) -> bool {
153-
self.manual_pause || now.saturating_duration_since(self.last_packet_at) > INACTIVE_TIMEOUT
145+
fn determine_inactive_state(&self, now: Instant, timeout: Duration) -> bool {
146+
now.saturating_duration_since(self.last_packet_at) > timeout
154147
}
155148
}
156149

@@ -228,6 +221,7 @@ impl BitrateEstimate {
228221

229222
struct RawMetrics {
230223
pub m_hat: f64, // The Kalman-filtered queue delay trend
224+
pub frame_duration: Duration,
231225
pub packets_actual: u64,
232226
pub packets_expected: u64,
233227
}
@@ -236,6 +230,7 @@ impl From<&DeltaDeltaState> for RawMetrics {
236230
fn from(value: &DeltaDeltaState) -> Self {
237231
Self {
238232
m_hat: value.m_hat,
233+
frame_duration: Duration::from_millis(value.frame_duration_ms_ewma as u64),
239234
packets_actual: value.packets_actual,
240235
packets_expected: value.packets_expected,
241236
}
@@ -330,6 +325,8 @@ struct DeltaDeltaState {
330325

331326
packets_actual: u64,
332327
packets_expected: u64,
328+
frame_duration_ms_ewma: f64,
329+
333330
buffer: Vec<Option<PacketStatus>>,
334331
initialized: bool,
335332
}
@@ -347,6 +344,7 @@ impl DeltaDeltaState {
347344
var_v_hat: 1.0, // A reasonable starting default (var_v is clamped at 1)
348345
packets_actual: 0,
349346
packets_expected: 0,
347+
frame_duration_ms_ewma: 1000.0,
350348
buffer: vec![None; cap],
351349
initialized: false,
352350
}
@@ -458,6 +456,24 @@ impl DeltaDeltaState {
458456
self.last_rtp_ts = pkt.rtp_ts;
459457
self.packets_actual += 1;
460458
self.packets_expected += 1;
459+
460+
if expected_ms != 0.0 {
461+
const ALPHA_UP: f64 = 0.1;
462+
const ALPHA_DOWN: f64 = 0.01;
463+
464+
let new_duration = if expected_ms > self.frame_duration_ms_ewma {
465+
(1.0 - ALPHA_UP) * self.frame_duration_ms_ewma + ALPHA_UP * expected_ms
466+
} else {
467+
(1.0 - ALPHA_DOWN) * self.frame_duration_ms_ewma + ALPHA_DOWN * expected_ms
468+
};
469+
470+
if (new_duration - self.frame_duration_ms_ewma).abs() > 0.1 {
471+
let from = self.frame_duration_ms_ewma * INACTIVE_TIMEOUT_MULTIPLIER as f64;
472+
let to = new_duration * INACTIVE_TIMEOUT_MULTIPLIER as f64;
473+
tracing::debug!("new inactivity timeout: {:.3}ms -> {:.3}ms", from, to);
474+
}
475+
self.frame_duration_ms_ewma = new_duration;
476+
}
461477
}
462478

463479
fn process_in_order(&mut self) {

0 commit comments

Comments
 (0)