Skip to content

Commit d606d9f

Browse files
Avoid lagged receiver in TCP manager (#1672)
1 parent d782585 commit d606d9f

File tree

1 file changed

+12
-11
lines changed

1 file changed

+12
-11
lines changed

libafl/src/events/tcp.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer};
3131
use serde::{de::DeserializeOwned, Deserialize, Serialize};
3232
use tokio::{
3333
io::{AsyncReadExt, AsyncWriteExt},
34-
sync::{broadcast, mpsc},
34+
sync::{broadcast, broadcast::error::RecvError, mpsc},
3535
task::{spawn, JoinHandle},
3636
};
3737
#[cfg(feature = "std")]
@@ -111,8 +111,8 @@ where
111111
#[tokio::main(flavor = "current_thread")]
112112
#[allow(clippy::too_many_lines)]
113113
pub async fn broker_loop(&mut self) -> Result<(), Error> {
114-
let (tx_bc, rx) = broadcast::channel(1024);
115-
let (tx, mut rx_mpsc) = mpsc::channel(1024);
114+
let (tx_bc, rx) = broadcast::channel(65536);
115+
let (tx, mut rx_mpsc) = mpsc::channel(65536);
116116

117117
let exit_cleanly_after = self.exit_cleanly_after;
118118

@@ -224,13 +224,14 @@ where
224224
spawn(async move {
225225
// In a loop, read data from the socket and write the data back.
226226
loop {
227-
let buf: Vec<u8> = rx_inner
228-
.lock()
229-
.await
230-
.recv()
231-
.await
232-
.expect("Could not receive");
233-
// TODO handle full capacity, Lagged https://docs.rs/tokio/latest/tokio/sync/broadcast/error/enum.RecvError.html
227+
let buf: Vec<u8> = match rx_inner.lock().await.recv().await {
228+
Ok(buf) => buf,
229+
Err(RecvError::Lagged(num)) => {
230+
log::error!("Receiver lagged, skipping {num} messages");
231+
continue;
232+
}
233+
_ => panic!("Could not receive"),
234+
};
234235

235236
#[cfg(feature = "tcp_debug")]
236237
println!("{buf:?}");
@@ -704,7 +705,7 @@ where
704705
if self_id == other_client_id {
705706
panic!("Own ID should never have been sent by the broker");
706707
} else {
707-
println!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");
708+
log::info!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");
708709

709710
let event = postcard::from_bytes(&buf[4..])?;
710711
self.handle_in_client(fuzzer, executor, state, other_client_id, event)?;

0 commit comments

Comments
 (0)