Skip to content

Commit 82ed14b

Browse files
committed
update interceptor deps/pointer
1 parent 7047f96 commit 82ed14b

File tree

9 files changed

+292
-14
lines changed

9 files changed

+292
-14
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ turn = "0.5.1"
2020
ice = { package = "webrtc-ice", version = "0.5.1" }
2121
dtls = { package = "webrtc-dtls", version = "0.5.0" }
2222
rtp = "0.6.1"
23-
rtcp = "0.6.0"
24-
srtp = { package = "webrtc-srtp", version = "0.8.1" }
23+
rtcp = "0.6.1"
24+
srtp = { package = "webrtc-srtp", version = "0.8.2" }
2525
sctp = { package = "webrtc-sctp", version = "0.4.1" }
2626
data = { package = "webrtc-data", version = "0.3.1" }
2727
media = { package = "webrtc-media", version = "0.4.0" }
28-
interceptor = "0.6.2"
28+
interceptor = "0.6.3"
2929
tokio = { version = "1.12.0", features = ["full"] }
3030
log = "0.4.14"
3131
async-trait = "0.1.42"

crates/rtcp

crates/srtp

Submodule srtp updated 1 file
Lines changed: 278 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,278 @@
1-
//TODO: add tests
1+
/*TODO:
2+
use super::*;
3+
use crate::api::APIBuilder;
4+
use crate::peer_connection::configuration::RTCConfiguration;
5+
6+
use bytes::Bytes;
7+
use std::future::Future;
8+
use std::pin::Pin;
9+
use std::sync::Arc;
10+
11+
use interceptor::mock::mock_builder::MockBuilder;
12+
use interceptor::mock::mock_interceptor::MockInterceptor;
13+
use interceptor::stream_info::StreamInfo;
14+
use interceptor::{Attributes, Interceptor, RTPWriter, RTPWriterFn};
15+
16+
// E2E test of the features of Interceptors
17+
// * Assert an extension can be set on an outbound packet
18+
// * Assert an extension can be read on an outbound packet
19+
// * Assert that attributes set by an interceptor are returned to the Reader
20+
#[tokio::test]
21+
async fn test_peer_connection_interceptor() -> Result<()> {
22+
let create_pc = || async {
23+
let mut m = MediaEngine::default();
24+
m.register_default_codecs()?;
25+
26+
let mut ir = Registry::new();
27+
28+
let BindLocalStreamFn = |info: &StreamInfo,
29+
writer: Arc<dyn RTPWriter + Send + Sync>|
30+
-> Pin<
31+
Box<dyn Future<Output = Arc<dyn RTPWriter + Send + Sync>> + Send + Sync>,
32+
> {
33+
let writer2 = Arc::clone(&writer);
34+
Box::pin(async move {
35+
Arc::new(RTPWriterFn(Box::new(
36+
move |in_pkt: &rtp::packet::Packet,
37+
attributes: &Attributes|
38+
-> Pin<
39+
Box<
40+
dyn Future<Output = std::result::Result<usize, interceptor::Error>>
41+
+ Send
42+
+ Sync,
43+
>,
44+
> {
45+
let writer3 = Arc::clone(&writer2);
46+
let a = attributes.clone();
47+
// set extension on outgoing packet
48+
let mut out_pkt = in_pkt.clone();
49+
out_pkt.header.extension = true;
50+
out_pkt.header.extension_profile = 0xBEDE;
51+
52+
Box::pin(async move {
53+
out_pkt
54+
.header
55+
.set_extension(2, Bytes::from_static(b"foo"))?;
56+
//writer3.write(&out_pkt, &a).await
57+
Ok(0)
58+
})
59+
},
60+
))) as Arc<dyn RTPWriter + Send + Sync>
61+
})
62+
};
63+
64+
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
65+
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
66+
if a == nil {
67+
a = interceptor.Attributes{}
68+
}
69+
70+
a.Set("attribute", "value")
71+
return reader.Read(b, a)
72+
})
73+
},
74+
let mock_builder = Box::new(MockBuilder {
75+
build:
76+
Box::new(
77+
|_: &str| -> std::result::Result<
78+
Arc<dyn Interceptor + Send + Sync>,
79+
interceptor::Error,
80+
> {
81+
Ok(Arc::new(MockInterceptor {
82+
..Default::default()
83+
}))
84+
},
85+
),
86+
});
87+
let mock_builder = MockBuilder::new(
88+
|_: &str| -> std::result::Result<
89+
Arc<dyn Interceptor + Send + Sync>,
90+
interceptor::Error,
91+
> {
92+
Ok(Arc::new(MockInterceptor {
93+
..Default::default()
94+
}))
95+
},
96+
);
97+
ir.add(Box::new(mock_builder));
98+
99+
let api = APIBuilder::new()
100+
.with_media_engine(m)
101+
.with_interceptor_registry(ir)
102+
.build();
103+
api.new_peer_connection(RTCConfiguration::default()).await
104+
};
105+
106+
let offerer = create_pc().await?;
107+
let answerer = create_pc().await?;
108+
109+
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
110+
assert.NoError(t, err)
111+
112+
_, err = offerer.AddTrack(track)
113+
assert.NoError(t, err)
114+
115+
seenRTP, seenRTPCancel := context.WithCancel(context.Background())
116+
answerer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
117+
p, attributes, readErr := track.ReadRTP()
118+
assert.NoError(t, readErr)
119+
120+
assert.Equal(t, p.Extension, true)
121+
assert.Equal(t, "foo", string(p.GetExtension(2)))
122+
assert.Equal(t, "value", attributes.Get("attribute"))
123+
124+
seenRTPCancel()
125+
})
126+
127+
assert.NoError(t, signalPair(offerer, answerer))
128+
129+
func() {
130+
ticker := time.NewTicker(time.Millisecond * 20)
131+
for {
132+
select {
133+
case <-seenRTP.Done():
134+
return
135+
case <-ticker.C:
136+
assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}))
137+
}
138+
}
139+
}()
140+
141+
closePairNow(t, offerer, answerer)
142+
143+
Ok(())
144+
}
145+
146+
func Test_Interceptor_BindUnbind(t *testing.T) {
147+
lim := test.TimeOut(time.Second * 10)
148+
defer lim.Stop()
149+
150+
report := test.CheckRoutines(t)
151+
defer report()
152+
153+
m := &MediaEngine{}
154+
assert.NoError(t, m.RegisterDefaultCodecs())
155+
156+
var (
157+
cntBindRTCPReader uint32
158+
cntBindRTCPWriter uint32
159+
cntBindLocalStream uint32
160+
cntUnbindLocalStream uint32
161+
cntBindRemoteStream uint32
162+
cntUnbindRemoteStream uint32
163+
cntClose uint32
164+
)
165+
mockInterceptor := &mock_interceptor.Interceptor{
166+
BindRTCPReaderFn: func(reader interceptor.RTCPReader) interceptor.RTCPReader {
167+
atomic.AddUint32(&cntBindRTCPReader, 1)
168+
return reader
169+
},
170+
BindRTCPWriterFn: func(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
171+
atomic.AddUint32(&cntBindRTCPWriter, 1)
172+
return writer
173+
},
174+
BindLocalStreamFn: func(i *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
175+
atomic.AddUint32(&cntBindLocalStream, 1)
176+
return writer
177+
},
178+
UnbindLocalStreamFn: func(i *interceptor.StreamInfo) {
179+
atomic.AddUint32(&cntUnbindLocalStream, 1)
180+
},
181+
BindRemoteStreamFn: func(i *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
182+
atomic.AddUint32(&cntBindRemoteStream, 1)
183+
return reader
184+
},
185+
UnbindRemoteStreamFn: func(i *interceptor.StreamInfo) {
186+
atomic.AddUint32(&cntUnbindRemoteStream, 1)
187+
},
188+
CloseFn: func() error {
189+
atomic.AddUint32(&cntClose, 1)
190+
return nil
191+
},
192+
}
193+
ir := &interceptor.Registry{}
194+
ir.Add(&mock_interceptor.Factory{
195+
NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) { return mockInterceptor, nil },
196+
})
197+
198+
sender, receiver, err := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(ir)).newPair(Configuration{})
199+
assert.NoError(t, err)
200+
201+
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
202+
assert.NoError(t, err)
203+
204+
_, err = sender.AddTrack(track)
205+
assert.NoError(t, err)
206+
207+
receiverReady, receiverReadyFn := context.WithCancel(context.Background())
208+
receiver.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
209+
_, _, readErr := track.ReadRTP()
210+
assert.NoError(t, readErr)
211+
receiverReadyFn()
212+
})
213+
214+
assert.NoError(t, signalPair(sender, receiver))
215+
216+
ticker := time.NewTicker(time.Millisecond * 20)
217+
defer ticker.Stop()
218+
func() {
219+
for {
220+
select {
221+
case <-receiverReady.Done():
222+
return
223+
case <-ticker.C:
224+
// Send packet to make receiver track actual creates RTPReceiver.
225+
assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0xAA}, Duration: time.Second}))
226+
}
227+
}
228+
}()
229+
230+
closePairNow(t, sender, receiver)
231+
232+
// Bind/UnbindLocal/RemoteStream should be called from one side.
233+
if cnt := atomic.LoadUint32(&cntBindLocalStream); cnt != 1 {
234+
t.Errorf("BindLocalStreamFn is expected to be called once, but called %d times", cnt)
235+
}
236+
if cnt := atomic.LoadUint32(&cntUnbindLocalStream); cnt != 1 {
237+
t.Errorf("UnbindLocalStreamFn is expected to be called once, but called %d times", cnt)
238+
}
239+
if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 1 {
240+
t.Errorf("BindRemoteStreamFn is expected to be called once, but called %d times", cnt)
241+
}
242+
if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 1 {
243+
t.Errorf("UnbindRemoteStreamFn is expected to be called once, but called %d times", cnt)
244+
}
245+
246+
// BindRTCPWriter/Reader and Close should be called from both side.
247+
if cnt := atomic.LoadUint32(&cntBindRTCPWriter); cnt != 2 {
248+
t.Errorf("BindRTCPWriterFn is expected to be called twice, but called %d times", cnt)
249+
}
250+
if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 2 {
251+
t.Errorf("BindRTCPReaderFn is expected to be called twice, but called %d times", cnt)
252+
}
253+
if cnt := atomic.LoadUint32(&cntClose); cnt != 2 {
254+
t.Errorf("CloseFn is expected to be called twice, but called %d times", cnt)
255+
}
256+
}
257+
258+
func Test_InterceptorRegistry_Build(t *testing.T) {
259+
registryBuildCount := 0
260+
261+
ir := &interceptor.Registry{}
262+
ir.Add(&mock_interceptor.Factory{
263+
NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) {
264+
registryBuildCount++
265+
return &interceptor.NoOp{}, nil
266+
},
267+
})
268+
269+
peerConnectionA, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
270+
assert.NoError(t, err)
271+
272+
peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
273+
assert.NoError(t, err)
274+
275+
assert.Equal(t, 2, registryBuildCount)
276+
closePairNow(t, peerConnectionA, peerConnectionB)
277+
}
278+
*/

