Skip to content

Commit 5b79f08

Browse files
authored
remove unnecessary async in public API (#338)
> mind if i create a PR that makes all callback setters (e.g. RTCPeerConnection::on_ice_candidate()) non async? Currently callbacks are stored behind tokio::Mutex'es, i believe that ArcSwap can be used here, so we can use atomic load/store. It wont affect performance in any way, just some quality of life improvements. > All good with me. I'd like to get rid of tokio::Mutex, replacing it with sync variants, where that's possible So i've refactored all public callback setters here. Regarding removing `tokio::Mutex`es that you've mentioned, well, i've replaced some of those, but i don't want this PR to become even bigger, and it would be nice if you provide some feedback first. I've used both `ArcSwap` and `std::Mutex`, i guess its ok to use blocking mutex in simple cases, when you don't need to hold it, e.g. copy, store, swap, take one-liners. I will prepare follow-up PRs on this issue if this PR looks good to you. And one more thing regarding the callbacks, maybe it makes sense to make them just dumb `Box<dyn Fn(_)>`? So `Box<dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>` => `Box<dyn (Fn(ConnectionState)) + Send + Sync>`. Yeah, that might hurt users in some rare cases, but it would make everything simpler. Been using this to play around with detecting deadlocks, didn't find anything though. ```diff diff --git a/util/src/sync/mod.rs b/util/src/sync/mod.rs index 6a10129..d5eb835d 100644 --- a/util/src/sync/mod.rs +++ b/util/src/sync/mod.rs @@ -1,4 +1,150 @@ -use std::{ops, sync}; +use std::backtrace::Backtrace; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use std::{fmt, ops, sync, thread}; + +const MAX_AGE: Duration = Duration::from_millis(5000); +const CHECK_INTERVAL: Duration = Duration::from_millis(20); +static DEADLOCK_THREAD_STARTED: AtomicBool = AtomicBool::new(false); +lazy_static! { + static ref TRACKED_LOCKS: Arc<sync::RwLock<Vec<TrackedLock>>> = + Arc::new(sync::RwLock::new(Vec::new())); +} + +struct TrackedLock { + backtrace: Backtrace, + tracker: AliveTracker, + typ: Type, + reported: AtomicBool, +} + +impl TrackedLock { + fn mark_reported(&self) -> bool { + self.reported + .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire) + .is_ok() + } +} + +#[derive(Debug)] +enum Type { + MutexLock, + RwReadLock, + RwWriteLock, +} + +impl fmt::Display for Type { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Type::MutexLock => write!(f, "MutexLock"), + Type::RwReadLock => write!(f, "RwReadLock"), + Type::RwWriteLock => write!(f, "RwWriteLock"), + } + } +} + +struct Alive { + live: Arc<AtomicBool>, + born_at: Instant, +} + +impl Alive { + fn new() -> Self { + Self { + live: Arc::new(AtomicBool::new(true)), + born_at: Instant::now(), + } + } + + fn track(&self) -> AliveTracker { + AliveTracker { + live: self.live.clone(), + born_at: self.born_at.clone(), + } + } + + fn mark_dead(&self) { + self.live.store(false, Ordering::Release); + } +} + +impl Drop for Alive { + fn drop(&mut self) { + self.mark_dead(); + } +} + +struct AliveTracker { + live: Arc<AtomicBool>, + born_at: Instant, +} + +impl AliveTracker { + fn is_dead(&self) -> bool { + !self.live.load(Ordering::Acquire) + } + + fn age(&self) -> Option<Duration> { + if self.is_dead() { + None + } else { + Some(Instant::now() - self.born_at) + } + } +} + +fn start_deadlock_thread() { + if DEADLOCK_THREAD_STARTED + .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire) + .is_err() + { + return; + } + + thread::spawn(|| loop { + let lock = TRACKED_LOCKS.read().unwrap(); + let mut needs_cleanup = false; + + for tracked in lock.iter() { + let age = tracked.tracker.age(); + match age { + Some(age) if age > MAX_AGE => { + if tracked.mark_reported() { + println!( + "Potential deadlock on {}. Backtrace:\n{}Held for: {}s", + tracked.typ, + tracked.backtrace, + age.as_secs_f64() + ); + } + } + None => { + // Lock is dead + needs_cleanup = true; + } + _ => {} + } + } + drop(lock); + + if needs_cleanup { + let mut write = TRACKED_LOCKS.write().unwrap(); + write.retain(|tracked| !tracked.tracker.is_dead()); + } + }); +} + +fn track(backtrace: Backtrace, alive: &Alive, typ: Type) { + start_deadlock_thread(); + let mut write = TRACKED_LOCKS.write().unwrap(); + write.push(TrackedLock { + backtrace, + tracker: alive.track(), + typ, + reported: AtomicBool::new(false), + }); +} /// A synchronous mutual exclusion primitive useful for protecting shared data. #[derive(Default, Debug)] @@ -13,8 +159,10 @@ impl<T> Mutex<T> { /// Acquires a mutex, blocking the current thread until it is able to do so. pub fn lock(&self) -> MutexGuard<'_, T> { let guard = self.0.lock().unwrap(); + let alive = Alive::new(); + track(Backtrace::capture(), &alive, Type::MutexLock); - MutexGuard(guard) + MutexGuard(guard, alive) } /// Consumes this mutex, returning the underlying data. @@ -25,7 +173,7 @@ impl<T> Mutex<T> { /// An RAII implementation of a "scoped lock" of a mutex. When this structure is /// dropped (falls out of scope), the lock will be unlocked. -pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>); +pub struct MutexGuard<'a, T>(sync::MutexGuard<'a, T>, Alive); impl<'a, T> ops::Deref for MutexGuard<'a, T> { type Target = T; @@ -55,22 +203,26 @@ impl<T> RwLock<T> { /// until it can be acquired. pub fn read(&self) -> RwLockReadGuard<'_, T> { let guard = self.0.read().unwrap(); + let alive = Alive::new(); + track(Backtrace::capture(), &alive, Type::MutexLock); - RwLockReadGuard(guard) + RwLockReadGuard(guard, alive) } /// Locks this rwlock with exclusive write access, blocking the current /// thread until it can be acquired. pub fn write(&self) -> RwLockWriteGuard<'_, T> { let guard = self.0.write().unwrap(); + let alive = Alive::new(); + track(Backtrace::capture(), &alive, Type::MutexLock); - RwLockWriteGuard(guard) + RwLockWriteGuard(guard, alive) } } /// RAII structure used to release the shared read access of a lock when /// dropped. -pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>); +pub struct RwLockReadGuard<'a, T>(sync::RwLockReadGuard<'a, T>, Alive); impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> { type Target = T; @@ -82,7 +234,7 @@ impl<'a, T> ops::Deref for RwLockReadGuard<'a, T> { /// RAII structure used to release the shared read access of a lock when /// dropped. -pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>); +pub struct RwLockWriteGuard<'a, T>(sync::RwLockWriteGuard<'a, T>, Alive); impl<'a, T> ops::Deref for RwLockWriteGuard<'a, T> { type Target = T; ```
1 parent 242db9d commit 5b79f08

File tree

116 files changed

+2451
-2589
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+2451
-2589
lines changed

data/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## Unreleased
44

5+
### Breaking changes
6+
7+
* Make `DataChannel::on_buffered_amount_low` function non-async [#338](https://github.com/webrtc-rs/webrtc/pull/338).
8+
59
## v0.5.1
610

711
* Increased minimum support rust version to `1.60.0`.

data/src/data_channel/data_channel_test.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ async fn test_data_channel_channel_type_reliable_ordered() -> Result<()> {
282282
assert_eq!(dc0.config, cfg, "local config should match");
283283
assert_eq!(dc1.config, cfg, "remote config should match");
284284

285-
br.reorder_next_nwrites(0, 2).await; // reordering on the wire
285+
br.reorder_next_nwrites(0, 2); // reordering on the wire
286286

287287
sbuf[0..4].copy_from_slice(&1u32.to_be_bytes());
288288
let n = dc0.write(&Bytes::from(sbuf.clone())).await?;
@@ -475,8 +475,7 @@ async fn test_data_channel_buffered_amount() -> Result<()> {
475475
dc0.on_buffered_amount_low(Box::new(move || {
476476
n_cbs2.fetch_add(1, Ordering::SeqCst);
477477
Box::pin(async {})
478-
}))
479-
.await;
478+
}));
480479

481480
// Write 10 1000-byte packets (total 10,000 bytes)
482481
for i in 0..10 {

data/src/data_channel/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,8 +345,8 @@ impl DataChannel {
345345

346346
/// OnBufferedAmountLow sets the callback handler which would be called when the
347347
/// number of bytes of outgoing data buffered is lower than the threshold.
348-
pub async fn on_buffered_amount_low(&self, f: OnBufferedAmountLowFn) {
349-
self.stream.on_buffered_amount_low(f).await
348+
pub fn on_buffered_amount_low(&self, f: OnBufferedAmountLowFn) {
349+
self.stream.on_buffered_amount_low(f)
350350
}
351351

352352
fn commit_reliability_params(&self) {

dtls/examples/hub/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ impl Hub {
2929

3030
/// register adds a new conn to the Hub
3131
pub async fn register(&self, conn: Arc<dyn Conn + Send + Sync>) {
32-
println!("Connected to {}", conn.remote_addr().await.unwrap());
32+
println!("Connected to {}", conn.remote_addr().unwrap());
3333

34-
if let Some(remote_addr) = conn.remote_addr().await {
34+
if let Some(remote_addr) = conn.remote_addr() {
3535
let mut conns = self.conns.lock().await;
3636
conns.insert(remote_addr.to_string(), Arc::clone(&conn));
3737
}
@@ -60,7 +60,7 @@ impl Hub {
6060
conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>,
6161
conn: Arc<dyn Conn + Send + Sync>,
6262
) -> Result<(), Error> {
63-
if let Some(remote_addr) = conn.remote_addr().await {
63+
if let Some(remote_addr) = conn.remote_addr() {
6464
{
6565
let mut cs = conns.lock().await;
6666
cs.remove(&remote_addr.to_string());
@@ -82,7 +82,7 @@ impl Hub {
8282
if let Err(err) = conn.send(msg).await {
8383
println!(
8484
"Failed to write message to {:?}: {}",
85-
conn.remote_addr().await,
85+
conn.remote_addr(),
8686
err
8787
);
8888
}

dtls/src/conn/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl Conn for DTLSConn {
114114
self.read(buf, None).await.map_err(util::Error::from_std)
115115
}
116116
async fn recv_from(&self, buf: &mut [u8]) -> UtilResult<(usize, SocketAddr)> {
117-
if let Some(raddr) = self.conn.remote_addr().await {
117+
if let Some(raddr) = self.conn.remote_addr() {
118118
let n = self.read(buf, None).await.map_err(util::Error::from_std)?;
119119
Ok((n, raddr))
120120
} else {
@@ -129,11 +129,11 @@ impl Conn for DTLSConn {
129129
async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> UtilResult<usize> {
130130
Err(util::Error::Other("Not applicable".to_owned()))
131131
}
132-
async fn local_addr(&self) -> UtilResult<SocketAddr> {
133-
self.conn.local_addr().await
132+
fn local_addr(&self) -> UtilResult<SocketAddr> {
133+
self.conn.local_addr()
134134
}
135-
async fn remote_addr(&self) -> Option<SocketAddr> {
136-
self.conn.remote_addr().await
135+
fn remote_addr(&self) -> Option<SocketAddr> {
136+
self.conn.remote_addr()
137137
}
138138
async fn close(&self) -> UtilResult<()> {
139139
self.close().await.map_err(util::Error::from_std)
@@ -191,7 +191,7 @@ impl DTLSConn {
191191

192192
// Use host from conn address when server_name is not provided
193193
if is_client && server_name.is_empty() {
194-
if let Some(remote_addr) = conn.remote_addr().await {
194+
if let Some(remote_addr) = conn.remote_addr() {
195195
server_name = remote_addr.ip().to_string();
196196
} else {
197197
log::warn!("conn.remote_addr is empty, please set explicitly server_name in Config! Use default \"localhost\" as server_name now");

examples/examples/broadcast/broadcast.rs

Lines changed: 58 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -127,72 +127,68 @@ async fn main() -> Result<()> {
127127
// Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
128128
// replaces the SSRC and sends them back
129129
let pc = Arc::downgrade(&peer_connection);
130-
peer_connection
131-
.on_track(Box::new(
132-
move |track: Option<Arc<TrackRemote>>, _receiver: Option<Arc<RTCRtpReceiver>>| {
133-
if let Some(track) = track {
134-
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
135-
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
136-
let media_ssrc = track.ssrc();
137-
let pc2 = pc.clone();
138-
tokio::spawn(async move {
139-
let mut result = Result::<usize>::Ok(0);
140-
while result.is_ok() {
141-
let timeout = tokio::time::sleep(Duration::from_secs(3));
142-
tokio::pin!(timeout);
143-
144-
tokio::select! {
145-
_ = timeout.as_mut() =>{
146-
if let Some(pc) = pc2.upgrade(){
147-
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
148-
sender_ssrc: 0,
149-
media_ssrc,
150-
})]).await.map_err(Into::into);
151-
}else{
152-
break;
153-
}
154-
}
155-
};
156-
}
157-
});
158-
159-
let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
160-
tokio::spawn(async move {
161-
// Create Track that we send video back to browser on
162-
let local_track = Arc::new(TrackLocalStaticRTP::new(
163-
track.codec().await.capability,
164-
"video".to_owned(),
165-
"webrtc-rs".to_owned(),
166-
));
167-
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;
168-
169-
// Read RTP packets being sent to webrtc-rs
170-
while let Ok((rtp, _)) = track.read_rtp().await {
171-
if let Err(err) = local_track.write_rtp(&rtp).await {
172-
if Error::ErrClosedPipe != err {
173-
print!("output track write_rtp got error: {} and break", err);
130+
peer_connection.on_track(Box::new(
131+
move |track: Option<Arc<TrackRemote>>, _receiver: Option<Arc<RTCRtpReceiver>>| {
132+
if let Some(track) = track {
133+
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
134+
// This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
135+
let media_ssrc = track.ssrc();
136+
let pc2 = pc.clone();
137+
tokio::spawn(async move {
138+
let mut result = Result::<usize>::Ok(0);
139+
while result.is_ok() {
140+
let timeout = tokio::time::sleep(Duration::from_secs(3));
141+
tokio::pin!(timeout);
142+
143+
tokio::select! {
144+
_ = timeout.as_mut() =>{
145+
if let Some(pc) = pc2.upgrade(){
146+
result = pc.write_rtcp(&[Box::new(PictureLossIndication{
147+
sender_ssrc: 0,
148+
media_ssrc,
149+
})]).await.map_err(Into::into);
150+
}else{
174151
break;
175-
} else {
176-
print!("output track write_rtp got error: {}", err);
177152
}
178153
}
154+
};
155+
}
156+
});
157+
158+
let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
159+
tokio::spawn(async move {
160+
// Create Track that we send video back to browser on
161+
let local_track = Arc::new(TrackLocalStaticRTP::new(
162+
track.codec().await.capability,
163+
"video".to_owned(),
164+
"webrtc-rs".to_owned(),
165+
));
166+
let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;
167+
168+
// Read RTP packets being sent to webrtc-rs
169+
while let Ok((rtp, _)) = track.read_rtp().await {
170+
if let Err(err) = local_track.write_rtp(&rtp).await {
171+
if Error::ErrClosedPipe != err {
172+
print!("output track write_rtp got error: {} and break", err);
173+
break;
174+
} else {
175+
print!("output track write_rtp got error: {}", err);
176+
}
179177
}
180-
});
181-
}
178+
}
179+
});
180+
}
182181

183-
Box::pin(async {})
184-
},
185-
))
186-
.await;
182+
Box::pin(async {})
183+
},
184+
));
187185

188186
// Set the handler for Peer connection state
189187
// This will notify you when the peer has connected/disconnected
190-
peer_connection
191-
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
192-
println!("Peer Connection State has changed: {}", s);
193-
Box::pin(async {})
194-
}))
195-
.await;
188+
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
189+
println!("Peer Connection State has changed: {}", s);
190+
Box::pin(async {})
191+
}));
196192

197193
// Set the remote SessionDescription
198194
peer_connection.set_remote_description(offer).await?;
@@ -275,12 +271,12 @@ async fn main() -> Result<()> {
275271

276272
// Set the handler for Peer connection state
277273
// This will notify you when the peer has connected/disconnected
278-
peer_connection
279-
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
274+
peer_connection.on_peer_connection_state_change(Box::new(
275+
move |s: RTCPeerConnectionState| {
280276
println!("Peer Connection State has changed: {}", s);
281277
Box::pin(async {})
282-
}))
283-
.await;
278+
},
279+
));
284280

285281
// Set the remote SessionDescription
286282
peer_connection

examples/examples/data-channels-close/data-channels-close.rs

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,19 @@ async fn main() -> Result<()> {
115115

116116
// Set the handler for Peer connection state
117117
// This will notify you when the peer has connected/disconnected
118-
peer_connection
119-
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
120-
println!("Peer Connection State has changed: {}", s);
121-
122-
if s == RTCPeerConnectionState::Failed {
123-
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
124-
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
125-
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
126-
println!("Peer Connection has gone to failed exiting");
127-
let _ = done_tx.try_send(());
128-
}
118+
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
119+
println!("Peer Connection State has changed: {}", s);
120+
121+
if s == RTCPeerConnectionState::Failed {
122+
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
123+
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
124+
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
125+
println!("Peer Connection has gone to failed exiting");
126+
let _ = done_tx.try_send(());
127+
}
129128

130-
Box::pin(async {})
131-
}))
132-
.await;
129+
Box::pin(async {})
130+
}));
133131

134132
// Register data channel creation handling
135133
peer_connection
@@ -157,7 +155,7 @@ async fn main() -> Result<()> {
157155
let mut done = done_tx2.lock().await;
158156
done.take();
159157
})
160-
})).await;
158+
}));
161159

162160
let mut result = Result::<usize>::Ok(0);
163161
while result.is_ok() {
@@ -175,25 +173,24 @@ async fn main() -> Result<()> {
175173

176174
let cnt = close_after2.fetch_sub(1, Ordering::SeqCst);
177175
if cnt <= 0 {
178-
println!("Sent times out. Closing data channel '{}'-'{}'.", d2.label(), d2.id());
176+
println!("Sent times out. Closing data channel '{}'-'{}'.", d2.label(), d2.id());
179177
let _ = d2.close().await;
180178
break;
181179
}
182180
}
183181
};
184182
}
185183
})
186-
})).await;
184+
}));
187185

