From 73108ab18b95776c4d5214a795aaa38e14476f8a Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Thu, 14 Aug 2025 13:38:53 +0300 Subject: [PATCH] capture: use length prefixed encoding to reliably receive events The existing capture/replay mechanism would produce a concatenation of bincode encoded messages. However, on the receiving end the bincode format does not have enough information for messages to be decoded accurately from a non-delimited stream, which could lead to the receiving end getting confused. This PR fixes the issue by prefixing each message with its length in bytes. Signed-off-by: Petros Angelatos --- .../operators/core/capture/capture.rs | 34 ++++++-------- .../dataflow/operators/core/capture/event.rs | 46 +++++++++++++++++-- 2 files changed, 56 insertions(+), 24 deletions(-) diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index c9eea6785..410fe486e 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -29,7 +29,7 @@ pub trait Capture { /// use std::rc::Rc; /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; + /// use timely::dataflow::operators::{Capture, ToStream}; /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract}; /// /// // get send and recv endpoints, wrap send to share @@ -69,7 +69,7 @@ pub trait Capture { /// use std::net::{TcpListener, TcpStream}; /// use std::sync::{Arc, Mutex}; /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Capture, ToStream, Inspect}; + /// use timely::dataflow::operators::{Capture, ToStream}; /// use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract}; /// /// # #[cfg(miri)] fn main() {} @@ -79,30 +79,26 @@ pub trait Capture { /// let (send0, recv0) = ::std::sync::mpsc::channel(); /// let send0 = 
Arc::new(Mutex::new(send0)); /// - /// timely::execute(timely::Config::thread(), move |worker| { - /// - /// // this is only to validate the output. - /// let send0 = send0.lock().unwrap().clone(); - /// - /// // these allow us to capture / replay a timely stream. - /// let list = TcpListener::bind("127.0.0.1:8001").unwrap(); - /// let send = TcpStream::connect("127.0.0.1:8001").unwrap(); - /// let recv = list.incoming().next().unwrap().unwrap(); - /// - /// recv.set_nonblocking(true).unwrap(); + /// // these allow us to capture / replay a timely stream. + /// let list = TcpListener::bind("127.0.0.1:8001").unwrap(); + /// let send = TcpStream::connect("127.0.0.1:8001").unwrap(); + /// let recv = list.incoming().next().unwrap().unwrap(); + /// recv.set_nonblocking(true).unwrap(); /// - /// worker.dataflow::(|scope1| + /// std::thread::scope(move |s| { + /// s.spawn(move || timely::example(move |scope1| { /// (0..10u64) /// .to_stream(scope1) /// .capture_into(EventWriter::new(send)) - /// ); - /// - /// worker.dataflow::(|scope2| { + /// })); + /// s.spawn(move || timely::example(move |scope2| { + /// // this is only to validate the output. 
+ /// let send0 = send0.lock().unwrap().clone(); /// Some(EventReader::<_,Vec,_>::new(recv)) /// .replay_into(scope2) /// .capture_into(send0) - /// }); - /// }).unwrap(); + /// })); + /// }); /// /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); /// # } diff --git a/timely/src/dataflow/operators/core/capture/event.rs b/timely/src/dataflow/operators/core/capture/event.rs index 35dc158cb..7fff34af0 100644 --- a/timely/src/dataflow/operators/core/capture/event.rs +++ b/timely/src/dataflow/operators/core/capture/event.rs @@ -132,8 +132,12 @@ pub mod link { pub mod binary { use std::borrow::Cow; + use std::io::ErrorKind; + use std::ops::DerefMut; + use std::sync::Arc; use serde::{de::DeserializeOwned, Serialize}; + use timely_communication::allocator::zero_copy::bytes_slab::{BytesRefill, BytesSlab}; use super::{Event, EventPusher, EventIterator}; @@ -156,30 +160,62 @@ pub mod binary { impl EventPusher for EventWriter { fn push(&mut self, event: Event) { // TODO: `push` has no mechanism to report errors, so we `unwrap`. - ::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode/write failed"); + let len = ::bincode::serialized_size(&event).expect("Event bincode failed"); + self.stream.write_all(&len.to_le_bytes()).expect("Event write failed"); + ::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode failed"); } } /// A Wrapper for `R: Read` implementing `EventIterator`. pub struct EventReader { reader: R, - decoded: Option>, + buf: BytesSlab, + phant: ::std::marker::PhantomData<(T, C)>, } impl EventReader { /// Allocates a new `EventReader` wrapping a supplied reader. 
pub fn new(r: R) -> Self { + let refill = BytesRefill { + logic: Arc::new(|size| { + Box::new(vec![0_u8; size]) as Box> + }), + limit: None, + }; Self { reader: r, - decoded: None, + buf: BytesSlab::new(20, refill), + phant: ::std::marker::PhantomData, } } } impl EventIterator for EventReader { fn next(&mut self) -> Option>> { - self.decoded = ::bincode::deserialize_from(&mut self.reader).ok(); - self.decoded.take().map(Cow::Owned) + self.buf.ensure_capacity(1); + // Attempt to read some more bytes into self.buffer. + match self.reader.read(self.buf.empty()) { + Ok(n) => self.buf.make_valid(n), + Err(e) if e.kind() == ErrorKind::WouldBlock => {} + Err(e) => panic!("read failed: {e}"), + }; + + let valid = self.buf.valid(); + if valid.len() >= 8 { + let event_len = u64::from_le_bytes([ + valid[0], valid[1], valid[2], valid[3], valid[4], valid[5], valid[6], valid[7], + ]); + let required_bytes = (event_len + 8) as usize; + if valid.len() >= required_bytes { + let bytes = self.buf.extract(required_bytes); + let event = ::bincode::deserialize(&bytes[8..]).expect("Event decode failed"); + Some(Cow::Owned(event)) + } else { + None + } + } else { + None + } } } }