Skip to content

Commit ad21cf7

Browse files
authored
Outbound sequence number overriding (#378)
1 parent ed83f1a commit ad21cf7

File tree

3 files changed

+127
-3
lines changed

3 files changed

+127
-3
lines changed

webrtc/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ pub enum Error {
197197
#[error("new track must be of the same kind as previous")]
198198
ErrRTPSenderNewTrackHasIncorrectKind,
199199

200+
/// ErrRTPSenderDataSent indicates that the sequence number transformer tries to be enabled after the data sending began
201+
#[error("Sequence number transformer must be enabled before sending data")]
202+
ErrRTPSenderDataSent,
203+
204+
/// ErrRTPSenderSeqTransEnabled indicates that the sequence number transformer has been already enabled
205+
#[error("Sequence number transformer has been already enabled")]
206+
ErrRTPSenderSeqTransEnabled,
207+
200208
/// ErrUnbindFailed indicates that a TrackLocal was not able to be unbind
201209
#[error("failed to unbind TrackLocal from PeerConnection")]
202210
ErrUnbindFailed,

webrtc/src/rtp_transceiver/rtp_sender/mod.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
2222
use std::sync::{Arc, Weak};
2323
use tokio::sync::{mpsc, Mutex, Notify};
2424

25+
use super::srtp_writer_future::SequenceTransformer;
26+
2527
pub(crate) struct RTPSenderInternal {
2628
pub(crate) send_called_rx: Mutex<mpsc::Receiver<()>>,
2729
pub(crate) stop_called_rx: Arc<Notify>,
@@ -81,6 +83,7 @@ pub struct RTCRtpSender {
8183

8284
pub(crate) srtp_stream: Arc<SrtpWriterFuture>,
8385
pub(crate) stream_info: Mutex<StreamInfo>,
86+
seq_trans: Arc<SequenceTransformer>,
8487

8588
pub(crate) context: Mutex<TrackLocalContext>,
8689

@@ -132,7 +135,7 @@ impl RTCRtpSender {
132135
media_engine: Arc<MediaEngine>,
133136
interceptor: Arc<dyn Interceptor + Send + Sync>,
134137
start_paused: bool,
135-
) -> RTCRtpSender {
138+
) -> Self {
136139
let id = generate_crypto_random_string(
137140
32,
138141
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ",
@@ -150,13 +153,15 @@ impl RTCRtpSender {
150153
rtcp_interceptor: Mutex::new(None),
151154
});
152155

156+
let seq_trans = Arc::new(SequenceTransformer::new());
153157
let srtp_stream = Arc::new(SrtpWriterFuture {
154158
closed: AtomicBool::new(false),
155159
ssrc,
156160
rtp_sender: Arc::downgrade(&internal),
157161
rtp_transport: Arc::clone(&transport),
158162
rtcp_read_stream: Mutex::new(None),
159163
rtp_write_session: Mutex::new(None),
164+
seq_trans: Arc::clone(&seq_trans),
160165
});
161166

162167
let srtp_rtcp_reader = Arc::clone(&srtp_stream) as Arc<dyn RTCPReader + Send + Sync>;
@@ -170,11 +175,12 @@ impl RTCRtpSender {
170175
.as_ref()
171176
.map(|track| vec![track.stream_id().to_string()])
172177
.unwrap_or_default();
173-
RTCRtpSender {
178+
Self {
174179
track: Mutex::new(track),
175180

176181
srtp_stream,
177182
stream_info: Mutex::new(StreamInfo::default()),
183+
seq_trans,
178184

179185
context: Mutex::new(TrackLocalContext::default()),
180186
transport,
@@ -327,6 +333,8 @@ impl RTCRtpSender {
327333
};
328334

329335
let result = if let Some(t) = &track {
336+
self.seq_trans.reset_offset();
337+
330338
let new_context = TrackLocalContext {
331339
id: context.id.clone(),
332340
params: self
@@ -479,6 +487,18 @@ impl RTCRtpSender {
479487
self.internal.read_rtcp(self.receive_mtu).await
480488
}
481489

490+
/// Enables overriding outgoing `RTP` packets' `sequence number`s.
491+
///
492+
/// Must be called once before any data sent or never called at all.
493+
///
494+
/// # Errors
495+
///
496+
/// Errors if this [`RTCRtpSender`] has started to send data or sequence
497+
/// transforming has been already enabled.
498+
pub fn enable_seq_transformer(&self) -> Result<()> {
499+
self.seq_trans.enable()
500+
}
501+
482502
/// has_sent tells if data has been ever sent for this instance
483503
pub(crate) async fn has_sent(&self) -> bool {
484504
let send_called_tx = self.send_called_tx.lock().await;

webrtc/src/rtp_transceiver/srtp_writer_future.rs

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,92 @@ use interceptor::{Attributes, RTCPReader, RTPWriter};
1212
use std::sync::atomic::{AtomicBool, Ordering};
1313
use std::sync::{Arc, Weak};
1414
use tokio::sync::Mutex;
15+
use util;
16+
17+
/// `RTP` packet sequence number manager.
18+
///
19+
/// Used to override outgoing `RTP` packets' sequence numbers. On creating it is
20+
/// unabled and can be enabled before sending data begining. Once data sending
21+
/// began it can not be enabled any more.
22+
pub(crate) struct SequenceTransformer(util::sync::Mutex<SequenceTransformerInner>);
23+
24+
/// [`SequenceTransformer`] inner.
25+
struct SequenceTransformerInner {
26+
offset: u16,
27+
last_sq: u16,
28+
reset_needed: bool,
29+
enabled: bool,
30+
data_sent: bool,
31+
}
32+
33+
impl SequenceTransformer {
34+
/// Creates a new [`SequenceTransformer`].
35+
pub(crate) fn new() -> Self {
36+
Self(util::sync::Mutex::new(SequenceTransformerInner {
37+
offset: 0,
38+
last_sq: rand::random(),
39+
reset_needed: false,
40+
enabled: false,
41+
data_sent: false,
42+
}))
43+
}
44+
45+
/// Enables this [`SequenceTransformer`].
46+
///
47+
/// # Errors
48+
///
49+
/// With [`Error::ErrRTPSenderSeqTransEnabled`] on trying to enable already
50+
/// enabled [`SequenceTransformer`].
51+
///
52+
/// With [`Error::ErrRTPSenderSeqTransEnabled`] on trying to enable
53+
/// [`SequenceTransformer`] after data sending began.
54+
pub(crate) fn enable(&self) -> Result<()> {
55+
let mut guard = self.0.lock();
56+
57+
if guard.enabled {
58+
return Err(Error::ErrRTPSenderSeqTransEnabled);
59+
}
60+
61+
(!guard.data_sent)
62+
.then(|| {
63+
guard.enabled = true;
64+
})
65+
.ok_or(Error::ErrRTPSenderDataSent)
66+
}
67+
68+
/// Indicates [`SequenceTransformer`] about necessity of recalculating
69+
/// `offset`.
70+
pub(crate) fn reset_offset(&self) {
71+
self.0.lock().reset_needed = true;
72+
}
73+
74+
/// Gets [`Some`] consistent `sequence number` if this [`SequenceTransformer`] is
75+
/// enabled or [`None`] if it is not.
76+
///
77+
/// Once this method is called, considers data sending began.
78+
fn seq_number(&self, raw_sn: u16) -> Option<u16> {
79+
let mut guard = self.0.lock();
80+
guard.data_sent = true;
81+
82+
if !guard.enabled {
83+
return None;
84+
}
85+
86+
let offset = guard
87+
.reset_needed
88+
.then(|| {
89+
guard.reset_needed = false;
90+
let offset = guard.last_sq.overflowing_sub(raw_sn.overflowing_sub(1).0).0;
91+
guard.offset = offset;
92+
offset
93+
})
94+
.unwrap_or(guard.offset);
95+
let next = raw_sn.overflowing_add(offset).0;
96+
guard.last_sq = next;
97+
98+
Some(next)
99+
}
100+
}
15101

16102
/// SrtpWriterFuture blocks Read/Write calls until
17103
/// the SRTP Session is available
@@ -22,6 +108,7 @@ pub(crate) struct SrtpWriterFuture {
22108
pub(crate) rtp_transport: Arc<RTCDtlsTransport>,
23109
pub(crate) rtcp_read_stream: Mutex<Option<Arc<Stream>>>, // atomic.Value // *
24110
pub(crate) rtp_write_session: Mutex<Option<Arc<Session>>>, // atomic.Value // *
111+
pub(crate) seq_trans: Arc<SequenceTransformer>,
25112
}
26113

27114
impl SrtpWriterFuture {
@@ -181,6 +268,15 @@ impl RTCPReader for SrtpWriterFuture {
181268
#[async_trait]
182269
impl RTPWriter for SrtpWriterFuture {
183270
async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> IResult<usize> {
184-
Ok(self.write_rtp(pkt).await?)
271+
Ok(
272+
match self.seq_trans.seq_number(pkt.header.sequence_number) {
273+
Some(seq_num) => {
274+
let mut new_pkt = pkt.clone();
275+
new_pkt.header.sequence_number = seq_num;
276+
self.write_rtp(&new_pkt).await?
277+
}
278+
None => self.write_rtp(pkt).await?,
279+
},
280+
)
185281
}
186282
}

0 commit comments

Comments
 (0)