Skip to content

Commit ef0f6c5

Browse files
committed
change SendMutex to tokio::sync::Mutex
1 parent 3de429e commit ef0f6c5

File tree

6 files changed

+57
-54
lines changed

6 files changed

+57
-54
lines changed

webrtc/src/peer_connection/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ impl RTCPeerConnection {
469469
None => return true, // doesn't contain a single a=msid line
470470
};
471471

472-
let sender = t.sender();
472+
let sender = t.sender().await;
473473
// (...)or the number of MSIDs from the a=msid lines in this m= section,
474474
// or the MSID values themselves, differ from what is in
475475
// transceiver.sender.[[AssociatedMediaStreamIds]], return true.
@@ -1585,7 +1585,7 @@ impl RTCPeerConnection {
15851585
pub(crate) async fn start_rtp_senders(&self) -> Result<()> {
15861586
let current_transceivers = self.internal.rtp_transceivers.lock().await;
15871587
for transceiver in &*current_transceivers {
1588-
let sender = transceiver.sender();
1588+
let sender = transceiver.sender().await;
15891589
if sender.is_negotiated() && !sender.has_sent() {
15901590
sender.send(&sender.get_parameters().await).await?;
15911591
}
@@ -1643,7 +1643,7 @@ impl RTCPeerConnection {
16431643
let mut senders = vec![];
16441644
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
16451645
for transceiver in &*rtp_transceivers {
1646-
let sender = transceiver.sender();
1646+
let sender = transceiver.sender().await;
16471647
senders.push(sender);
16481648
}
16491649
senders
@@ -1654,7 +1654,7 @@ impl RTCPeerConnection {
16541654
let mut receivers = vec![];
16551655
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
16561656
for transceiver in &*rtp_transceivers {
1657-
receivers.push(transceiver.receiver());
1657+
receivers.push(transceiver.receiver().await);
16581658
}
16591659
receivers
16601660
}
@@ -1678,7 +1678,7 @@ impl RTCPeerConnection {
16781678
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
16791679
for t in &*rtp_transceivers {
16801680
if !t.stopped.load(Ordering::SeqCst) && t.kind == track.kind() {
1681-
let sender = t.sender();
1681+
let sender = t.sender().await;
16821682
if sender.track().await.is_none() {
16831683
if let Err(err) = sender.replace_track(Some(track)).await {
16841684
let _ = sender.stop().await;
@@ -1705,7 +1705,7 @@ impl RTCPeerConnection {
17051705
.add_rtp_transceiver(Arc::clone(&transceiver))
17061706
.await;
17071707

1708-
Ok(transceiver.sender())
1708+
Ok(transceiver.sender().await)
17091709
}
17101710

17111711
/// remove_track removes a Track from the PeerConnection
@@ -1718,7 +1718,7 @@ impl RTCPeerConnection {
17181718
{
17191719
let rtp_transceivers = self.internal.rtp_transceivers.lock().await;
17201720
for t in &*rtp_transceivers {
1721-
if t.sender().id == sender.id {
1721+
if t.sender().await.id == sender.id {
17221722
if sender.track().await.is_none() {
17231723
return Ok(());
17241724
}

webrtc/src/peer_connection/peer_connection_internal.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ impl PeerConnectionInternal {
166166
self.undeclared_media_processor();
167167
} else {
168168
for t in &current_transceivers {
169-
let receiver = t.receiver();
169+
let receiver = t.receiver().await;
170170
let tracks = receiver.tracks().await;
171171
if tracks.is_empty() {
172172
continue;
@@ -216,7 +216,7 @@ impl PeerConnectionInternal {
216216
Arc::clone(&self.media_engine),
217217
interceptor,
218218
));
219-
t.set_receiver(receiver);
219+
t.set_receiver(receiver).await;
220220
}
221221
}
222222

@@ -337,7 +337,7 @@ impl PeerConnectionInternal {
337337
for incoming_track in incoming_tracks {
338338
// If we already have a TrackRemote for a given SSRC don't handle it again
339339
for t in local_transceivers {
340-
let receiver = t.receiver();
340+
let receiver = t.receiver().await;
341341
for track in receiver.tracks().await {
342342
for ssrc in &incoming_track.ssrcs {
343343
if *ssrc == track.ssrc() {
@@ -363,7 +363,7 @@ impl PeerConnectionInternal {
363363
continue;
364364
}
365365

366-
let receiver = t.receiver();
366+
let receiver = t.receiver().await;
367367
if receiver.have_received().await {
368368
continue;
369369
}
@@ -666,7 +666,7 @@ impl PeerConnectionInternal {
666666
}
667667

668668
// TODO: This is dubious because of rollbacks.
669-
t.sender().set_negotiated();
669+
t.sender().await.set_negotiated();
670670
media_sections.push(MediaSection {
671671
id: t.mid().unwrap(),
672672
transceivers: vec![Arc::clone(t)],
@@ -755,7 +755,7 @@ impl PeerConnectionInternal {
755755
}
756756

757757
if let Some(t) = find_by_mid(mid_value, &mut local_transceivers).await {
758-
t.sender().set_negotiated();
758+
t.sender().await.set_negotiated();
759759
let media_transceivers = vec![t];
760760

761761
// NB: The below could use `then_some`, but with our current MSRV
@@ -780,7 +780,7 @@ impl PeerConnectionInternal {
780780
// If we are offering also include unmatched local transceivers
781781
if include_unmatched {
782782
for t in &local_transceivers {
783-
t.sender().set_negotiated();
783+
t.sender().await.set_negotiated();
784784
media_sections.push(MediaSection {
785785
id: t.mid().unwrap(),
786786
transceivers: vec![Arc::clone(t)],
@@ -886,7 +886,7 @@ impl PeerConnectionInternal {
886886
)
887887
.await?;
888888

889-
let receiver = t.receiver();
889+
let receiver = t.receiver().await;
890890
PeerConnectionInternal::start_receiver(
891891
self.setting_engine.get_receive_mtu(),
892892
&incoming,
@@ -1004,7 +1004,7 @@ impl PeerConnectionInternal {
10041004
continue;
10051005
}
10061006

1007-
let receiver = t.receiver();
1007+
let receiver = t.receiver().await;
10081008

10091009
if !rsid.is_empty() {
10101010
return receiver
@@ -1206,7 +1206,7 @@ impl PeerConnectionInternal {
12061206
}
12071207
let mut track_infos = vec![];
12081208
for transeiver in transceivers {
1209-
let receiver = transeiver.receiver();
1209+
let receiver = transeiver.receiver().await;
12101210

12111211
if let Some(mid) = transeiver.mid() {
12121212
let tracks = receiver.tracks().await;
@@ -1331,7 +1331,7 @@ impl PeerConnectionInternal {
13311331
}
13321332
let mut track_infos = vec![];
13331333
for transceiver in transceivers {
1334-
let sender = transceiver.sender();
1334+
let sender = transceiver.sender().await;
13351335

13361336
let mid = match transceiver.mid() {
13371337
Some(mid) => mid,

webrtc/src/peer_connection/sdp/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ pub(crate) async fn add_transceiver_sdp(
461461
}
462462
if codecs.is_empty() {
463463
// If we are sender and we have no codecs throw an error early
464-
if t.sender().track().await.is_some() {
464+
if t.sender().await.track().await.is_some() {
465465
return Err(Error::ErrSenderWithNoCodecs);
466466
}
467467

@@ -528,7 +528,7 @@ pub(crate) async fn add_transceiver_sdp(
528528
}
529529

530530
for mt in transceivers {
531-
let sender = mt.sender();
531+
let sender = mt.sender().await;
532532
if let Some(track) = sender.track().await {
533533
media = media.with_media_source(
534534
sender.ssrc,

webrtc/src/peer_connection/sdp/sdp_test.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -642,17 +642,19 @@ async fn test_media_description_fingerprints() -> Result<()> {
642642
"video".to_owned(),
643643
"webrtc-rs".to_owned(),
644644
));
645-
media[i].transceivers[0].set_sender(Arc::new(
646-
RTCRtpSender::new(
647-
api.setting_engine.get_receive_mtu(),
648-
Some(track),
649-
Arc::new(RTCDtlsTransport::default()),
650-
Arc::clone(&api.media_engine),
651-
Arc::clone(&interceptor),
652-
false,
653-
)
654-
.await,
655-
));
645+
media[i].transceivers[0]
646+
.set_sender(Arc::new(
647+
RTCRtpSender::new(
648+
api.setting_engine.get_receive_mtu(),
649+
Some(track),
650+
Arc::new(RTCDtlsTransport::default()),
651+
Arc::clone(&api.media_engine),
652+
Arc::clone(&interceptor),
653+
false,
654+
)
655+
.await,
656+
))
657+
.await;
656658
media[i].transceivers[0].set_direction_internal(RTCRtpTransceiverDirection::Sendonly);
657659
}
658660

webrtc/src/rtp_transceiver/mod.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ pub mod rtp_receiver;
3030
pub mod rtp_sender;
3131
pub mod rtp_transceiver_direction;
3232
pub(crate) mod srtp_writer_future;
33-
use util::sync::Mutex as SyncMutex;
3433

3534
/// SSRC represents a synchronization source
3635
/// A synchronization source is a randomly chosen
@@ -174,9 +173,9 @@ pub type TriggerNegotiationNeededFnOption =
174173

175174
/// RTPTransceiver represents a combination of an RTPSender and an RTPReceiver that share a common mid.
176175
pub struct RTCRtpTransceiver {
177-
mid: OnceCell<String>, //atomic.Value
178-
sender: SyncMutex<Arc<RTCRtpSender>>, //atomic.Value
179-
receiver: SyncMutex<Arc<RTCRtpReceiver>>, //atomic.Value
176+
mid: OnceCell<String>, //atomic.Value
177+
sender: Mutex<Arc<RTCRtpSender>>, //atomic.Value
178+
receiver: Mutex<Arc<RTCRtpReceiver>>, //atomic.Value
180179

181180
direction: AtomicU8, //RTPTransceiverDirection
182181
current_direction: AtomicU8, //RTPTransceiverDirection
@@ -206,8 +205,8 @@ impl RTCRtpTransceiver {
206205

207206
let t = Arc::new(RTCRtpTransceiver {
208207
mid: OnceCell::new(),
209-
sender: SyncMutex::new(sender),
210-
receiver: SyncMutex::new(receiver),
208+
sender: Mutex::new(sender),
209+
receiver: Mutex::new(receiver),
211210

212211
direction: AtomicU8::new(direction as u8),
213212
current_direction: AtomicU8::new(RTCRtpTransceiverDirection::Unspecified as u8),
@@ -218,7 +217,9 @@ impl RTCRtpTransceiver {
218217
media_engine,
219218
trigger_negotiation_needed: Mutex::new(trigger_negotiation_needed),
220219
});
221-
t.sender().set_rtp_transceiver(Some(Arc::downgrade(&t)));
220+
t.sender()
221+
.await
222+
.set_rtp_transceiver(Some(Arc::downgrade(&t)));
222223

223224
t
224225
}
@@ -248,8 +249,8 @@ impl RTCRtpTransceiver {
248249
}
249250

250251
/// sender returns the RTPTransceiver's RTPSender if it has one
251-
pub fn sender(&self) -> Arc<RTCRtpSender> {
252-
let sender = self.sender.lock();
252+
pub async fn sender(&self) -> Arc<RTCRtpSender> {
253+
let sender = self.sender.lock().await;
253254
sender.clone()
254255
}
255256

@@ -259,33 +260,33 @@ impl RTCRtpTransceiver {
259260
sender: Arc<RTCRtpSender>,
260261
track: Option<Arc<dyn TrackLocal + Send + Sync>>,
261262
) -> Result<()> {
262-
self.set_sender(sender);
263+
self.set_sender(sender).await;
263264
self.set_sending_track(track).await
264265
}
265266

266-
pub fn set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>) {
267+
pub async fn set_sender(self: &Arc<Self>, s: Arc<RTCRtpSender>) {
267268
s.set_rtp_transceiver(Some(Arc::downgrade(self)));
268269

269-
let prev_sender = self.sender();
270+
let prev_sender = self.sender().await;
270271
prev_sender.set_rtp_transceiver(None);
271272

272273
{
273-
let mut sender = self.sender.lock();
274+
let mut sender = self.sender.lock().await;
274275
*sender = s;
275276
}
276277
}
277278

278279
/// receiver returns the RTPTransceiver's RTPReceiver if it has one
279-
pub fn receiver(&self) -> Arc<RTCRtpReceiver> {
280-
let receiver = self.receiver.lock();
280+
pub async fn receiver(&self) -> Arc<RTCRtpReceiver> {
281+
let receiver = self.receiver.lock().await;
281282
receiver.clone()
282283
}
283284

284-
pub(crate) fn set_receiver(&self, r: Arc<RTCRtpReceiver>) {
285+
pub(crate) async fn set_receiver(&self, r: Arc<RTCRtpReceiver>) {
285286
r.set_transceiver_codecs(Some(Arc::clone(&self.codecs)));
286287

287288
{
288-
let mut receiver = self.receiver.lock();
289+
let mut receiver = self.receiver.lock().await;
289290
(*receiver).set_transceiver_codecs(None);
290291

291292
*receiver = r;
@@ -396,7 +397,7 @@ impl RTCRtpTransceiver {
396397
}
397398

398399
{
399-
let receiver = self.receiver.lock().clone();
400+
let receiver = self.receiver.lock().await.clone();
400401
let pause_receiver = !current_direction.has_recv();
401402

402403
if pause_receiver {
@@ -408,7 +409,7 @@ impl RTCRtpTransceiver {
408409

409410
let pause_sender = !current_direction.has_send();
410411
{
411-
let sender = &*self.sender.lock();
412+
let sender = &*self.sender.lock().await;
412413
sender.set_paused(pause_sender);
413414
}
414415

@@ -424,11 +425,11 @@ impl RTCRtpTransceiver {
424425
self.stopped.store(true, Ordering::SeqCst);
425426

426427
{
427-
let sender = self.sender.lock().clone();
428+
let sender = self.sender.lock().await;
428429
sender.stop().await?;
429430
}
430431
{
431-
let r = self.receiver.lock().clone();
432+
let r = self.receiver.lock().await;
432433
r.stop().await?;
433434
}
434435

@@ -443,7 +444,7 @@ impl RTCRtpTransceiver {
443444
) -> Result<()> {
444445
let track_is_none = track.is_none();
445446
{
446-
let sender = self.sender.lock().clone();
447+
let sender = self.sender.lock().await.clone();
447448
sender.replace_track(track).await?;
448449
}
449450

webrtc/src/rtp_transceiver/rtp_sender/rtp_sender_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ async fn test_rtp_sender_get_parameters() -> Result<()> {
133133

134134
signal_pair(&mut offerer, &mut answerer).await?;
135135

136-
let sender = rtp_transceiver.sender();
136+
let sender = rtp_transceiver.sender().await;
137137
let parameters = sender.get_parameters().await;
138138
assert_ne!(0, parameters.rtp_parameters.codecs.len());
139139
assert_eq!(1, parameters.encodings.len());

0 commit comments

Comments
 (0)