Skip to content

Commit 7a93dc8

Browse files
authored
Merge pull request #316 from webrtc-rs/fix/track-binding-pause
Pause in TrackBinding when direction is not send
2 parents 0c35e61 + 6cfb302 commit 7a93dc8

File tree

5 files changed

+82
-4
lines changed

5 files changed

+82
-4
lines changed

webrtc/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
* Added more stats to `RemoteInboundRTPStats` and `RemoteOutboundRTPStats` [#282](https://github.com/webrtc-rs/webrtc/pull/282) by [@k0nserv](https://github.com/k0nserv).
66
* Don't register `video/rtx` codecs in `MediaEngine::register_default_codecs`. These weren't actually support and prevented RTX in the existing RTP stream from being used. Long term we should support RTX via this method, this is tracked in [#295](https://github.com/webrtc-rs/webrtc/issues/295). [#294 Remove video/rtx codecs](https://github.com/webrtc-rs/webrtc/pull/294) contributed by [k0nserv](https://github.com/k0nserv)
77
* Add IP filter to WebRTC `SettingEngine` [#306](https://github.com/webrtc-rs/webrtc/pull/306)
8-
8+
* Stop sequence numbers from increasing in `TrackLocalStaticSample` while the bound `RTCRtpSender` have
9+
directions that should not send. [#316](https://github.com/webrtc-rs/webrtc/pull/316)
910

1011
## 0.5.1
1112

webrtc/src/rtp_transceiver/rtp_sender/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ impl RTCRtpSender {
332332
.await,
333333
ssrc: context.ssrc,
334334
write_stream: context.write_stream.clone(),
335+
paused: self.paused.clone(),
335336
};
336337

337338
t.bind(&new_context).await
@@ -392,6 +393,7 @@ impl RTCRtpSender {
392393
write_stream: Some(
393394
Arc::clone(&write_stream) as Arc<dyn TrackLocalWriter + Send + Sync>
394395
),
396+
paused: self.paused.clone(),
395397
};
396398

397399
let codec = if let Some(t) = &*track {

webrtc/src/track/track_local/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub struct TrackLocalContext {
3535
pub(crate) params: RTCRtpParameters,
3636
pub(crate) ssrc: SSRC,
3737
pub(crate) write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
38+
pub(crate) paused: Arc<AtomicBool>,
3839
}
3940

4041
impl TrackLocalContext {
@@ -104,20 +105,31 @@ pub(crate) struct TrackBinding {
104105
ssrc: SSRC,
105106
payload_type: PayloadType,
106107
write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
108+
sender_paused: Arc<AtomicBool>,
109+
}
110+
111+
impl TrackBinding {
112+
pub fn is_sender_paused(&self) -> bool {
113+
self.sender_paused.load(Ordering::SeqCst)
114+
}
107115
}
108116

109117
pub(crate) struct InterceptorToTrackLocalWriter {
110118
pub(crate) interceptor_rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>,
111-
paused: Arc<AtomicBool>,
119+
sender_paused: Arc<AtomicBool>,
112120
}
113121

114122
impl InterceptorToTrackLocalWriter {
115123
pub(crate) fn new(paused: Arc<AtomicBool>) -> Self {
116124
InterceptorToTrackLocalWriter {
117125
interceptor_rtp_writer: Mutex::new(None),
118-
paused,
126+
sender_paused: paused,
119127
}
120128
}
129+
130+
fn is_sender_paused(&self) -> bool {
131+
self.sender_paused.load(Ordering::SeqCst)
132+
}
121133
}
122134

123135
impl std::fmt::Debug for InterceptorToTrackLocalWriter {
@@ -129,7 +141,7 @@ impl std::fmt::Debug for InterceptorToTrackLocalWriter {
129141
#[async_trait]
130142
impl TrackLocalWriter for InterceptorToTrackLocalWriter {
131143
async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
132-
if self.paused.load(Ordering::SeqCst) {
144+
if self.is_sender_paused() {
133145
return Ok(0);
134146
}
135147

webrtc/src/track/track_local/track_local_static_rtp.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,20 @@ impl TrackLocalStaticRTP {
2828
pub fn codec(&self) -> RTCRtpCodecCapability {
2929
self.codec.clone()
3030
}
31+
32+
pub async fn any_binding_paused(&self) -> bool {
33+
let bindings = self.bindings.lock().await;
34+
bindings
35+
.iter()
36+
.any(|b| b.sender_paused.load(Ordering::SeqCst))
37+
}
38+
39+
pub async fn all_binding_paused(&self) -> bool {
40+
let bindings = self.bindings.lock().await;
41+
bindings
42+
.iter()
43+
.all(|b| b.sender_paused.load(Ordering::SeqCst))
44+
}
3145
}
3246

3347
#[async_trait]
@@ -50,6 +64,7 @@ impl TrackLocal for TrackLocalStaticRTP {
5064
payload_type: codec.payload_type,
5165
write_stream: t.write_stream(),
5266
id: t.id(),
67+
sender_paused: t.paused.clone(),
5368
}));
5469
}
5570

@@ -112,6 +127,11 @@ impl TrackLocalWriter for TrackLocalStaticRTP {
112127
/// If one PeerConnection fails the packets will still be sent to
113128
/// all PeerConnections. The error message will contain the ID of the failed
114129
/// PeerConnections so you can remove them
130+
///
131+
/// If the RTCRtpSender direction is such that no packets should be sent, any call to this
132+
/// function are blocked internally. Care must be taken to not increase the sequence number
133+
/// while the sender is paused. While the actual _sending_ is blocked, the receiver will
134+
/// miss out when the sequence number "rolls over", which in turn will break SRTP.
115135
async fn write_rtp(&self, p: &rtp::packet::Packet) -> Result<usize> {
116136
let mut n = 0;
117137
let mut write_errs = vec![];
@@ -122,6 +142,10 @@ impl TrackLocalWriter for TrackLocalStaticRTP {
122142
bindings.clone()
123143
};
124144
for b in bindings {
145+
if b.is_sender_paused() {
146+
// See caveat in function doc.
147+
continue;
148+
}
125149
pkt.header.ssrc = b.ssrc;
126150
pkt.header.payload_type = b.payload_type;
127151
if let Some(write_stream) = &b.write_stream {

webrtc/src/track/track_local/track_local_static_sample.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use super::*;
33
use crate::error::flatten_errs;
44

55
use crate::track::RTP_OUTBOUND_MTU;
6+
use log::warn;
67
use media::Sample;
78
use tokio::sync::Mutex;
89

@@ -11,6 +12,7 @@ struct TrackLocalStaticSampleInternal {
1112
packetizer: Option<Box<dyn rtp::packetizer::Packetizer + Send + Sync>>,
1213
sequencer: Option<Box<dyn rtp::sequence::Sequencer + Send + Sync>>,
1314
clock_rate: f64,
15+
did_warn_about_wonky_pause: bool,
1416
}
1517

1618
/// TrackLocalStaticSample is a TrackLocal that has a pre-set codec and accepts Samples.
@@ -32,6 +34,7 @@ impl TrackLocalStaticSample {
3234
packetizer: None,
3335
sequencer: None,
3436
clock_rate: 0.0f64,
37+
did_warn_about_wonky_pause: false,
3538
}),
3639
}
3740
}
@@ -52,6 +55,42 @@ impl TrackLocalStaticSample {
5255
return Ok(());
5356
}
5457

58+
let (any_paused, all_paused) = (
59+
self.rtp_track.any_binding_paused().await,
60+
self.rtp_track.all_binding_paused().await,
61+
);
62+
63+
if all_paused {
64+
// Abort already here to not increment sequence numbers.
65+
return Ok(());
66+
}
67+
68+
if any_paused {
69+
// This is a problem state due to how this impl is structured. The sequencer will allocate
70+
// one sequence number per RTP packet regardless of how many TrackBinding that will send
71+
// the packet. I.e. we get the same sequence number per multiple SSRC, which is not good
72+
// for SRTP, but that's how it works.
73+
//
74+
// SRTP has a further problem with regards to jumps in sequence number. Consider this:
75+
//
76+
// 1. Create track local
77+
// 2. Bind track local to track 1.
78+
// 3. Bind track local to track 2.
79+
// 4. Pause track 1.
80+
// 5. Keep sending...
81+
//
82+
// At this point, the track local will keep incrementing the sequence number, because we have
83+
// one binding that is still active. However SRTP hmac verifying (tag), can only accept a
84+
// relatively small jump in sequence numbers since it uses the ROC (i.e. how many times the
85+
// sequence number has rolled over), which means if this pause state of one binding persists
86+
// for a longer time, the track can never be resumed since the receiver would have missed
87+
// the rollovers.
88+
if !internal.did_warn_about_wonky_pause {
89+
internal.did_warn_about_wonky_pause = true;
90+
warn!("Detected multiple track bindings where only one was paused");
91+
}
92+
}
93+
5594
// skip packets by the number of previously dropped packets
5695
if let Some(sequencer) = &internal.sequencer {
5796
for _ in 0..sample.prev_dropped_packets {

0 commit comments

Comments
 (0)