Skip to content

Commit 690c3d0

Browse files
committed
consider loss, tweak midpoint, packet grouping
1 parent f74c6a8 commit 690c3d0

File tree

2 files changed

+90
-24
lines changed

2 files changed

+90
-24
lines changed

makefiles/net.mk

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ all: help
1111
# Presets should specify TARGET_RTT and jitter as if measuring end-to-end
1212
# This function divides by 2 automatically for localhost since packets traverse rules twice
1313
net-apply: net-clear net-verify
14-
$(eval HALF_LATENCY=$(shell echo "$(TARGET_LATENCY)" | sed 's/ms//' | awk '{print $$1/2}'))
14+
$(eval HALF_LATENCY=$(shell echo "$(TARGET_RTT)" | sed 's/ms//' | awk '{print $$1/2}'))
1515
$(eval HALF_UPLOAD_JITTER=$(shell echo "$(UPLOAD_JITTER)" | sed 's/ms.*//' | awk '{print $$1/2}'))
1616
$(eval HALF_DOWNLOAD_JITTER=$(shell echo "$(DOWNLOAD_JITTER)" | sed 's/ms.*//' | awk '{print $$1/2}'))
1717
$(eval JITTER_DIST=$(shell echo "$(UPLOAD_JITTER)" | grep -o 'distribution.*' || echo ""))
@@ -55,7 +55,6 @@ net-apply: net-clear net-verify
5555
net-good-home-wifi:
5656
@$(MAKE) net-apply \
5757
TARGET_RTT="40ms" \
58-
TARGET_LATENCY="40" \
5958
UPLOAD_RATE="25mbit" \
6059
DOWNLOAD_RATE="100mbit" \
6160
UPLOAD_JITTER="10ms" \
@@ -68,7 +67,6 @@ net-good-home-wifi:
6867
net-congested-wifi:
6968
@$(MAKE) net-apply \
7069
TARGET_RTT="50ms" \
71-
TARGET_LATENCY="50" \
7270
UPLOAD_RATE="5mbit" \
7371
DOWNLOAD_RATE="20mbit" \
7472
UPLOAD_JITTER="40ms" \
@@ -78,26 +76,34 @@ net-congested-wifi:
7876

7977
# Simulates a stable 4G/5G connection.
8078
# Target RTT: ~80ms
81-
net-stable-mobile:
79+
net-mobile-stable:
8280
@$(MAKE) net-apply \
8381
TARGET_RTT="80ms" \
84-
TARGET_LATENCY="80" \
8582
UPLOAD_RATE="8mbit" \
8683
DOWNLOAD_RATE="40mbit" \
8784
UPLOAD_JITTER="30ms" \
8885
DOWNLOAD_JITTER="30ms" \
8986
UPLOAD_PACKET_LOSS="0%" \
9087
DOWNLOAD_PACKET_LOSS="0.2%"
9188

89+
net-mobile-low-bandwidth:
90+
@$(MAKE) net-apply \
91+
TARGET_RTT="160ms" \
92+
UPLOAD_RATE="1mbit" \
93+
DOWNLOAD_RATE="10mbit" \
94+
UPLOAD_JITTER="30ms" \
95+
DOWNLOAD_JITTER="30ms" \
96+
UPLOAD_PACKET_LOSS="0%" \
97+
DOWNLOAD_PACKET_LOSS="0.2%"
98+
9299
# Simulates LTE with 1 bar signal - harsh bufferbloat environment
93100
# Real-world reference: Min ~75ms, Avg ~116ms, Max ~650ms, Loss ~0.3%
94101
# Calibration: TARGET 90ms sets the correct mode/baseline.
95102
# Jitter 70ms with paretonormal provides the heavy tail without dragging the Min/Avg too far.
96103
# 25% correlation simulates the "clumping" of latency spikes seen in the log.
97-
net-lte-1bar:
104+
net-mobile-1bar:
98105
@$(MAKE) net-apply \
99106
TARGET_RTT="90ms" \
100-
TARGET_LATENCY="90" \
101107
UPLOAD_RATE="3mbit" \
102108
DOWNLOAD_RATE="15mbit" \
103109
UPLOAD_JITTER="70ms 25% distribution pareto" \
@@ -110,7 +116,6 @@ net-lte-1bar:
110116
net-cross-continent:
111117
@$(MAKE) net-apply \
112118
TARGET_RTT="140ms" \
113-
TARGET_LATENCY="140" \
114119
UPLOAD_RATE="10mbit" \
115120
DOWNLOAD_RATE="50mbit" \
116121
UPLOAD_JITTER="20ms" \
@@ -123,7 +128,6 @@ net-cross-continent:
123128
net-unusable:
124129
@$(MAKE) net-apply \
125130
TARGET_RTT="300ms" \
126-
TARGET_LATENCY="300" \
127131
UPLOAD_RATE="500kbit" \
128132
DOWNLOAD_RATE="1mbit" \
129133
UPLOAD_JITTER="200ms" \

