Skip to content

Commit b9aab58

Browse files
committed
context aware stream quality
1 parent a4fc051 commit b9aab58

File tree

1 file changed

+50
-19
lines changed

1 file changed

+50
-19
lines changed

pulsebeam/src/rtp/monitor.rs

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ pub struct StreamMonitor {
110110
shared_state: StreamState,
111111

112112
stream_id: String,
113+
kind: MediaKind, // distinguish Audio/Video for scoring
113114

114115
delta_delta: DeltaDeltaState,
115116
last_packet_at: Instant,
@@ -131,6 +132,7 @@ impl StreamMonitor {
131132
};
132133
Self {
133134
stream_id,
135+
kind,
134136
shared_state,
135137
last_packet_at: now,
136138
delta_delta: DeltaDeltaState::new(DELTA_DELTA_WINDOW_SIZE),
@@ -184,7 +186,8 @@ impl StreamMonitor {
184186
);
185187
}
186188

187-
let metrics: RawMetrics = (&self.delta_delta).into();
189+
// Pass MediaKind to metrics to handle Jitter vs Loss sensitivity
190+
let metrics = RawMetrics::new(&self.delta_delta, self.kind);
188191
// Reset the counters so we measure the interval (windowed) metrics next time.
189192
// This makes the loss score reactive to RECENT conditions rather than lifetime average.
190193
self.delta_delta.snapshot_and_reset();
@@ -373,6 +376,8 @@ impl BitrateEstimate {
373376
}
374377

375378
pub fn poll(&mut self, now: Instant) {
379+
// Headroom strategy implies that if we see a burst (like a keyframe),
380+
// we likely have the bandwidth capacity for it.
376381
const HEADROOM: f64 = 1.25;
377382
let elapsed = now.saturating_duration_since(self.last_update);
378383
if elapsed < Duration::from_millis(200) {
@@ -426,20 +431,20 @@ struct RawMetrics {
426431
pub frame_duration: Duration,
427432
pub packets_actual: u64,
428433
pub packets_expected: u64,
434+
pub kind: MediaKind, // context-aware scoring
429435
}
430436

431-
impl From<&DeltaDeltaState> for RawMetrics {
432-
fn from(value: &DeltaDeltaState) -> Self {
437+
impl RawMetrics {
438+
fn new(value: &DeltaDeltaState, kind: MediaKind) -> Self {
433439
Self {
434440
m_hat: value.m_hat,
435441
frame_duration: Duration::from_millis(value.frame_duration_ms_ewma as u64),
436442
packets_actual: value.packets_actual,
437443
packets_expected: value.packets_expected,
444+
kind,
438445
}
439446
}
440-
}
441447

442-
impl RawMetrics {
443448
fn packet_loss(&self) -> f64 {
444449
if self.packets_expected == 0 {
445450
return 0.0;
@@ -451,9 +456,15 @@ impl RawMetrics {
451456
pub fn calculate_jitter_score(&self) -> f64 {
452457
// We use its absolute value to penalize both overuse (positive)
453458
// and underuse (negative, which can also indicate instability).
454-
// The midpoint will need to be re-tuned. The paper RECOMMENDS
455-
// a threshold of 12.5ms to detect overuse.
456-
sigmoid(self.m_hat.abs(), 100.0, -0.2, 12.5)
459+
460+
// Audio is extremely sensitive to jitter (robot voice).
461+
// Video (Simulcast/Screen) can buffer jitter (latency vs quality tradeoff).
462+
let (sensitivity, midpoint) = match self.kind {
463+
MediaKind::Audio => (-0.3, 10.0), // Strict: penalize > 10ms jitter
464+
MediaKind::Video => (-0.1, 50.0), // Loose: allow up to 50ms jitter (screen share bursts)
465+
};
466+
467+
sigmoid(self.m_hat.abs(), 100.0, sensitivity, midpoint)
457468
}
458469

459470
pub fn calculate_loss_score(&self) -> f64 {
@@ -464,21 +475,23 @@ impl RawMetrics {
464475

465476
let loss_ratio = self.packet_loss();
466477

467-
// small packet loss is easy to recover
468-
const GRACE_THRESHOLD: f64 = 0.02;
469-
if loss_ratio <= GRACE_THRESHOLD {
478+
// Audio PLC (Packet Loss Concealment) handles small loss well (~5%).
479+
// Video I-frames/P-frames are sensitive to loss (artifacts).
480+
let (grace_threshold, panic_threshold) = match self.kind {
481+
MediaKind::Audio => (0.05, 0.15),
482+
MediaKind::Video => (0.02, 0.10), // Video hates loss
483+
};
484+
485+
if loss_ratio <= grace_threshold {
470486
return 100.0;
471487
}
472-
473-
const PANIC_THRESHOLD: f64 = 0.12;
474-
if loss_ratio >= PANIC_THRESHOLD {
488+
if loss_ratio >= panic_threshold {
475489
return 0.0;
476490
}
477491

478-
// We map the range [0.02 ... 0.12] to score [100 ... 0]
479-
// Range size = 0.10
480-
let range = PANIC_THRESHOLD - GRACE_THRESHOLD;
481-
let excess_loss = loss_ratio - GRACE_THRESHOLD;
492+
// Map range to score [100 ... 0]
493+
let range = panic_threshold - grace_threshold;
494+
let excess_loss = loss_ratio - grace_threshold;
482495
let penalty_fraction = excess_loss / range;
483496

484497
(100.0 * (1.0 - penalty_fraction)).max(0.0)
@@ -487,7 +500,7 @@ impl RawMetrics {
487500
pub fn quality_hysteresis(&self, score: f64, current: StreamQuality) -> StreamQuality {
488501
match current {
489502
StreamQuality::Excellent => {
490-
if score < 80.0 {
503+
if score < 75.0 {
491504
StreamQuality::Good
492505
} else {
493506
StreamQuality::Excellent
@@ -653,6 +666,24 @@ impl DeltaDeltaState {
653666
* 1000.0
654667
/ self.frequency.get() as f64;
655668

669+
// If the gap between frames is huge (common in screen share or audio pause/DTX),
670+
// the network delay calculation will be misleadingly large (wake-up jitter).
671+
// If > 500ms, we assume this was a pause, not network congestion.
672+
if expected_ms > 500.0 {
673+
// Soft-reset the filter.
674+
// We set m_hat to 0 to assume "normal" delay for this specific jump.
675+
// We boost 'e' (variance) to adapt quickly to whatever new state exists.
676+
self.m_hat = 0.0;
677+
self.e = 0.5;
678+
679+
// Advance pointers but skip Kalman update
680+
self.last_arrival = group.last_arrival;
681+
self.last_rtp_ts = group.rtp_ts;
682+
self.packets_actual += 1;
683+
self.packets_expected += 1;
684+
return;
685+
}
686+
656687
// `d(i)` from the GCC paper (inter-group delay variation).
657688
let skew = actual_ms - expected_ms;
658689

0 commit comments

Comments
 (0)