Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions timely/src/dataflow/operators/core/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
/// use std::rc::Rc;
/// use std::sync::{Arc, Mutex};
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Capture, ToStream, Inspect};
/// use timely::dataflow::operators::{Capture, ToStream};
/// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
///
/// // get send and recv endpoints, wrap send to share
Expand Down Expand Up @@ -69,7 +69,7 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
/// use std::net::{TcpListener, TcpStream};
/// use std::sync::{Arc, Mutex};
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Capture, ToStream, Inspect};
/// use timely::dataflow::operators::{Capture, ToStream};
/// use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};
///
/// # #[cfg(miri)] fn main() {}
Expand All @@ -79,30 +79,26 @@ pub trait Capture<T: Timestamp, C: Container + Data> {
/// let (send0, recv0) = ::std::sync::mpsc::channel();
/// let send0 = Arc::new(Mutex::new(send0));
///
/// timely::execute(timely::Config::thread(), move |worker| {
///
/// // this is only to validate the output.
/// let send0 = send0.lock().unwrap().clone();
///
/// // these allow us to capture / replay a timely stream.
/// let list = TcpListener::bind("127.0.0.1:8001").unwrap();
/// let send = TcpStream::connect("127.0.0.1:8001").unwrap();
/// let recv = list.incoming().next().unwrap().unwrap();
///
/// recv.set_nonblocking(true).unwrap();
/// // these allow us to capture / replay a timely stream.
/// let list = TcpListener::bind("127.0.0.1:8001").unwrap();
/// let send = TcpStream::connect("127.0.0.1:8001").unwrap();
/// let recv = list.incoming().next().unwrap().unwrap();
/// recv.set_nonblocking(true).unwrap();
///
/// worker.dataflow::<u64,_,_>(|scope1|
/// std::thread::scope(move |s| {
/// s.spawn(move || timely::example(move |scope1| {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the two sides into separate threads because, if one operator blocks while writing to the socket, nothing else will unblock it.

/// (0..10u64)
/// .to_stream(scope1)
/// .capture_into(EventWriter::new(send))
/// );
///
/// worker.dataflow::<u64,_,_>(|scope2| {
/// }));
/// s.spawn(move || timely::example(move |scope2| {
/// // this is only to validate the output.
/// let send0 = send0.lock().unwrap().clone();
/// Some(EventReader::<_,Vec<u64>,_>::new(recv))
/// .replay_into(scope2)
/// .capture_into(send0)
/// });
/// }).unwrap();
/// }));
/// });
///
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
/// # }
Expand Down
46 changes: 41 additions & 5 deletions timely/src/dataflow/operators/core/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,12 @@ pub mod link {
pub mod binary {

use std::borrow::Cow;
use std::io::ErrorKind;
use std::ops::DerefMut;
use std::sync::Arc;

use serde::{de::DeserializeOwned, Serialize};
use timely_communication::allocator::zero_copy::bytes_slab::{BytesRefill, BytesSlab};

use super::{Event, EventPusher, EventIterator};

Expand All @@ -156,30 +160,62 @@ pub mod binary {
impl<T: Serialize, C: Serialize, W: ::std::io::Write> EventPusher<T, C> for EventWriter<T, C, W> {
    /// Writes `event` to the wrapped stream as a single length-prefixed frame.
    ///
    /// The frame consists of the bincode-encoded size of the event as a
    /// little-endian `u64`, immediately followed by the bincode encoding of
    /// the event itself. The event is written exactly once; the prefix lets a
    /// reader on a non-blocking stream know when a complete event has arrived.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sized or encoded by bincode, or if
    /// writing to the underlying stream fails.
    fn push(&mut self, event: Event<T, C>) {
        // TODO: `push` has no mechanism to report errors, so we `expect`.
        let len = ::bincode::serialized_size(&event).expect("Event bincode failed");
        // Length prefix first, so the payload boundary is known before it arrives.
        self.stream.write_all(&len.to_le_bytes()).expect("Event write failed");
        ::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode failed");
    }
}

/// A wrapper for `R: Read` implementing `EventIterator<T, C>`.
///
/// Events are consumed as length-prefixed frames: an 8-byte little-endian
/// `u64` giving the payload size, followed by that many bytes of
/// bincode-encoded `Event<T, C>`. Partial frames are buffered across calls, so
/// the reader may be non-blocking.
pub struct EventReader<T, C, R: ::std::io::Read> {
    /// Source of framed, bincode-encoded events.
    reader: R,
    /// Bytes read from `reader` that have not yet been consumed as a full frame.
    buf: BytesSlab,
    phant: ::std::marker::PhantomData<(T, C)>,
}

impl<T, C, R: ::std::io::Read> EventReader<T, C, R> {
    /// Allocates a new `EventReader` wrapping a supplied reader.
    pub fn new(r: R) -> Self {
        // Backing buffers for the slab are plain zero-filled heap vectors of
        // the requested size; `limit` is left unset.
        let refill = BytesRefill {
            logic: Arc::new(|size| {
                Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]>>
            }),
            limit: None,
        };
        Self {
            reader: r,
            // NOTE(review): `20` is presumably a shift (initial size 1 << 20
            // bytes) -- confirm against `BytesSlab::new`.
            buf: BytesSlab::new(20, refill),
            phant: ::std::marker::PhantomData,
        }
    }
}

impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
    /// Attempts to produce the next event from the underlying reader.
    ///
    /// Performs at most one `read` per call, appending whatever arrives to the
    /// internal buffer. Returns `Some` once a complete frame (length prefix
    /// plus payload) is buffered, and `None` otherwise; `None` therefore means
    /// "nothing complete yet", and callers are expected to call again.
    ///
    /// # Panics
    ///
    /// Panics if the read fails with anything other than `WouldBlock` or
    /// `Interrupted`, if the length prefix is so large that adding the prefix
    /// size overflows `u64`, or if the payload fails to decode.
    fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
        /// Number of bytes in the little-endian `u64` length prefix.
        const PREFIX_LEN: usize = 8;

        self.buf.ensure_capacity(1);
        // Attempt to read some more bytes into `self.buf`.
        match self.reader.read(self.buf.empty()) {
            Ok(n) => self.buf.make_valid(n),
            // Neither condition is fatal: no data was transferred, and the
            // caller polls again, which acts as the retry.
            Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted) => {}
            Err(e) => panic!("read failed: {e}"),
        }

        let valid = self.buf.valid();
        if valid.len() < PREFIX_LEN {
            return None;
        }

        let mut prefix = [0_u8; PREFIX_LEN];
        prefix.copy_from_slice(&valid[..PREFIX_LEN]);
        let event_len = u64::from_le_bytes(prefix);

        // Compare in `u64` so a corrupt or oversized prefix can neither wrap
        // around nor be truncated on targets where `usize` is narrower.
        let required_bytes = event_len
            .checked_add(PREFIX_LEN as u64)
            .expect("Event length prefix overflows u64");
        if (valid.len() as u64) < required_bytes {
            return None;
        }

        // `required_bytes <= valid.len()`, so the cast below is lossless.
        let bytes = self.buf.extract(required_bytes as usize);
        let event = ::bincode::deserialize(&bytes[PREFIX_LEN..]).expect("Event decode failed");
        Some(Cow::Owned(event))
    }
}
}
Loading