Skip to content

Commit cca71f2

Browse files
committed
improve bounded channel try_send error handling
1 parent 6638d86 commit cca71f2

File tree

3 files changed

+56
-36
lines changed

3 files changed

+56
-36
lines changed

src/data_channel.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,7 @@ where
233233
self.inner
234234
.msg_tx
235235
.try_send(MessageInner::WriteNotify)
236-
.map_err(|e| Error::Other(format!("{:?}", e)))?;
237-
238-
Ok(())
236+
.map_err(|e| Error::Other(format!("{:?}", e)))
239237
}
240238

241239
/// buffered_amount_low_threshold represents the threshold at which the
@@ -266,9 +264,7 @@ where
266264
self.inner
267265
.msg_tx
268266
.try_send(MessageInner::WriteNotify)
269-
.map_err(|e| Error::Other(format!("{:?}", e)))?;
270-
271-
Ok(())
267+
.map_err(|e| Error::Other(format!("{:?}", e)))
272268
}
273269

274270
/// Send binary data
@@ -296,9 +292,10 @@ where
296292

297293
// Wake the driver so it flushes SCTP output (poll_write) and checks
298294
// for newly generated events (e.g. OnBufferedAmountHigh).
299-
let _ = self.inner.msg_tx.try_send(MessageInner::WriteNotify);
300-
301-
Ok(())
295+
self.inner
296+
.msg_tx
297+
.try_send(MessageInner::WriteNotify)
298+
.map_err(|e| Error::Other(format!("{:?}", e)))
302299
}
303300

304301
/// Send text data
@@ -323,9 +320,10 @@ where
323320
.send_text(text)?;
324321
}
325322

326-
let _ = self.inner.msg_tx.try_send(MessageInner::WriteNotify);
327-
328-
Ok(())
323+
self.inner
324+
.msg_tx
325+
.try_send(MessageInner::WriteNotify)
326+
.map_err(|e| Error::Other(format!("{:?}", e)))
329327
}
330328

331329
async fn poll(&self) -> Option<DataChannelEvent> {
@@ -344,8 +342,6 @@ where
344342
self.inner
345343
.msg_tx
346344
.try_send(MessageInner::WriteNotify)
347-
.map_err(|e| Error::Other(format!("{:?}", e)))?;
348-
349-
Ok(())
345+
.map_err(|e| Error::Other(format!("{:?}", e)))
350346
}
351347
}

src/peer_connection.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::ice_gatherer::RTCIceGatherOptions;
55
use crate::ice_gatherer::RTCIceGatherer;
66
use crate::media_stream::{TrackLocal, TrackRemote};
77
use crate::peer_connection_driver::{
8-
DATA_CHANNEL_EVENT_CHANNEL_CAPACITY, MSG_CHANNEL_CAPACITY, PeerConnectionDriver,
8+
DATA_CHANNEL_EVENT_CHANNEL_CAPACITY, MESSAGE_INNER_CHANNEL_CAPACITY, PeerConnectionDriver,
99
};
1010
use crate::rtp_transceiver::{RtpReceiver, RtpSender, RtpTransceiver};
1111
use crate::runtime::{JoinHandle, Runtime, default_runtime};
@@ -367,7 +367,7 @@ where
367367
}
368368
}
369369

