Skip to content

Commit 7f5b66e

Browse files
Merge pull request #695 from petrosagg/deflake-capture
capture: use length prefixed encoding to reliably receive events
2 parents 7534317 + 73108ab commit 7f5b66e

File tree

2 files changed

+56
-24
lines changed

2 files changed

+56
-24
lines changed

timely/src/dataflow/operators/core/capture/capture.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
2929
/// use std::rc::Rc;
3030
/// use std::sync::{Arc, Mutex};
3131
/// use timely::dataflow::Scope;
32-
/// use timely::dataflow::operators::{Capture, ToStream, Inspect};
32+
/// use timely::dataflow::operators::{Capture, ToStream};
3333
/// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
3434
///
3535
/// // get send and recv endpoints, wrap send to share
@@ -69,7 +69,7 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
6969
/// use std::net::{TcpListener, TcpStream};
7070
/// use std::sync::{Arc, Mutex};
7171
/// use timely::dataflow::Scope;
72-
/// use timely::dataflow::operators::{Capture, ToStream, Inspect};
72+
/// use timely::dataflow::operators::{Capture, ToStream};
7373
/// use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};
7474
///
7575
/// # #[cfg(miri)] fn main() {}
@@ -79,30 +79,26 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
7979
/// let (send0, recv0) = ::std::sync::mpsc::channel();
8080
/// let send0 = Arc::new(Mutex::new(send0));
8181
///
82-
/// timely::execute(timely::Config::thread(), move |worker| {
83-
///
84-
/// // this is only to validate the output.
85-
/// let send0 = send0.lock().unwrap().clone();
86-
///
87-
/// // these allow us to capture / replay a timely stream.
88-
/// let list = TcpListener::bind("127.0.0.1:8001").unwrap();
89-
/// let send = TcpStream::connect("127.0.0.1:8001").unwrap();
90-
/// let recv = list.incoming().next().unwrap().unwrap();
91-
///
92-
/// recv.set_nonblocking(true).unwrap();
82+
/// // these allow us to capture / replay a timely stream.
83+
/// let list = TcpListener::bind("127.0.0.1:8001").unwrap();
84+
/// let send = TcpStream::connect("127.0.0.1:8001").unwrap();
85+
/// let recv = list.incoming().next().unwrap().unwrap();
86+
/// recv.set_nonblocking(true).unwrap();
9387
///
94-
/// worker.dataflow::<u64,_,_>(|scope1|
88+
/// std::thread::scope(move |s| {
89+
/// s.spawn(move || timely::example(move |scope1| {
9590
/// (0..10u64)
9691
/// .to_stream(scope1)
9792
/// .capture_into(EventWriter::new(send))
98-
/// );
99-
///
100-
/// worker.dataflow::<u64,_,_>(|scope2| {
93+
/// }));
94+
/// s.spawn(move || timely::example(move |scope2| {
95+
/// // this is only to validate the output.
96+
/// let send0 = send0.lock().unwrap().clone();
10197
/// Some(EventReader::<_,Vec<u64>,_>::new(recv))
10298
/// .replay_into(scope2)
10399
/// .capture_into(send0)
104-
/// });
105-
/// }).unwrap();
100+
/// }));
101+
/// });
106102
///
107103
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
108104
/// # }

timely/src/dataflow/operators/core/capture/event.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,12 @@ pub mod link {
132132
pub mod binary {
133133

134134
use std::borrow::Cow;
135+
use std::io::ErrorKind;
136+
use std::ops::DerefMut;
137+
use std::sync::Arc;
135138

136139
use serde::{de::DeserializeOwned, Serialize};
140+
use timely_communication::allocator::zero_copy::bytes_slab::{BytesRefill, BytesSlab};
137141

138142
use super::{Event, EventPusher, EventIterator};
139143

@@ -156,30 +160,62 @@ pub mod binary {
156160
impl<T: Serialize, C: Serialize, W: ::std::io::Write> EventPusher<T, C> for EventWriter<T, C, W> {
157161
fn push(&mut self, event: Event<T, C>) {
158162
// TODO: `push` has no mechanism to report errors, so we `unwrap`.
159-
::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode/write failed");
163+
let len = ::bincode::serialized_size(&event).expect("Event bincode failed");
164+
self.stream.write_all(&len.to_le_bytes()).expect("Event write failed");
165+
::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode failed");
160166
}
161167
}
162168

163169
/// A wrapper for `R: Read` implementing `EventIterator<T, C>`.
164170
pub struct EventReader<T, C, R: ::std::io::Read> {
165171
reader: R,
166-
decoded: Option<Event<T, C>>,
172+
buf: BytesSlab,
173+
phant: ::std::marker::PhantomData<(T, C)>,
167174
}
168175

169176
impl<T, C, R: ::std::io::Read> EventReader<T, C, R> {
170177
/// Allocates a new `EventReader` wrapping a supplied reader.
171178
pub fn new(r: R) -> Self {
179+
let refill = BytesRefill {
180+
logic: Arc::new(|size| {
181+
Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]>>
182+
}),
183+
limit: None,
184+
};
172185
Self {
173186
reader: r,
174-
decoded: None,
187+
buf: BytesSlab::new(20, refill),
188+
phant: ::std::marker::PhantomData,
175189
}
176190
}
177191
}
178192

179193
impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
180194
fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
181-
self.decoded = ::bincode::deserialize_from(&mut self.reader).ok();
182-
self.decoded.take().map(Cow::Owned)
195+
self.buf.ensure_capacity(1);
196+
// Attempt to read some more bytes into `self.buf`.
197+
match self.reader.read(self.buf.empty()) {
198+
Ok(n) => self.buf.make_valid(n),
199+
Err(e) if e.kind() == ErrorKind::WouldBlock => {}
200+
Err(e) => panic!("read failed: {e}"),
201+
};
202+
203+
let valid = self.buf.valid();
204+
if valid.len() >= 8 {
205+
let event_len = u64::from_le_bytes([
206+
valid[0], valid[1], valid[2], valid[3], valid[4], valid[5], valid[6], valid[7],
207+
]);
208+
let required_bytes = (event_len + 8) as usize;
209+
if valid.len() >= required_bytes {
210+
let bytes = self.buf.extract(required_bytes);
211+
let event = ::bincode::deserialize(&bytes[8..]).expect("Event decode failed");
212+
Some(Cow::Owned(event))
213+
} else {
214+
None
215+
}
216+
} else {
217+
None
218+
}
183219
}
184220
}
185221
}

0 commit comments

Comments
 (0)