188186
// Register text message handling
189187
d.on_message(Box::new(move |msg: DataChannelMessage| {
190188
let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
191189
println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
192190
Box::pin(async {})
193-
})).await;
191+
}));
194192
})
195-
}))
196-
.await;
193+
}));
197194

198195
// Wait for the offer to be pasted
199196
let line = signal::must_read_stdin()?;

examples/examples/data-channels-create/data-channels-create.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,19 @@ async fn main() -> Result<()> {
101101

102102
// Set the handler for Peer connection state
103103
// This will notify you when the peer has connected/disconnected
104-
peer_connection
105-
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
106-
println!("Peer Connection State has changed: {}", s);
107-
108-
if s == RTCPeerConnectionState::Failed {
109-
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
110-
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
111-
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
112-
println!("Peer Connection has gone to failed exiting");
113-
let _ = done_tx.try_send(());
114-
}
104+
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
105+
println!("Peer Connection State has changed: {}", s);
106+
107+
if s == RTCPeerConnectionState::Failed {
108+
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
109+
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
110+
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
111+
println!("Peer Connection has gone to failed exiting");
112+
let _ = done_tx.try_send(());
113+
}
115114

116-
Box::pin(async {})
117-
}))
118-
.await;
115+
Box::pin(async {})
116+
}));
119117

120118
// Register channel opening handling
121119
let d1 = Arc::clone(&data_channel);
@@ -138,17 +136,15 @@ async fn main() -> Result<()> {
138136
};
139137
}
140138
})
141-
})).await;
139+
}));
142140

143141
// Register text message handling
144142
let d_label = data_channel.label().to_owned();
145-
data_channel
146-
.on_message(Box::new(move |msg: DataChannelMessage| {
147-
let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
148-
println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
149-
Box::pin(async {})
150-
}))
151-
.await;
143+
data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
144+
let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
145+
println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
146+
Box::pin(async {})
147+
}));
152148

153149
// Create an offer to send to the browser
154150
let offer = peer_connection.create_offer(None).await?;

0 commit comments

Comments
 (0)