370-
let (msg_tx, msg_rx) = channel(MSG_CHANNEL_CAPACITY);
370+
let (msg_tx, msg_rx) = channel(MESSAGE_INNER_CHANNEL_CAPACITY);
371371
let peer_connection = Self {
372372
inner: Arc::new(PeerConnectionRef {
373373
core: Mutex::new(core),
@@ -451,8 +451,7 @@ where
451451
.msg_tx
452452
.send(MessageInner::IceGathering)
453453
.await
454-
.map_err(|e| Error::Other(format!("{:?}", e)))?;
455-
Ok(())
454+
.map_err(|e| Error::Other(format!("{:?}", e)))
456455
}
457456

458457
async fn local_description(&self) -> Option<RTCSessionDescription> {
@@ -485,8 +484,10 @@ where
485484
// internally, which arms the ICE connectivity-check timer. Without this
486485
// notify the driver would sleep until its previous (possibly 1-day default)
487486
// timer expired and never send the initial STUN binding requests.
488-
let _ = self.inner.msg_tx.try_send(MessageInner::WriteNotify);
489-
Ok(())
487+
self.inner
488+
.msg_tx
489+
.try_send(MessageInner::WriteNotify)
490+
.map_err(|e| Error::Other(format!("{:?}", e)))
490491
}
491492

492493
async fn remote_description(&self) -> Option<RTCSessionDescription> {
@@ -520,8 +521,7 @@ where
520521
.msg_tx
521522
.send(MessageInner::IceGathering)
522523
.await
523-
.map_err(|e| Error::Other(format!("{:?}", e)))?;
524-
Ok(())
524+
.map_err(|e| Error::Other(format!("{:?}", e)))
525525
}
526526

527527
async fn get_configuration(&self) -> RTCConfiguration {

src/peer_connection_driver.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use crate::data_channel::{DataChannel, DataChannelEvent, DataChannelImpl};
88
use crate::ice_gatherer::{RTCIceGatherer, RTCIceGathererEvent};
99
use crate::peer_connection::MessageInner;
1010
use crate::peer_connection::PeerConnectionRef;
11-
use crate::runtime::{AsyncUdpSocket, Receiver, channel};
11+
use crate::runtime::{AsyncUdpSocket, Receiver, TrySendError, channel};
1212
use bytes::BytesMut;
1313
use futures::FutureExt; // For .fuse() in futures::select!
1414
use futures::stream::{FuturesUnordered, StreamExt};
15-
use log::{error, trace, warn};
15+
use log::{error, trace};
1616
use rtc::interceptor::{Interceptor, NoopInterceptor};
1717
use rtc::peer_connection::event::{RTCDataChannelEvent, RTCPeerConnectionEvent};
1818
use rtc::peer_connection::message::RTCMessage;
@@ -28,7 +28,7 @@ use std::sync::Arc;
2828
use std::time::{Duration, Instant};
2929

3030
/// Capacity of the internal driver message channel (WriteNotify, IceGathering, Close, …).
31-
pub(crate) const MSG_CHANNEL_CAPACITY: usize = 64;
31+
pub(crate) const MESSAGE_INNER_CHANNEL_CAPACITY: usize = 64;
3232

3333
/// Capacity of each data-channel event channel (OnOpen, OnMessage, OnClose, …).
3434
pub(crate) const DATA_CHANNEL_EVENT_CHANNEL_CAPACITY: usize = 256;
@@ -348,26 +348,38 @@ where
348348
if let Some(evt_tx) = data_channels.get(&channel_id) {
349349
let result = match evt {
350350
RTCDataChannelEvent::OnOpen(_) => {
351-
evt_tx.send(DataChannelEvent::OnOpen).await
351+
evt_tx.try_send(DataChannelEvent::OnOpen)
352352
}
353353
RTCDataChannelEvent::OnError(_) => {
354-
evt_tx.send(DataChannelEvent::OnError).await
354+
evt_tx.try_send(DataChannelEvent::OnError)
355355
}
356356
RTCDataChannelEvent::OnClosing(_) => {
357-
evt_tx.send(DataChannelEvent::OnClosing).await
357+
evt_tx.try_send(DataChannelEvent::OnClosing)
358358
}
359359
RTCDataChannelEvent::OnClose(_) => {
360-
evt_tx.send(DataChannelEvent::OnClose).await
360+
evt_tx.try_send(DataChannelEvent::OnClose)
361361
}
362362
RTCDataChannelEvent::OnBufferedAmountLow(_) => {
363-
evt_tx.send(DataChannelEvent::OnBufferedAmountLow).await
363+
evt_tx.try_send(DataChannelEvent::OnBufferedAmountLow)
364364
}
365365
RTCDataChannelEvent::OnBufferedAmountHigh(_) => {
366-
evt_tx.send(DataChannelEvent::OnBufferedAmountHigh).await
366+
evt_tx.try_send(DataChannelEvent::OnBufferedAmountHigh)
367367
}
368368
};
369369
if let Err(err) = result {
370-
warn!("Failed to send to data channel {}: {:?}", channel_id, err);
370+
match err {
371+
TrySendError::Full(_) => error!(
372+
"Failed to send to data channel {} due to channel is full, {:?} is dropped",
373+
channel_id, evt
374+
),
375+
TrySendError::Disconnected(_) => {
376+
error!(
377+
"Failed to send to data channel {} due to channel is disconnected, {:?} is dropped, data channel {} got removed",
378+
channel_id, evt, channel_id
379+
);
380+
data_channels.remove(&channel_id);
381+
}
382+
}
371383
}
372384
}
373385
}
@@ -385,10 +397,22 @@ where
385397
async fn handle_rtc_message(&mut self, message: RTCMessage) {
386398
match message {
387399
RTCMessage::DataChannelMessage(channel_id, dc_message) => {
388-
let data_channels = self.inner.data_channels.lock().await;
400+
let mut data_channels = self.inner.data_channels.lock().await;
389401
if let Some(evt_tx) = data_channels.get(&channel_id) {
390-
if let Err(err) = evt_tx.send(DataChannelEvent::OnMessage(dc_message)).await {
391-
warn!("Failed to send to data channel {}: {:?}", channel_id, err);
402+
if let Err(err) = evt_tx.try_send(DataChannelEvent::OnMessage(dc_message)) {
403+
match err {
404+
TrySendError::Full(_) => error!(
405+
"Failed to send to data channel {} due to channel is full, DataChannelMessage is dropped",
406+
channel_id
407+
),
408+
TrySendError::Disconnected(_) => {
409+
error!(
410+
"Failed to send to data channel {} due to channel is disconnected, DataChannelMessage is dropped, data channel {} got removed",
411+
channel_id, channel_id
412+
);
413+
data_channels.remove(&channel_id);
414+
}
415+
}
392416
}
393417
}
394418
}

0 commit comments

Comments
 (0)