src/dtls_transport/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,12 @@ impl RTCDtlsTransport {
138138
/// packet is discarded.
139139
pub async fn write_rtcp(
140140
&self,
141-
pkt: &(dyn rtcp::packet::Packet + Send + Sync),
141+
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
142142
) -> Result<usize> {
143143
let srtcp_session = self.srtcp_session.lock().await;
144144
if let Some(srtcp_session) = &*srtcp_session {
145-
Ok(srtcp_session.write_rtcp(pkt).await?)
145+
let raw = rtcp::packet::marshal(pkts)?;
146+
Ok(srtcp_session.write(&raw, false).await?)
146147
} else {
147148
Ok(0)
148149
}

src/peer_connection/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1827,10 +1827,10 @@ impl RTCPeerConnection {
18271827
/// packet is discarded. It also runs any configured interceptors.
18281828
pub async fn write_rtcp(
18291829
&self,
1830-
pkt: &(dyn rtcp::packet::Packet + Send + Sync),
1830+
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
18311831
) -> Result<usize> {
18321832
let a = Attributes::new();
1833-
Ok(self.interceptor_rtcp_writer.write(pkt, &a).await?)
1833+
Ok(self.interceptor_rtcp_writer.write(pkts, &a).await?)
18341834
}
18351835

18361836
/// close ends the PeerConnection

src/peer_connection/peer_connection_internal.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,9 +1322,9 @@ type IResult<T> = std::result::Result<T, interceptor::Error>;
13221322
impl RTCPWriter for PeerConnectionInternal {
13231323
async fn write(
13241324
&self,
1325-
pkt: &(dyn rtcp::packet::Packet + Send + Sync),
1325+
pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
13261326
_a: &Attributes,
13271327
) -> IResult<usize> {
1328-
Ok(self.dtls_transport.write_rtcp(pkt).await?)
1328+
Ok(self.dtls_transport.write_rtcp(pkts).await?)
13291329
}
13301330
}

0 commit comments

Comments
 (0)