Skip to content

Commit 6638d86

Browse files
committed
refactor runtime::channel from unbounded to bounded channel
1 parent 0831e33 commit 6638d86

20 files changed

+89
-81
lines changed

examples/data-channels-close/data-channels-close.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ async fn async_main() -> anyhow::Result<()> {
165165

166166
// Everything below is the WebRTC-rs API! Thanks for using it ❤️.
167167

168-
let (done_tx, mut done_rx) = channel::<()>();
169-
let (gather_complete_tx, mut gather_complete_rx) = channel();
170-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
168+
let (done_tx, mut done_rx) = channel::<()>(1);
169+
let (gather_complete_tx, mut gather_complete_rx) = channel(1);
170+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
171171
ctrlc::set_handler(move || {
172172
let _ = ctrlc_tx.try_send(());
173173
})?;

examples/data-channels-create/data-channels-create.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ async fn async_main() -> anyhow::Result<()> {
9999

100100
// Everything below is the WebRTC-rs API! Thanks for using it ❤️.
101101

102-
let (done_tx, mut done_rx) = channel::<()>();
103-
let (gather_complete_tx, mut gather_complete_rx) = channel();
104-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
102+
let (done_tx, mut done_rx) = channel::<()>(1);
103+
let (gather_complete_tx, mut gather_complete_rx) = channel(1);
104+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
105105
ctrlc::set_handler(move || {
106106
let _ = ctrlc_tx.try_send(());
107107
})?;

examples/data-channels-flow-control/data-channels-flow-control.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ fn main() -> anyhow::Result<()> {
166166
}
167167

168168
async fn async_main() -> anyhow::Result<()> {
169-
let (done_tx, mut done_rx) = channel::<()>();
170-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
169+
let (done_tx, mut done_rx) = channel::<()>(1);
170+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
171171
ctrlc::set_handler(move || {
172172
let _ = ctrlc_tx.try_send(());
173173
})?;
@@ -176,7 +176,7 @@ async fn async_main() -> anyhow::Result<()> {
176176
default_runtime().ok_or_else(|| std::io::Error::other("no async runtime found"))?;
177177

178178
// ── Build requester peer connection ──────────────────────────────────────
179-
let (req_gather_tx, mut req_gather_rx) = channel::<()>();
179+
let (req_gather_tx, mut req_gather_rx) = channel::<()>(1);
180180
let mut req_media = MediaEngine::default();
181181
req_media.register_default_codecs()?;
182182
let req_registry = register_default_interceptors(Registry::new(), &mut req_media)?;
@@ -283,7 +283,7 @@ async fn async_main() -> anyhow::Result<()> {
283283
.ok_or_else(|| anyhow::anyhow!("requester has no local description"))?;
284284

285285
// ── Build responder peer connection ──────────────────────────────────────
286-
let (resp_gather_tx, mut resp_gather_rx) = channel::<()>();
286+
let (resp_gather_tx, mut resp_gather_rx) = channel::<()>(1);
287287
let mut resp_media = MediaEngine::default();
288288
resp_media.register_default_codecs()?;
289289
let resp_registry = register_default_interceptors(Registry::new(), &mut resp_media)?;

examples/data-channels-offer-answer/data-channels-answer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,15 @@ fn main() -> Result<()> {
135135
}
136136

137137
async fn async_main(cli: Cli) -> Result<()> {
138-
let (done_tx, mut done_rx) = channel::<()>();
139-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
138+
let (done_tx, mut done_rx) = channel::<()>(1);
139+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
140140
ctrlc::set_handler(move || {
141141
let _ = ctrlc_tx.try_send(());
142142
})?;
143143

144144
let runtime = default_runtime().ok_or_else(|| anyhow::anyhow!("no async runtime found"))?;
145145

146-
let (gather_tx, mut gather_rx) = channel::<()>();
146+
let (gather_tx, mut gather_rx) = channel::<()>(1);
147147

148148
let mut media = MediaEngine::default();
149149
media.register_default_codecs()?;

examples/data-channels-offer-answer/data-channels-offer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ fn main() -> Result<()> {
7979
}
8080

8181
async fn async_main(cli: Cli) -> Result<()> {
82-
let (done_tx, mut done_rx) = channel::<()>();
83-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
82+
let (done_tx, mut done_rx) = channel::<()>(1);
83+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
8484
ctrlc::set_handler(move || {
8585
let _ = ctrlc_tx.try_send(());
8686
})?;
8787

8888
let runtime = default_runtime().ok_or_else(|| anyhow::anyhow!("no async runtime found"))?;
8989

90-
let (gather_tx, mut gather_rx) = channel::<()>();
90+
let (gather_tx, mut gather_rx) = channel::<()>(1);
9191

9292
let mut media = MediaEngine::default();
9393
media.register_default_codecs()?;

examples/data-channels-simple/data-channels-simple.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ async fn async_main() -> Result<()> {
9797
let (offer_tx, mut offer_rx) = channel::<(
9898
RTCSessionDescription,
9999
Sender<Result<RTCSessionDescription, String>>,
100-
)>();
101-
let (candidate_tx, mut candidate_rx) = channel::<RTCIceCandidateInit>();
100+
)>(8);
101+
let (candidate_tx, mut candidate_rx) = channel::<RTCIceCandidateInit>(8);
102102

103103
let state = Arc::new(AppState {
104104
offer_tx,
@@ -132,7 +132,7 @@ async fn async_main() -> Result<()> {
132132
let Some((offer, response_tx)) = msg else { break };
133133
info!("Received offer from browser");
134134

135-
let (gather_tx, mut gather_rx) = channel::<()>();
135+
let (gather_tx, mut gather_rx) = channel::<()>(1);
136136
let handler = Arc::new(Handler {
137137
gather_complete_tx: gather_tx,
138138
runtime: runtime.clone(),
@@ -233,7 +233,8 @@ async fn handle_request(
233233
}
234234
};
235235

236-
let (response_tx, mut response_rx) = channel::<Result<RTCSessionDescription, String>>();
236+
let (response_tx, mut response_rx) =
237+
channel::<Result<RTCSessionDescription, String>>(8);
237238

238239
if state.offer_tx.try_send((offer, response_tx)).is_err() {
239240
return Ok(Response::builder()

examples/data-channels/data-channels.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,9 @@ async fn async_main() -> anyhow::Result<()> {
148148

149149
// Everything below is the WebRTC-rs API! Thanks for using it ❤️.
150150

151-
let (done_tx, mut done_rx) = channel::<()>();
152-
let (gather_complete_tx, mut gather_complete_rx) = channel();
153-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
151+
let (done_tx, mut done_rx) = channel::<()>(1);
152+
let (gather_complete_tx, mut gather_complete_rx) = channel(1);
153+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
154154
ctrlc::set_handler(move || {
155155
let _ = ctrlc_tx.try_send(());
156156
})?;

examples/ice-restart/ice-restart.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async fn remote_handler(
139139
};
140140

141141
// Register a fresh gather channel for this signaling exchange.
142-
let (gather_tx, mut gather_rx) = channel::<()>();
142+
let (gather_tx, mut gather_rx) = channel::<()>(1);
143143
shared.lock().await.gather_tx = Some(gather_tx);
144144

145145
if let Err(e) = pc.set_remote_description(offer).await {
@@ -223,7 +223,7 @@ fn main() -> Result<()> {
223223
}
224224

225225
async fn async_main(_cli: Cli) -> Result<()> {
226-
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>();
226+
let (ctrlc_tx, mut ctrlc_rx) = channel::<()>(1);
227227
ctrlc::set_handler(move || {
228228
let _ = ctrlc_tx.try_send(());
229229
})?;

src/peer_connection.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use crate::data_channel::{DataChannel, DataChannelEvent, DataChannelImpl};
44
use crate::ice_gatherer::RTCIceGatherOptions;
55
use crate::ice_gatherer::RTCIceGatherer;
66
use crate::media_stream::{TrackLocal, TrackRemote};
7-
use crate::peer_connection_driver::PeerConnectionDriver;
7+
use crate::peer_connection_driver::{
8+
DATA_CHANNEL_EVENT_CHANNEL_CAPACITY, MSG_CHANNEL_CAPACITY, PeerConnectionDriver,
9+
};
810
use crate::rtp_transceiver::{RtpReceiver, RtpSender, RtpTransceiver};
911
use crate::runtime::{JoinHandle, Runtime, default_runtime};
1012
use crate::runtime::{Mutex, Sender, channel};
@@ -97,12 +99,6 @@ pub trait PeerConnectionEventHandler: Send + Sync + 'static {
9799
/// Unified inner message type for the peer connection driver
98100
#[derive(Debug)]
99101
pub(crate) enum MessageInner {
100-
// Outgoing RTP packet from local track
101-
//SenderRtp(RTCRtpSenderId, rtc::rtp::Packet),
102-
// Outgoing RTCP packets from sender
103-
//SenderRtcp(RTCRtpSenderId, Vec<Box<dyn rtc::rtcp::Packet>>),
104-
// Outgoing RTCP packets from receiver
105-
//ReceiverRtcp(RTCRtpReceiverId, Vec<Box<dyn rtc::rtcp::Packet>>),
106102
WriteNotify,
107103
IceGathering,
108104
Close,
@@ -371,7 +367,7 @@ where
371367
}
372368
}
373369

374-
let (msg_tx, msg_rx) = channel();
370+
let (msg_tx, msg_rx) = channel(MSG_CHANNEL_CAPACITY);
375371
let peer_connection = Self {
376372
inner: Arc::new(PeerConnectionRef {
377373
core: Mutex::new(core),
@@ -550,7 +546,7 @@ where
550546
rtc_dc.id()
551547
};
552548

553-
let (evt_tx, evt_rx) = channel();
549+
let (evt_tx, evt_rx) = channel(DATA_CHANNEL_EVENT_CHANNEL_CAPACITY);
554550
{
555551
let mut data_channels = self.inner.data_channels.lock().await;
556552
data_channels.insert(channel_id, evt_tx);

src/peer_connection_driver.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ use std::net::SocketAddr;
2727
use std::sync::Arc;
2828
use std::time::{Duration, Instant};
2929

30+
/// Capacity of the internal driver message channel (WriteNotify, IceGathering, Close, …).
31+
pub(crate) const MSG_CHANNEL_CAPACITY: usize = 64;
32+
33+
/// Capacity of each data-channel event channel (OnOpen, OnMessage, OnClose, …).
34+
pub(crate) const DATA_CHANNEL_EVENT_CHANNEL_CAPACITY: usize = 256;
35+
36+
/// Capacity of each track-remote event channel (OnMute, OnUnmute, OnEnded, OnRtpPacket, OnRtcpPacket, …).
37+
pub(crate) const TRACK_REMOTE_EVENT_CHANNEL_CAPACITY: usize = 256;
38+
3039
const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(86400); // 1 day duration
3140

3241
/// The driver for a peer connection
@@ -325,7 +334,7 @@ where
325334
{
326335
let mut data_channels = self.inner.data_channels.lock().await;
327336
if let Entry::Vacant(e) = data_channels.entry(channel_id) {
328-
let (evt_tx, evt_rx) = channel();
337+
let (evt_tx, evt_rx) = channel(DATA_CHANNEL_EVENT_CHANNEL_CAPACITY);
329338
e.insert(evt_tx);
330339

331340
// Create our async wrapper

0 commit comments

Comments
 (0)