Skip to content

Commit 582b52c

Browse files
authored
Merge pull request #450 from rogurotus/unmarshal-optimization
Unmarshal optimization
2 parents 71b2875 + 43f8e8b commit 582b52c

File tree

20 files changed

+233
-176
lines changed

20 files changed

+233
-176
lines changed

examples/examples/rtp-forwarder/rtp-forwarder.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndicat
1818
use webrtc::rtp_transceiver::rtp_codec::{
1919
RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType,
2020
};
21-
use webrtc::util::{Conn, Marshal, Unmarshal};
21+
use webrtc::util::{Conn, Marshal};
2222

2323
#[derive(Clone)]
2424
struct UdpConn {
@@ -208,10 +208,8 @@ async fn main() -> Result<()> {
208208

209209
tokio::spawn(async move {
210210
let mut b = vec![0u8; 1500];
211-
while let Ok((n, _)) = track.read(&mut b).await {
212-
// Unmarshal the packet and update the PayloadType
213-
let mut buf = &b[..n];
214-
let mut rtp_packet = webrtc::rtp::packet::Packet::unmarshal(&mut buf)?;
211+
while let Ok((mut rtp_packet, _)) = track.read(&mut b).await {
212+
// Update the PayloadType
215213
rtp_packet.header.payload_type = c.payload_type;
216214

217215
// Marshal into original buffer with updated PayloadType

interceptor/src/lib.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,19 @@ impl RTPWriter for RTPWriterFn {
104104
#[async_trait]
105105
pub trait RTPReader {
106106
/// read a rtp packet
107-
async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>;
107+
async fn read(
108+
&self,
109+
buf: &mut [u8],
110+
attributes: &Attributes,
111+
) -> Result<(rtp::packet::Packet, Attributes)>;
108112
}
109113

110114
pub type RTPReaderBoxFn = Box<
111115
dyn (Fn(
112116
&mut [u8],
113117
&Attributes,
114-
) -> Pin<Box<dyn Future<Output = Result<(usize, Attributes)>> + Send + Sync>>)
118+
)
119+
-> Pin<Box<dyn Future<Output = Result<(rtp::packet::Packet, Attributes)>> + Send + Sync>>)
115120
+ Send
116121
+ Sync,
117122
>;
@@ -120,7 +125,11 @@ pub struct RTPReaderFn(pub RTPReaderBoxFn);
120125
#[async_trait]
121126
impl RTPReader for RTPReaderFn {
122127
/// read a rtp packet
123-
async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
128+
async fn read(
129+
&self,
130+
buf: &mut [u8],
131+
attributes: &Attributes,
132+
) -> Result<(rtp::packet::Packet, Attributes)> {
124133
self.0(buf, attributes).await
125134
}
126135
}
@@ -163,15 +172,28 @@ impl RTCPWriter for RTCPWriterFn {
163172
#[async_trait]
164173
pub trait RTCPReader {
165174
/// read a batch of rtcp packets
166-
async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)>;
175+
async fn read(
176+
&self,
177+
buf: &mut [u8],
178+
attributes: &Attributes,
179+
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)>;
167180
}
168181

169182
pub type RTCPReaderBoxFn = Box<
170183
dyn (Fn(
171184
&mut [u8],
172185
&Attributes,
173-
) -> Pin<Box<dyn Future<Output = Result<(usize, Attributes)>> + Send + Sync>>)
174-
+ Send
186+
) -> Pin<
187+
Box<
188+
dyn Future<
189+
Output = Result<(
190+
Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>,
191+
Attributes,
192+
)>,
193+
> + Send
194+
+ Sync,
195+
>,
196+
>) + Send
175197
+ Sync,
176198
>;
177199

@@ -180,7 +202,11 @@ pub struct RTCPReaderFn(pub RTCPReaderBoxFn);
180202
#[async_trait]
181203
impl RTCPReader for RTCPReaderFn {
182204
/// read a batch of rtcp packets
183-
async fn read(&self, buf: &mut [u8], attributes: &Attributes) -> Result<(usize, Attributes)> {
205+
async fn read(
206+
&self,
207+
buf: &mut [u8],
208+
attributes: &Attributes,
209+
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
184210
self.0(buf, attributes).await
185211
}
186212
}

interceptor/src/mock/mock_stream.rs

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWrite
55
use async_trait::async_trait;
66
use std::sync::Arc;
77
use tokio::sync::{mpsc, Mutex};
8-
use util::{Marshal, Unmarshal};
8+
use util::Marshal;
99

1010
type RTCPPackets = Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>;
1111

@@ -88,26 +88,15 @@ impl MockStream {
8888
let mut buf = vec![0u8; 1500];
8989
let a = Attributes::new();
9090
loop {
91-
let n = match rtcp_reader.read(&mut buf, &a).await {
91+
let pkts = match rtcp_reader.read(&mut buf, &a).await {
9292
Ok((n, _)) => n,
9393
Err(err) => {
94-
if Error::ErrIoEOF != err {
95-
let _ = rtcp_in_modified_tx.send(Err(err)).await;
96-
}
94+
let _ = rtcp_in_modified_tx.send(Err(err)).await;
9795
break;
9896
}
9997
};
10098

101-
let mut b = &buf[..n];
102-
let pkt = match rtcp::packet::unmarshal(&mut b) {
103-
Ok(pkt) => pkt,
104-
Err(err) => {
105-
let _ = rtcp_in_modified_tx.send(Err(err.into())).await;
106-
break;
107-
}
108-
};
109-
110-
let _ = rtcp_in_modified_tx.send(Ok(pkt)).await;
99+
let _ = rtcp_in_modified_tx.send(Ok(pkts)).await;
111100
}
112101
});
113102

@@ -121,21 +110,10 @@ impl MockStream {
121110
let mut buf = vec![0u8; 1500];
122111
let a = Attributes::new();
123112
loop {
124-
let n = match rtp_reader.read(&mut buf, &a).await {
125-
Ok((n, _)) => n,
113+
let pkt = match rtp_reader.read(&mut buf, &a).await {
114+
Ok((pkt, _)) => pkt,
126115
Err(err) => {
127-
if Error::ErrIoEOF != err {
128-
let _ = rtp_in_modified_tx.send(Err(err)).await;
129-
}
130-
break;
131-
}
132-
};
133-
134-
let mut b = &buf[..n];
135-
let pkt = match rtp::packet::Packet::unmarshal(&mut b) {
136-
Ok(pkt) => pkt,
137-
Err(err) => {
138-
let _ = rtp_in_modified_tx.send(Err(err.into())).await;
116+
let _ = rtp_in_modified_tx.send(Err(err)).await;
139117
break;
140118
}
141119
};
@@ -259,7 +237,11 @@ impl RTCPWriter for MockStream {
259237

260238
#[async_trait]
261239
impl RTCPReader for MockStream {
262-
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
240+
async fn read(
241+
&self,
242+
buf: &mut [u8],
243+
a: &Attributes,
244+
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
263245
let pkts = {
264246
let mut rtcp_in = self.rtcp_in_rx.lock().await;
265247
rtcp_in.recv().await.ok_or(Error::ErrIoEOF)?
@@ -272,7 +254,7 @@ impl RTCPReader for MockStream {
272254
}
273255

274256
buf[..n].copy_from_slice(&marshaled);
275-
Ok((n, a.clone()))
257+
Ok((pkts, a.clone()))
276258
}
277259
}
278260

@@ -286,7 +268,11 @@ impl RTPWriter for MockStream {
286268

287269
#[async_trait]
288270
impl RTPReader for MockStream {
289-
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
271+
async fn read(
272+
&self,
273+
buf: &mut [u8],
274+
a: &Attributes,
275+
) -> Result<(rtp::packet::Packet, Attributes)> {
290276
let pkt = {
291277
let mut rtp_in = self.rtp_in_rx.lock().await;
292278
rtp_in.recv().await.ok_or(Error::ErrIoEOF)?
@@ -299,7 +285,7 @@ impl RTPReader for MockStream {
299285
}
300286

301287
buf[..n].copy_from_slice(&marshaled);
302-
Ok((n, a.clone()))
288+
Ok((pkt, a.clone()))
303289
}
304290
}
305291

interceptor/src/nack/generator/generator_stream.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use super::*;
33
use crate::nack::UINT16SIZE_HALF;
44

55
use util::sync::Mutex;
6-
use util::Unmarshal;
76

87
struct GeneratorStreamInternal {
98
packets: Vec<u64>,
@@ -152,14 +151,16 @@ impl GeneratorStream {
152151
#[async_trait]
153152
impl RTPReader for GeneratorStream {
154153
/// read a rtp packet
155-
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
156-
let (n, attr) = self.parent_rtp_reader.read(buf, a).await?;
154+
async fn read(
155+
&self,
156+
buf: &mut [u8],
157+
a: &Attributes,
158+
) -> Result<(rtp::packet::Packet, Attributes)> {
159+
let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;
157160

158-
let mut b = &buf[..n];
159-
let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
160161
self.add(pkt.header.sequence_number);
161162

162-
Ok((n, attr))
163+
Ok((pkt, attr))
163164
}
164165
}
165166

interceptor/src/nack/responder/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,12 @@ pub struct ResponderRtcpReader {
102102

103103
#[async_trait]
104104
impl RTCPReader for ResponderRtcpReader {
105-
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
106-
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };
107-
108-
let mut b = &buf[..n];
109-
let pkts = rtcp::packet::unmarshal(&mut b)?;
105+
async fn read(
106+
&self,
107+
buf: &mut [u8],
108+
a: &Attributes,
109+
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
110+
let (pkts, attr) = { self.parent_rtcp_reader.read(buf, a).await? };
110111
for p in &pkts {
111112
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
112113
let nack = nack.clone();
@@ -117,7 +118,7 @@ impl RTCPReader for ResponderRtcpReader {
117118
}
118119
}
119120

120-
Ok((n, attr))
121+
Ok((pkts, attr))
121122
}
122123
}
123124

interceptor/src/noop.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,22 @@ impl Interceptor for NoOp {
6060

6161
#[async_trait]
6262
impl RTPReader for NoOp {
63-
async fn read(&self, _buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
64-
Ok((0, a.clone()))
63+
async fn read(
64+
&self,
65+
_buf: &mut [u8],
66+
a: &Attributes,
67+
) -> Result<(rtp::packet::Packet, Attributes)> {
68+
Ok((rtp::packet::Packet::default(), a.clone()))
6569
}
6670
}
6771

6872
#[async_trait]
6973
impl RTCPReader for NoOp {
70-
async fn read(&self, _buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
71-
Ok((0, a.clone()))
74+
async fn read(
75+
&self,
76+
_buf: &mut [u8],
77+
a: &Attributes,
78+
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
79+
Ok((vec![], a.clone()))
7280
}
7381
}

interceptor/src/report/receiver/mod.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ pub(crate) struct ReceiverReportRtcpReader {
2626

2727
#[async_trait]
2828
impl RTCPReader for ReceiverReportRtcpReader {
29-
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
30-
let (n, attr) = self.parent_rtcp_reader.read(buf, a).await?;
31-
32-
let mut b = &buf[..n];
33-
let pkts = rtcp::packet::unmarshal(&mut b)?;
29+
async fn read(
30+
&self,
31+
buf: &mut [u8],
32+
a: &Attributes,
33+
) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
34+
let (pkts, attr) = self.parent_rtcp_reader.read(buf, a).await?;
3435

3536
let now = if let Some(f) = &self.internal.now {
3637
f()
@@ -53,7 +54,7 @@ impl RTCPReader for ReceiverReportRtcpReader {
5354
}
5455
}
5556

56-
Ok((n, attr))
57+
Ok((pkts, attr))
5758
}
5859
}
5960

interceptor/src/report/receiver/receiver_stream.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use crate::{Attributes, RTPReader};
44
use async_trait::async_trait;
55
use std::time::SystemTime;
66
use util::sync::Mutex;
7-
use util::Unmarshal;
87

98
struct ReceiverStreamInternal {
109
ssrc: u32,
@@ -208,18 +207,20 @@ impl ReceiverStream {
208207
#[async_trait]
209208
impl RTPReader for ReceiverStream {
210209
/// read a rtp packet
211-
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
212-
let (n, attr) = self.parent_rtp_reader.read(buf, a).await?;
210+
async fn read(
211+
&self,
212+
buf: &mut [u8],
213+
a: &Attributes,
214+
) -> Result<(rtp::packet::Packet, Attributes)> {
215+
let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;
213216

214-
let mut b = &buf[..n];
215-
let pkt = rtp::packet::Packet::unmarshal(&mut b)?;
216217
let now = if let Some(f) = &self.now {
217218
f()
218219
} else {
219220
SystemTime::now()
220221
};
221222
self.process_rtp(now, &pkt);
222223

223-
Ok((n, attr))
224+
Ok((pkt, attr))
224225
}
225226
}

0 commit comments

Comments
 (0)