pulsebeam/src/rtp/monitor.rs

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,11 @@ impl StreamMonitor {
178178
);
179179
}
180180

181-
let metrics: RawMetrics = (&self.delta_delta).into();
181+
let mut metrics: RawMetrics = (&self.delta_delta).into();
182+
// Reset the counters so we measure the interval (windowed) metrics next time.
183+
// This makes the loss score reactive to RECENT conditions rather than lifetime average.
184+
self.delta_delta.snapshot_and_reset();
185+
182186
let was_inactive = self.shared_state.is_inactive();
183187
let is_inactive = self
184188
.determine_inactive_state(now, metrics.frame_duration * INACTIVE_TIMEOUT_MULTIPLIER);
@@ -244,9 +248,11 @@ impl StreamMonitor {
244248
}
245249
}
246250

247-
let quality_score = metrics.calculate_jitter_score();
248-
let new_quality = metrics.quality_hysteresis(quality_score, self.current_quality);
251+
let jitter_score = metrics.calculate_jitter_score();
252+
let loss_score = metrics.calculate_loss_score();
253+
let quality_score = jitter_score.min(loss_score);
249254

255+
let new_quality = metrics.quality_hysteresis(quality_score, self.current_quality);
250256
if new_quality == self.current_quality {
251257
return;
252258
}
@@ -283,10 +289,12 @@ impl StreamMonitor {
283289
// Finally, commit the state change.
284290
tracing::info!(
285291
stream_id = %self.stream_id,
286-
"Stream quality transition: {:?} -> {:?} (score: {:.1}, loss: {:.2}%, m_hat: {:.3}, bitrate: {})",
292+
"Stream quality transition: {:?} -> {:?} (score: {:.1}, jitter_score: {:.1}, loss_score: {:.1}, loss: {:.2}%, m_hat: {:.3}, bitrate: {})",
287293
self.current_quality,
288294
new_quality,
289295
quality_score,
296+
jitter_score,
297+
loss_score,
290298
metrics.packet_loss() * 100.0,
291299
metrics.m_hat,
292300
Bitrate::from(self.bwe.bwe_bps_ewma),
@@ -423,9 +431,7 @@ impl RawMetrics {
423431
// and underuse (negative, which can also indicate instability).
424432
// The midpoint will need to be re-tuned. The paper RECOMMENDS
425433
// a threshold of 12.5ms to detect overuse.
426-
//
427-
// midpoint=10.0ms to make it a bit more sensitive to measurement.
428-
sigmoid(self.m_hat.abs(), 100.0, -0.2, 10.0)
434+
sigmoid(self.m_hat.abs(), 100.0, -0.2, 12.5)
429435
}
430436

431437
pub fn calculate_loss_score(&self) -> f64 {
@@ -483,6 +489,13 @@ fn sigmoid(value: f64, range_max: f64, k: f64, midpoint: f64) -> f64 {
483489
range_max / (1.0 + (-k * (value - midpoint)).exp())
484490
}
485491

492+
#[derive(Debug, Clone, Copy)]
493+
struct PacketGroup {
494+
first_arrival: Instant,
495+
last_arrival: Instant,
496+
rtp_ts: MediaTime,
497+
}
498+
486499
#[derive(Debug)]
487500
struct DeltaDeltaState {
488501
head: SeqNo, // Next expected seq
@@ -491,7 +504,7 @@ struct DeltaDeltaState {
491504
last_rtp_ts: MediaTime,
492505
last_arrival: Instant,
493506

494-
m_hat: f64, // The estimate of the queue delay trend, m_hat(i-1)
507+
m_hat: f64, // The Kalman-filtered queue delay trend, m_hat(i-1)
495508
e: f64, // The variance of the estimate, e(i-1)
496509
var_v_hat: f64, // The variance of the measurement noise, var_v_hat(i-1)
497510

@@ -500,6 +513,7 @@ struct DeltaDeltaState {
500513
frame_duration_ms_ewma: f64,
501514

502515
buffer: Vec<Option<PacketStatus>>,
516+
pending_group: Option<PacketGroup>,
503517
initialized: bool,
504518
}
505519

@@ -518,10 +532,16 @@ impl DeltaDeltaState {
518532
packets_expected: 0,
519533
frame_duration_ms_ewma: 1000.0,
520534
buffer: vec![None; cap],
535+
pending_group: None,
521536
initialized: false,
522537
}
523538
}
524539

540+
pub fn snapshot_and_reset(&mut self) {
541+
self.packets_actual = 0;
542+
self.packets_expected = 0;
543+
}
544+
525545
pub fn update(&mut self, packet: &RtpPacket) {
526546
if !self.initialized {
527547
self.init(packet);
@@ -583,9 +603,21 @@ impl DeltaDeltaState {
583603
}
584604

585605
/// Implements https://www.ietf.org/archive/id/draft-ietf-rmcat-gcc-02.txt.
586-
fn advance(&mut self, pkt: &PacketStatus) {
587-
let actual_ms = pkt.arrival.duration_since(self.last_arrival).as_secs_f64() * 1000.0;
588-
let expected_ms = (pkt.rtp_ts.numer().wrapping_sub(self.last_rtp_ts.numer()) as f64)
606+
fn advance_group(&mut self, group: &PacketGroup) {
607+
let actual_ms = if group.last_arrival >= self.last_arrival {
608+
group
609+
.last_arrival
610+
.duration_since(self.last_arrival)
611+
.as_secs_f64()
612+
* 1000.0
613+
} else {
614+
-(self
615+
.last_arrival
616+
.duration_since(group.last_arrival)
617+
.as_secs_f64()
618+
* 1000.0)
619+
};
620+
let expected_ms = (group.rtp_ts.numer().wrapping_sub(self.last_rtp_ts.numer()) as f64)
589621
* 1000.0
590622
/ self.frequency.get() as f64;
591623

@@ -623,8 +655,8 @@ impl DeltaDeltaState {
623655
let z_clamped = z.abs().min(3.0 * self.var_v_hat.sqrt());
624656
self.var_v_hat = (alpha * self.var_v_hat + (1.0 - alpha) * z_clamped.powi(2)).max(1.0);
625657

626-
self.last_arrival = pkt.arrival;
627-
self.last_rtp_ts = pkt.rtp_ts;
658+
self.last_arrival = group.last_arrival;
659+
self.last_rtp_ts = group.rtp_ts;
628660
self.packets_actual += 1;
629661
self.packets_expected += 1;
630662

@@ -666,7 +698,7 @@ impl DeltaDeltaState {
666698
break;
667699
};
668700

669-
self.advance(&pkt);
701+
self.step_group(&pkt);
670702
self.tail = self.tail.wrapping_add(1).into();
671703
}
672704
}
@@ -722,11 +754,41 @@ impl DeltaDeltaState {
722754
continue;
723755
};
724756

725-
self.advance(&pkt);
757+
self.step_group(&pkt);
726758
}
727759
self.tail = end;
728760
}
729761

762+
fn step_group(&mut self, pkt: &PacketStatus) {
763+
let group = if let Some(mut group) = self.pending_group.take() {
764+
if group.rtp_ts == pkt.rtp_ts {
765+
// Same group (frame), update arrival time
766+
if pkt.arrival > group.last_arrival {
767+
group.last_arrival = pkt.arrival;
768+
}
769+
group
770+
} else {
771+
// New group detected. Process the pending one.
772+
self.advance_group(&group);
773+
774+
// Start new group
775+
PacketGroup {
776+
first_arrival: pkt.arrival,
777+
last_arrival: pkt.arrival,
778+
rtp_ts: pkt.rtp_ts,
779+
}
780+
}
781+
} else {
782+
// No pending group, start one
783+
PacketGroup {
784+
first_arrival: pkt.arrival,
785+
last_arrival: pkt.arrival,
786+
rtp_ts: pkt.rtp_ts,
787+
}
788+
};
789+
self.pending_group = Some(group);
790+
}
791+
730792
fn as_index(&self, seq: SeqNo) -> usize {
731793
(*seq % self.buffer.len() as u64) as usize
732794
}

0 commit comments

Comments
 (0)