diff --git a/README.md b/README.md index ffd337b8b..58bc174fe 100644 --- a/README.md +++ b/README.md @@ -82,9 +82,10 @@ fn main() { } input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } + Ok(()) }).unwrap(); } ``` diff --git a/communication/examples/comm_hello.rs b/communication/examples/comm_hello.rs index 85d67faa2..c5919fcf2 100644 --- a/communication/examples/comm_hello.rs +++ b/communication/examples/comm_hello.rs @@ -1,7 +1,7 @@ extern crate timely_communication; use std::ops::Deref; -use timely_communication::{Message, Allocate}; +use timely_communication::{Message, Allocate, Result}; fn main() { @@ -16,8 +16,8 @@ fn main() { // send typed data along each channel for i in 0 .. allocator.peers() { - senders[i].send(Message::from_typed(format!("hello, {}", i))); - senders[i].done(); + senders[i].send(Message::from_typed(format!("hello, {}", i)))?; + senders[i].done()?; } // no support for termination notification, @@ -25,23 +25,23 @@ fn main() { let mut received = 0; while received < allocator.peers() { - allocator.receive(); + allocator.receive()?; if let Some(message) = receiver.recv() { println!("worker {}: received: <{}>", allocator.index(), message.deref()); received += 1; } - allocator.release(); + allocator.release()?; } - allocator.index() + Result::Ok(allocator.index()) }); // computation runs until guards are joined or dropped. if let Ok(guards) = guards { for guard in guards.join() { - println!("result: {:?}", guard); + println!("result: {:?}", guard.unwrap().unwrap()); } } else { println!("error in computation"); } diff --git a/communication/src/allocator/counters.rs b/communication/src/allocator/counters.rs index 68d1fa4ab..8ff7443d2 100644 --- a/communication/src/allocator/counters.rs +++ b/communication/src/allocator/counters.rs @@ -31,7 +31,7 @@ impl> Pusher { impl> Push for Pusher { #[inline] - fn push(&mut self, element: &mut Option) { + fn push(&mut self, element: &mut Option) -> crate::Result<()> { // if element.is_none() { // if self.count != 0 { // self.events @@ -81,7 +81,7 @@ impl> ArcPusher { impl> Push for ArcPusher { #[inline] - fn push(&mut self, element: &mut Option) { + fn push(&mut self, element: &mut Option) -> crate::Result<()> { // if element.is_none() { // if self.count != 0 { // self.events @@ -98,11 +98,12 @@ impl> Push for ArcPusher { // we first enqueue data, second enqueue interest in the channel, // and finally awaken the thread. Other orders are defective when // multiple threads are involved. - self.pusher.push(element); + self.pusher.push(element)?; let _ = self.events.send((self.index, Event::Pushed(1))); // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown). // .expect("Failed to send message count"); self.buzzer.buzz(); + Ok(()) } } diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 625de6cd6..03f5d252f 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -57,7 +57,7 @@ impl Generic { } } /// Perform work before scheduling operators. - fn receive(&mut self) { + fn receive(&mut self) -> crate::Result<()> { match self { Generic::Thread(t) => t.receive(), Generic::Process(p) => p.receive(), @@ -66,7 +66,7 @@ impl Generic { } } /// Perform work after scheduling operators. - pub fn release(&mut self) { + pub fn release(&mut self) -> crate::Result<()> { match self { Generic::Thread(t) => t.release(), Generic::Process(p) => p.release(), @@ -91,8 +91,8 @@ impl Allocate for Generic { self.allocate(identifier) } - fn receive(&mut self) { self.receive(); } - fn release(&mut self) { self.release(); } + fn receive(&mut self) -> crate::Result<()> { self.receive() } + fn release(&mut self) -> crate::Result<()> { self.release() } fn events(&self) -> &Rc>> { self.events() } fn await_events(&self, _duration: Option) { match self { diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 4c29b85e1..d0863f96d 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -70,7 +70,7 @@ pub trait Allocate { /// present messages contained in otherwise scarce resources (for example /// network buffers), under the premise that someone is about to consume /// the messages and release the resources. - fn receive(&mut self) { } + fn receive(&mut self) -> crate::Result<()> { Ok(()) } /// Signal the completion of a batch of reads from channels. /// @@ -79,7 +79,7 @@ pub trait Allocate { /// the fabric should consider re-acquiring scarce resources. This can /// lead to the fabric performing defensive copies out of un-consumed /// buffers, and can be a performance problem if invoked casually. - fn release(&mut self) { } + fn release(&mut self) -> crate::Result<()> { Ok(()) } /// Constructs a pipeline channel from the worker to itself. /// diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index 132bc6de5..212e38cfd 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -182,11 +182,12 @@ impl Allocate for Process { self.inner.await_events(duration); } - fn receive(&mut self) { + fn receive(&mut self) -> crate::Result<()> { let mut events = self.inner.events().borrow_mut(); while let Ok((index, event)) = self.counters_recv.try_recv() { events.push_back((index, event)); } + Ok(()) } } @@ -204,12 +205,13 @@ impl Clone for Pusher { } impl Push for Pusher { - #[inline] fn push(&mut self, element: &mut Option) { + #[inline] fn push(&mut self, element: &mut Option) -> crate::Result<()> { if let Some(element) = element.take() { // The remote endpoint could be shut down, and so // it is not fundamentally an error to fail to send. let _ = self.target.send(element); } + Ok(()) } } diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index ba5407e4d..50a893f8c 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -81,12 +81,13 @@ pub struct Pusher { impl Push for Pusher { #[inline] - fn push(&mut self, element: &mut Option) { + fn push(&mut self, element: &mut Option) -> crate::Result<()> { let mut borrow = self.target.borrow_mut(); if let Some(element) = element.take() { borrow.0.push_back(element); } *element = borrow.1.pop_front(); + Ok(()) } } diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 64225ff29..92e05f648 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -190,7 +190,7 @@ impl Allocate for TcpAllocator { // Perform preparatory work, most likely reading binary buffers from self.recv. #[inline(never)] - fn receive(&mut self) { + fn receive(&mut self) -> crate::Result<()> { // Check for channels whose `Puller` has been dropped. let mut canaries = self.canaries.borrow_mut(); @@ -207,10 +207,10 @@ impl Allocate for TcpAllocator { } ::std::mem::drop(canaries); - self.inner.receive(); + self.inner.receive()?; for recv in self.recvs.iter_mut() { - recv.drain_into(&mut self.staged); + recv.drain_into(&mut self.staged)?; } let mut events = self.inner.events().borrow_mut(); @@ -251,13 +251,14 @@ impl Allocate for TcpAllocator { } } } + Ok(()) } // Perform postparatory work, most likely sending un-full binary buffers. - fn release(&mut self) { + fn release(&mut self) -> crate::Result<()> { // Publish outgoing byte ledgers. for send in self.sends.iter_mut() { - send.borrow_mut().publish(); + send.borrow_mut().publish()?; } // OPTIONAL: Tattle on channels sitting on borrowed data. @@ -268,6 +269,7 @@ impl Allocate for TcpAllocator { // eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len); // } // } + Ok(()) } fn events(&self) -> &Rc>> { self.inner.events() @@ -275,4 +277,4 @@ impl Allocate for TcpAllocator { fn await_events(&self, duration: Option) { self.inner.await_events(duration); } -} \ No newline at end of file +} diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index dd2815a50..cbba13a0c 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -159,7 +159,7 @@ impl Allocate for ProcessAllocator { // Perform preparatory work, most likely reading binary buffers from self.recv. #[inline(never)] - fn receive(&mut self) { + fn receive(&mut self) -> crate::Result<()> { // Check for channels whose `Puller` has been dropped. let mut canaries = self.canaries.borrow_mut(); @@ -179,7 +179,7 @@ impl Allocate for ProcessAllocator { let mut events = self.events.borrow_mut(); for recv in self.recvs.iter_mut() { - recv.drain_into(&mut self.staged); + recv.drain_into(&mut self.staged)?; } for mut bytes in self.staged.drain(..) { @@ -218,13 +218,14 @@ impl Allocate for ProcessAllocator { } } } + Ok(()) } // Perform postparatory work, most likely sending un-full binary buffers. - fn release(&mut self) { + fn release(&mut self) -> crate::Result<()> { // Publish outgoing byte ledgers. for send in self.sends.iter_mut() { - send.borrow_mut().publish(); + send.borrow_mut().publish()?; } // OPTIONAL: Tattle on channels sitting on borrowed data. @@ -235,6 +236,7 @@ impl Allocate for ProcessAllocator { // eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len); // } // } + Ok(()) } fn events(&self) -> &Rc>> { @@ -250,4 +252,4 @@ impl Allocate for ProcessAllocator { } } } -} \ No newline at end of file +} diff --git a/communication/src/allocator/zero_copy/bytes_exchange.rs b/communication/src/allocator/zero_copy/bytes_exchange.rs index ddedf806a..8fa46ea7f 100644 --- a/communication/src/allocator/zero_copy/bytes_exchange.rs +++ b/communication/src/allocator/zero_copy/bytes_exchange.rs @@ -5,20 +5,21 @@ use std::collections::VecDeque; use bytes::arc::Bytes; use super::bytes_slab::BytesSlab; +use crate::err::CommError; /// A target for `Bytes`. pub trait BytesPush { // /// Pushes bytes at the instance. // fn push(&mut self, bytes: Bytes); /// Pushes many bytes at the instance. - fn extend>(&mut self, iter: I); + fn extend>(&mut self, iter: I) -> crate::Result<()>; } /// A source for `Bytes`. pub trait BytesPull { // /// Pulls bytes from the instance. // fn pull(&mut self) -> Option; /// Drains many bytes from the instance. - fn drain_into(&mut self, vec: &mut Vec); + fn drain_into(&mut self, vec: &mut Vec) -> crate::Result<()>; } use std::sync::atomic::{AtomicBool, Ordering}; @@ -43,23 +44,28 @@ impl MergeQueue { } } /// Indicates that all input handles to the queue have dropped. - pub fn is_complete(&self) -> bool { - if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); } - Arc::strong_count(&self.queue) == 1 && self.queue.lock().expect("Failed to acquire lock").is_empty() + pub fn is_complete(&self) -> crate::Result { + if self.panic.load(Ordering::SeqCst) { return Err(CommError::Poison); } + Ok(Arc::strong_count(&self.queue) == 1 && self.queue.lock()?.is_empty()) + } + + /// Mark self as poisoned, which causes all subsequent operations to error. + pub fn poison(&mut self) { + self.panic.store(true, Ordering::SeqCst); } } impl BytesPush for MergeQueue { - fn extend>(&mut self, iterator: I) { + fn extend>(&mut self, iterator: I) -> crate::Result<()> { - if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); } + if self.panic.load(Ordering::SeqCst) { return Err(CommError::Poison); } // try to acquire lock without going to sleep (Rust's lock() might yield) let mut lock_ok = self.queue.try_lock(); - while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok { + while let Err(::std::sync::TryLockError::WouldBlock) = lock_ok { lock_ok = self.queue.try_lock(); } - let mut queue = lock_ok.expect("MergeQueue mutex poisoned."); + let mut queue = lock_ok?; let mut iterator = iterator.into_iter(); let mut should_ping = false; @@ -88,21 +94,23 @@ impl BytesPush for MergeQueue { if should_ping { self.buzzer.buzz(); // only signal from empty to non-empty. } + Ok(()) } } impl BytesPull for MergeQueue { - fn drain_into(&mut self, vec: &mut Vec) { - if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); } + fn drain_into(&mut self, vec: &mut Vec) -> crate::Result<()> { + if self.panic.load(Ordering::SeqCst) { return Err(CommError::Poison); } // try to acquire lock without going to sleep (Rust's lock() might yield) let mut lock_ok = self.queue.try_lock(); - while let Result::Err(::std::sync::TryLockError::WouldBlock) = lock_ok { + while let Err(::std::sync::TryLockError::WouldBlock) = lock_ok { lock_ok = self.queue.try_lock(); } - let mut queue = lock_ok.expect("MergeQueue mutex poisoned."); + let mut queue = lock_ok?; vec.extend(queue.drain(..)); + Ok(()) } } @@ -134,11 +142,12 @@ pub struct SendEndpoint { impl SendEndpoint

{ /// Moves `self.buffer` into `self.send`, replaces with empty buffer. - fn send_buffer(&mut self) { + fn send_buffer(&mut self) -> crate::Result<()> { let valid_len = self.buffer.valid().len(); if valid_len > 0 { - self.send.extend(Some(self.buffer.extract(valid_len))); + self.send.extend(Some(self.buffer.extract(valid_len)))?; } + Ok(()) } /// Allocates a new `BytesSendEndpoint` from a shared queue. @@ -151,30 +160,30 @@ impl SendEndpoint

{ /// Makes the next `bytes` bytes valid. /// /// The current implementation also sends the bytes, to ensure early visibility. - pub fn make_valid(&mut self, bytes: usize) { + pub fn make_valid(&mut self, bytes: usize) -> crate::Result<()> { self.buffer.make_valid(bytes); - self.send_buffer(); + self.send_buffer() } /// Acquires a prefix of `self.empty()` of length at least `capacity`. - pub fn reserve(&mut self, capacity: usize) -> &mut [u8] { + pub fn reserve(&mut self, capacity: usize) -> crate::Result<&mut [u8]> { if self.buffer.empty().len() < capacity { - self.send_buffer(); + self.send_buffer()?; self.buffer.ensure_capacity(capacity); } assert!(self.buffer.empty().len() >= capacity); - self.buffer.empty() + Ok(self.buffer.empty()) } /// Marks all written data as valid, makes visible. - pub fn publish(&mut self) { - self.send_buffer(); + pub fn publish(&mut self) -> crate::Result<()> { + self.send_buffer() } } impl Drop for SendEndpoint

{ fn drop(&mut self) { - self.send_buffer(); + // Ignore errors. + let _ = self.send_buffer(); } } - diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index be1494fa4..c6b86089c 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -13,18 +13,18 @@ use super::stream::Stream; /// On drop, the guard joins with each of the threads to ensure that they complete /// cleanly and send all necessary data. pub struct CommsGuard { - send_guards: Vec<::std::thread::JoinHandle<()>>, - recv_guards: Vec<::std::thread::JoinHandle<()>>, + send_guards: Vec<::std::thread::JoinHandle>>, + recv_guards: Vec<::std::thread::JoinHandle>>, } impl Drop for CommsGuard { fn drop(&mut self) { for handle in self.send_guards.drain(..) { - handle.join().expect("Send thread panic"); + handle.join().expect("Send thread panic").unwrap(); } // println!("SEND THREADS JOINED"); for handle in self.recv_guards.drain(..) { - handle.join().expect("Recv thread panic"); + handle.join().expect("Recv thread panic").unwrap(); } // println!("RECV THREADS JOINED"); } @@ -97,7 +97,7 @@ pub fn initialize_networking_from_sockets( remote: Some(index), }); - send_loop(stream, remote_recv, my_index, index, logger); + send_loop(stream, remote_recv, my_index, index, logger) })?; send_guards.push(join_guard); @@ -118,7 +118,7 @@ pub fn initialize_networking_from_sockets( sender: false, remote: Some(index), }); - recv_loop(stream, remote_send, threads * my_index, my_index, index, logger); + recv_loop(stream, remote_send, threads * my_index, my_index, index, logger) })?; recv_guards.push(join_guard); diff --git a/communication/src/allocator/zero_copy/mod.rs b/communication/src/allocator/zero_copy/mod.rs index 81b90c01f..350bc4f97 100644 --- a/communication/src/allocator/zero_copy/mod.rs +++ b/communication/src/allocator/zero_copy/mod.rs @@ -11,4 +11,4 @@ pub mod allocator; pub mod allocator_process; pub mod initialize; pub mod push_pull; -pub mod stream; \ No newline at end of file +pub mod stream; diff --git a/communication/src/allocator/zero_copy/push_pull.rs b/communication/src/allocator/zero_copy/push_pull.rs index fe423c21a..d987cf6b4 100644 --- a/communication/src/allocator/zero_copy/push_pull.rs +++ b/communication/src/allocator/zero_copy/push_pull.rs @@ -37,7 +37,7 @@ impl Pusher { impl Push> for Pusher { #[inline] - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option>) -> crate::Result<()> { if let Some(ref mut element) = *element { // determine byte lengths and build header. @@ -49,14 +49,15 @@ impl Push> for Pusher { // acquire byte buffer and write header, element. let mut borrow = self.sender.borrow_mut(); { - let mut bytes = borrow.reserve(header.required_bytes()); + let mut bytes = borrow.reserve(header.required_bytes())?; assert!(bytes.len() >= header.required_bytes()); let writer = &mut bytes; - header.write_to(writer).expect("failed to write header!"); + header.write_to(writer)?; element.into_bytes(writer); } - borrow.make_valid(header.required_bytes()); + borrow.make_valid(header.required_bytes())?; } + Ok(()) } } @@ -139,4 +140,4 @@ impl Pull> for PullerInner { &mut self.current } } -} \ No newline at end of file +} diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index f2099f516..f68386f33 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -1,6 +1,6 @@ //! -use std::io::{self, Write}; +use std::io::Write; use crossbeam_channel::{Sender, Receiver}; use crate::networking::MessageHeader; @@ -11,16 +11,9 @@ use super::stream::Stream; use logging_core::Logger; +use crate::err::CommError; use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent}; -fn tcp_panic(context: &'static str, cause: io::Error) -> ! { - // NOTE: some downstream crates sniff out "timely communication error:" from - // the panic message. Avoid removing or rewording this message if possible. - // It'd be nice to instead use `panic_any` here with a structured error - // type, but the panic message for `panic_any` is no good (Box). - panic!("timely communication error: {}: {}", context, cause) -} - /// Repeatedly reads from a TcpStream and carves out messages. /// /// The intended communication pattern is a sequence of (header, message)^* for valid @@ -36,6 +29,7 @@ pub fn recv_loop( process: usize, remote: usize, mut logger: Option>) +-> crate::Result<()> where S: Stream, { @@ -52,6 +46,28 @@ where stageds.push(Vec::new()); } + let result = recv_loop_inner(&mut reader, worker_offset, &mut logger, &mut targets, &mut buffer, &mut stageds); + + for target in &mut targets { + target.poison(); + } + + // Log the receive thread's end. + logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, })); + result +} + +fn recv_loop_inner( + reader: &mut S, + worker_offset: usize, + logger: &mut Option>, + targets: &mut Vec, + buffer: &mut BytesSlab, + stageds: &mut Vec> +) -> crate::Result<()> +where + S: Stream +{ // Each loop iteration adds to `self.Bytes` and consumes all complete messages. // At the start of each iteration, `self.buffer[..self.length]` represents valid // data, and the remaining capacity is available for reading from the reader. @@ -67,15 +83,9 @@ where assert!(!buffer.empty().is_empty()); // Attempt to read some more bytes into self.buffer. - let read = match reader.read(&mut buffer.empty()) { - Err(x) => tcp_panic("reading data", x), - Ok(n) if n == 0 => { - tcp_panic( - "reading data", - std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"), - ); - } - Ok(n) => n, + let read = match reader.read(&mut buffer.empty())? { + 0 => { return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into()); }, + n => n, }; buffer.make_valid(read); @@ -99,11 +109,11 @@ where // Shutting down; confirm absence of subsequent data. active = false; if !buffer.valid().is_empty() { - panic!("Clean shutdown followed by data."); + return Err(CommError::UnexpectedData); } buffer.ensure_capacity(1); - if reader.read(&mut buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 { - panic!("Clean shutdown followed by data."); + if reader.read(&mut buffer.empty())? > 0 { + return Err(CommError::UnexpectedData); } } } @@ -112,12 +122,10 @@ where for (index, staged) in stageds.iter_mut().enumerate() { // FIXME: try to merge `staged` before handing it to BytesPush::extend use crate::allocator::zero_copy::bytes_exchange::BytesPush; - targets[index].extend(staged.drain(..)); + targets[index].extend(staged.drain(..))?; } } - - // Log the receive thread's end. - logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, })); + Ok(()) } /// Repeatedly sends messages into a TcpStream. @@ -134,17 +142,18 @@ pub fn send_loop( sources: Vec>, process: usize, remote: usize, - mut logger: Option>) + mut logger: Option>, +) -> crate::Result<()> { // Log the send thread's start. logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, })); - let mut sources: Vec = sources.into_iter().map(|x| { + let mut sources: Vec> = sources.into_iter().map(|x| { let buzzer = crate::buzzer::Buzzer::new(); let queue = MergeQueue::new(buzzer); x.send(queue.clone()).expect("failed to send MergeQueue"); - queue + Some(queue) }).collect(); let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer); @@ -153,9 +162,9 @@ pub fn send_loop( while !sources.is_empty() { // TODO: Round-robin better, to release resources fairly when overloaded. - for source in sources.iter_mut() { + for source in sources.iter_mut().flat_map(|s| s) { use crate::allocator::zero_copy::bytes_exchange::BytesPull; - source.drain_into(&mut stash); + source.drain_into(&mut stash)?; } if stash.is_empty() { @@ -165,8 +174,15 @@ pub fn send_loop( // still be a signal incoming. // // We could get awoken by more data, a channel closing, or spuriously perhaps. - writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e)); - sources.retain(|source| !source.is_complete()); + writer.flush()?; + for source in sources.iter_mut() { + if let Some(s) = source { + if s.is_complete()? { + *source = None; + } + } + } + sources.retain(Option::is_some); if !sources.is_empty() { std::thread::park(); } @@ -184,7 +200,7 @@ pub fn send_loop( } }); - writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e)); + writer.write_all(&bytes[..])?; } } } @@ -199,11 +215,12 @@ pub fn send_loop( length: 0, seqno: 0, }; - header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e)); - writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e)); - writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e)); + header.write_to(&mut writer)?; + writer.flush()?; + writer.get_mut().shutdown(::std::net::Shutdown::Write)?; logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header })); // Log the send thread's end. logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, })); + Ok(()) } diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index 3c76624fc..c18f3527b 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -148,7 +148,7 @@ impl Config { } /// Attempts to assemble the described communication infrastructure. - pub fn try_build(self) -> Result<(Vec, Box), String> { + pub fn try_build(self) -> crate::Result<(Vec, Box)> { match self { Config::Thread => { Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(()))) @@ -160,12 +160,8 @@ impl Config { Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(()))) }, Config::Cluster { threads, process, addresses, report, log_fn } => { - match initialize_networking(addresses, process, threads, report, log_fn) { - Ok((stuff, guard)) => { - Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard))) - }, - Err(err) => Err(format!("failed to initialize networking: {}", err)) - } + let (stuff, guard) = initialize_networking(addresses, process, threads, report, log_fn)?; + Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard))) }, } } @@ -239,7 +235,7 @@ impl Config { pub fn initializeT+Send+Sync+'static>( config: Config, func: F, -) -> Result,String> { +) -> crate::Result> { let (allocators, others) = config.try_build()?; initialize_from(allocators, others, func) } @@ -299,7 +295,7 @@ pub fn initialize_from( builders: Vec, others: Box, func: F, -) -> Result,String> +) -> crate::Result> where A: AllocateBuilder+'static, T: Send+'static, @@ -314,8 +310,7 @@ where .spawn(move || { let communicator = builder.build(); (*clone)(communicator) - }) - .map_err(|e| format!("{:?}", e))?); + })?); } Ok(WorkerGuards { guards, others }) diff --git a/communication/src/lib.rs b/communication/src/lib.rs index e3302cdb0..7ed850569 100644 --- a/communication/src/lib.rs +++ b/communication/src/lib.rs @@ -97,6 +97,8 @@ pub mod buzzer; use std::any::Any; +use crate::err::CommError; + #[cfg(feature = "bincode")] use serde::{Serialize, Deserialize}; #[cfg(not(feature = "bincode"))] @@ -131,25 +133,25 @@ implDeserialize<'a>+'static> Data for T { } /// another call to `push()` may not be coming. pub trait Push { /// Pushes `element` with the opportunity to take ownership. - fn push(&mut self, element: &mut Option); + fn push(&mut self, element: &mut Option) -> Result<()>; /// Pushes `element` and drops any resulting resources. #[inline] - fn send(&mut self, element: T) { self.push(&mut Some(element)); } + fn send(&mut self, element: T) -> Result<()> { self.push(&mut Some(element)) } /// Pushes `None`, conventionally signalling a flush. #[inline] - fn done(&mut self) { self.push(&mut None); } + fn done(&mut self) -> Result<()> { self.push(&mut None) } } impl> Push for Box

{ #[inline] - fn push(&mut self, element: &mut Option) { (**self).push(element) } + fn push(&mut self, element: &mut Option) -> Result<()> { (**self).push(element) } } /// Pulling elements of type `T`. pub trait Pull { /// Pulls an element and provides the opportunity to take ownership. /// - /// The puller may mutate the result, in particular take ownership of the data by + /// The puller may mutate the result, in particular take ownership of the data by /// replacing it with other data or even `None`. This allows the puller to return /// resources to the implementor. /// @@ -188,4 +190,52 @@ fn promise_futures(sends: usize, recvs: usize) -> (Vec>>, Vec = std::result::Result; + +pub mod err { + //! blubb + use std::io::Error as IOError; + use std::sync::{PoisonError, TryLockError}; + + /// blubb + #[derive(Debug)] + pub enum CommError { + /// blubb + Panic, + /// blubb + Poison, + /// blubb + IO(IOError), + /// blubb + UnexpectedData, + /// blubb + String(String), + } + + impl From for CommError { + fn from(value: IOError) -> Self { + Self::IO(value) + } + } + + impl From for CommError { + fn from(value: String) -> Self { + Self::String(value) + } + } + + impl From> for CommError { + fn from(_value: PoisonError) -> Self { + Self::Poison + } + } + + impl From> for CommError { + fn from(_value: TryLockError) -> Self { + Self::Poison + } + } +} diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 97d3ecdd6..e72a70baa 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -297,10 +297,11 @@ mod container { } impl PushPartitioned for TimelyStack { - fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) + fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) + -> Result<(), E> where I: FnMut(&Self::Item) -> usize, - F: FnMut(usize, &mut Self), + F: FnMut(usize, &mut Self) -> Result<(), E>, { fn ensure_capacity(this: &mut TimelyStack) { let capacity = this.local.capacity(); @@ -315,10 +316,11 @@ mod container { ensure_capacity(&mut buffers[index]); buffers[index].copy(datum); if buffers[index].len() == buffers[index].local.capacity() { - flush(index, &mut buffers[index]); + flush(index, &mut buffers[index])?; } } self.clear(); + Ok(()) } } } diff --git a/container/src/lib.rs b/container/src/lib.rs index d1ffd2bda..123667092 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -112,17 +112,19 @@ pub trait PushPartitioned: Container { /// /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to /// append an element to. Call `flush` with an index and a buffer to send the data downstream. - fn push_partitioned(&mut self, buffers: &mut [Self], index: I, flush: F) + fn push_partitioned(&mut self, buffers: &mut [Self], index: I, flush: F) + -> Result<(), E> where I: FnMut(&Self::Item) -> usize, - F: FnMut(usize, &mut Self); + F: FnMut(usize, &mut Self) -> Result<(), E>; } impl PushPartitioned for Vec { - fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) + fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) + -> Result<(), E> where I: FnMut(&Self::Item) -> usize, - F: FnMut(usize, &mut Self), + F: FnMut(usize, &mut Self) -> Result<(), E>, { fn ensure_capacity(this: &mut Vec) { let capacity = this.capacity(); @@ -137,9 +139,10 @@ impl PushPartitioned for Vec { ensure_capacity(&mut buffers[index]); buffers[index].push(datum); if buffers[index].len() == buffers[index].capacity() { - flush(index, &mut buffers[index]); + flush(index, &mut buffers[index])?; } } + Ok(()) } } diff --git a/experiments/benches/exchange_bench.rs b/experiments/benches/exchange_bench.rs index cf9337ca4..64a22b3e1 100644 --- a/experiments/benches/exchange_bench.rs +++ b/experiments/benches/exchange_bench.rs @@ -87,10 +87,10 @@ fn experiment_exchange( time += 1; input.advance_to(time); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } - timer.elapsed() + Ok(timer.elapsed()) }) .unwrap() .join() @@ -98,6 +98,7 @@ fn experiment_exchange( .next() .unwrap() .unwrap() + .unwrap() } criterion_group!(benches, bench); diff --git a/kafkaesque/src/bin/capture_recv.rs b/kafkaesque/src/bin/capture_recv.rs index 862d44cbf..0a58e6bc9 100644 --- a/kafkaesque/src/bin/capture_recv.rs +++ b/kafkaesque/src/bin/capture_recv.rs @@ -41,6 +41,7 @@ fn main() { .count() .inspect(|x| println!("replayed: {:?}", x)) ; + Ok(()) }) }).unwrap(); // asserts error-free execution } diff --git a/kafkaesque/src/bin/capture_send.rs b/kafkaesque/src/bin/capture_send.rs index ea2502507..a71892178 100644 --- a/kafkaesque/src/bin/capture_send.rs +++ b/kafkaesque/src/bin/capture_send.rs @@ -27,5 +27,6 @@ fn main() { .to_stream(scope) .capture_into(producer) ); + Ok(()) }).unwrap(); } diff --git a/kafkaesque/src/bin/kafka_source.rs b/kafkaesque/src/bin/kafka_source.rs index f71f8df7f..d6cf4c115 100644 --- a/kafkaesque/src/bin/kafka_source.rs +++ b/kafkaesque/src/bin/kafka_source.rs @@ -56,8 +56,8 @@ fn main() { }); + Ok(()) }).expect("Timely computation failed somehow"); println!("Hello, world!"); } - diff --git a/kafkaesque/src/kafka_source.rs b/kafkaesque/src/kafka_source.rs index 0167913dd..8166e724f 100644 --- a/kafkaesque/src/kafka_source.rs +++ b/kafkaesque/src/kafka_source.rs @@ -78,7 +78,7 @@ use rdkafka::consumer::{ConsumerContext, BaseConsumer}; /// strings.inspect(|x| println!("Observed: {:?}", x)); /// /// }); -/// +/// Ok(()) /// }).expect("Timely computation failed somehow"); /// /// println!("Hello, world!"); @@ -135,4 +135,4 @@ where } }) -} \ No newline at end of file +} diff --git a/kafkaesque/src/lib.rs b/kafkaesque/src/lib.rs index ba91061e6..b2ad96ab8 100644 --- a/kafkaesque/src/lib.rs +++ b/kafkaesque/src/lib.rs @@ -66,13 +66,14 @@ impl EventProducerCore { } impl EventPusherCore for EventProducerCore { - fn push(&mut self, event: EventCore) { - unsafe { ::abomonation::encode(&event, &mut self.buffer).expect("Encode failure"); } + fn push(&mut self, event: EventCore) -> timely::Result<()> { + unsafe { ::abomonation::encode(&event, &mut self.buffer) }?; // println!("sending {:?} bytes", self.buffer.len()); self.producer.send::<(),[u8]>(BaseRecord::to(self.topic.as_str()).payload(&self.buffer[..])).unwrap(); self.counter.fetch_add(1, Ordering::SeqCst); self.producer.poll(std::time::Duration::from_millis(0)); self.buffer.clear(); + Ok(()) } } diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index 0dc5c234a..a5e578db8 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -32,9 +32,10 @@ fn main() { } input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } + Ok(()) }).unwrap(); } ``` diff --git a/mdbook/src/chapter_1/chapter_1_1.md b/mdbook/src/chapter_1/chapter_1_1.md index 9d5c75381..84dd2fc34 100644 --- a/mdbook/src/chapter_1/chapter_1_1.md +++ b/mdbook/src/chapter_1/chapter_1_1.md @@ -36,8 +36,9 @@ fn main() { input.send(round); } input.advance_to(round + 1); - // worker.step_while(|| probe.less_than(input.time())); + // worker.step_while(|| probe.less_than(input.time()))?; } + Ok(()) }).unwrap(); } ``` diff --git a/mdbook/src/chapter_1/chapter_1_3.md b/mdbook/src/chapter_1/chapter_1_3.md index 0d7107ec7..11eb3fdda 100644 --- a/mdbook/src/chapter_1/chapter_1_3.md +++ b/mdbook/src/chapter_1/chapter_1_3.md @@ -31,8 +31,9 @@ fn main() { input.send(round); } input.advance_to(round + 1); - worker.step_while(|| probe.less_than(input.time())); + worker.step_while(|| probe.less_than(input.time()))?; } + Ok(()) }).unwrap(); } ``` @@ -41,7 +42,7 @@ We'll put the whole program up here, but there are really just two lines that de ```rust,ignore input.advance_to(round + 1); -worker.step_while(|| probe.less_than(input.time())); +worker.step_while(|| probe.less_than(input.time()))?; ``` Let's talk about each of them. diff --git a/mdbook/src/chapter_2/chapter_2_1.md b/mdbook/src/chapter_2/chapter_2_1.md index 8f8fc6b94..8c9ca752d 100644 --- a/mdbook/src/chapter_2/chapter_2_1.md +++ b/mdbook/src/chapter_2/chapter_2_1.md @@ -25,7 +25,7 @@ fn main() { let stream2 = (0 .. 9).to_stream(scope); }); - + Ok(()) }).unwrap(); } ``` diff --git a/mdbook/src/chapter_2/chapter_2_2.md b/mdbook/src/chapter_2/chapter_2_2.md index 2ee48778d..091620733 100644 --- a/mdbook/src/chapter_2/chapter_2_2.md +++ b/mdbook/src/chapter_2/chapter_2_2.md @@ -16,6 +16,7 @@ fn main() { .to_stream(scope) .inspect(|x| println!("hello: {}", x)); }); + Ok(()) }).unwrap(); } ``` @@ -38,6 +39,7 @@ fn main() { .to_stream(scope) .inspect_batch(|t, xs| println!("hello: {:?} @ {:?}", xs, t)); }); + Ok(()) }).unwrap(); } ``` diff --git a/mdbook/src/chapter_2/chapter_2_3.md b/mdbook/src/chapter_2/chapter_2_3.md index 71e5774be..35d9fa0b9 100644 --- a/mdbook/src/chapter_2/chapter_2_3.md +++ b/mdbook/src/chapter_2/chapter_2_3.md @@ -23,6 +23,7 @@ fn main() { .map(|x| x + 1) .inspect(|x| println!("hello: {}", x)); }); + Ok(()) }).unwrap(); } ``` @@ -43,6 +44,7 @@ fn main() { .map(|mut x| { x.truncate(5); x } ) .inspect(|x| println!("hello: {}", x)); }); + Ok(()) }).unwrap(); } ``` @@ -67,6 +69,7 @@ fn main() { .map_in_place(|x| x.truncate(5)) .inspect(|x| println!("hello: {}", x)); }); + Ok(()) }).unwrap(); } ``` @@ -86,6 +89,7 @@ fn main() { .flat_map(|x| 0 .. x) .inspect(|x| println!("hello: {}", x)); }); + Ok(()) }).unwrap(); } ``` @@ -107,6 +111,7 @@ fn main() { .filter(|x| *x % 2 == 0) .inspect(|x| println!("hello: {}", x)); }); + Ok(()) }).unwrap(); } ``` diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 5b4c75b27..13adca5ad 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -60,9 +60,10 @@ fn main() { input.send(("round".to_owned(), 1)); input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } + Ok(()) }).unwrap(); } ``` @@ -100,7 +101,7 @@ Having constructed the dataflow, we feed it some data. input.send(("round".to_owned(), 1)); input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } ``` @@ -146,9 +147,10 @@ Rather than repeat all the code up above, I'm just going to show you the fragmen # input.send(("round".to_owned(), 1)); # input.advance_to(round + 1); # while probe.less_than(input.time()) { -# worker.step(); +# worker.step()?; # } # } +# Ok(()) # }).unwrap(); # } ``` @@ -249,9 +251,10 @@ As before, I'm just going to show you the new code, which now lives just after ` # input.send(("round".to_owned(), 1)); # input.advance_to(round + 1); # while probe.less_than(input.time()) { -# worker.step(); +# worker.step()?; # } # } +# Ok(()) # }).unwrap(); # } ``` diff --git a/mdbook/src/chapter_3/chapter_3_3.md b/mdbook/src/chapter_3/chapter_3_3.md index a102b14b4..50c908fbf 100644 --- a/mdbook/src/chapter_3/chapter_3_3.md +++ b/mdbook/src/chapter_3/chapter_3_3.md @@ -3,7 +3,7 @@ Perhaps the most important statement in a timely dataflow program: ```rust,ignore -worker.step() +worker.step()? ``` This is the method that tells the worker that it is now a good time to schedule each of the operators. If you recall, when designing our dataflow we wrote these operators, each of which were programmed by what they would do when shown their input and output handles. This is where we run that code. diff --git a/timely/examples/barrier.rs b/timely/examples/barrier.rs index ff7f38283..d618cd8a8 100644 --- a/timely/examples/barrier.rs +++ b/timely/examples/barrier.rs @@ -27,5 +27,6 @@ fn main() { ) .connect_loop(handle); }); + Ok(()) }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index 0dc2d6987..f16aec93a 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -136,5 +136,6 @@ fn main() { .concat(&(0..1).map(|x| (x,x)).to_stream(scope)) .connect_loop(handle); }); + Ok(()) }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/capture_recv.rs b/timely/examples/capture_recv.rs index d6f7291c1..9b5df2f12 100644 --- a/timely/examples/capture_recv.rs +++ b/timely/examples/capture_recv.rs @@ -24,6 +24,7 @@ fn main() { replayers .replay_into(scope) .inspect(|x| println!("replayed: {:?}", x)); + Ok(()) }) }).unwrap(); // asserts error-free execution } diff --git a/timely/examples/capture_send.rs b/timely/examples/capture_send.rs index ccabfb238..8c288d185 100644 --- a/timely/examples/capture_send.rs +++ b/timely/examples/capture_send.rs @@ -15,5 +15,6 @@ fn main() { .to_stream(scope) .capture_into(EventWriter::new(send)) ); + Ok(()) }).unwrap(); } diff --git a/timely/examples/distinct.rs b/timely/examples/distinct.rs index 28971384a..ef8f8190a 100644 --- a/timely/examples/distinct.rs +++ b/timely/examples/distinct.rs @@ -48,8 +48,9 @@ fn main() { } input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } + Ok(()) }).unwrap(); } diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index ee3b52515..ebaf1c7f6 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -42,11 +42,11 @@ fn main() { inputs[dataflow].advance_to(round); let mut steps = 0; while probes[dataflow].less_than(&round) { - worker.step(); + worker.step()?; steps += 1; } println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); } - + Ok(()) }).unwrap(); } diff --git a/timely/examples/exchange.rs b/timely/examples/exchange.rs index aa105c294..70ae8231a 100644 --- a/timely/examples/exchange.rs +++ b/timely/examples/exchange.rs @@ -30,7 +30,7 @@ fn main() { input.advance_to(round); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } @@ -40,6 +40,6 @@ fn main() { let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0); println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds); - + Ok(()) }).unwrap(); } diff --git a/timely/examples/flow_controlled.rs b/timely/examples/flow_controlled.rs index 9ccbe334b..1b0a4d1a1 100644 --- a/timely/examples/flow_controlled.rs +++ b/timely/examples/flow_controlled.rs @@ -31,5 +31,6 @@ fn main() { .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d)) .probe_with(&mut probe_handle); }); + Ok(()) }).unwrap(); } diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index ad8ef8809..228273945 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -99,11 +99,11 @@ fn main() { input1.advance_to(next); input2.advance_to(next); while probe.less_than(input1.time()) { - worker.step(); + worker.step()?; } println!("{:?}\tworker {} batch complete", timer.elapsed(), index) } - + Ok(()) }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/hello.rs b/timely/examples/hello.rs index 7b84f8ccf..f21066466 100644 --- a/timely/examples/hello.rs +++ b/timely/examples/hello.rs @@ -26,8 +26,9 @@ fn main() { } input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } + Ok(()) }).unwrap(); } diff --git a/timely/examples/logging-recv.rs b/timely/examples/logging-recv.rs index 1c2a5ccda..723748d6e 100644 --- a/timely/examples/logging-recv.rs +++ b/timely/examples/logging-recv.rs @@ -27,6 +27,7 @@ fn main() { replayers .replay_into(scope) .inspect(|x| println!("replayed: {:?}", x)); - }) + }); + Ok(()) }).unwrap(); // asserts error-free execution } diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 23228f4e8..b5491046a 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -82,7 +82,7 @@ fn main() { input_logger.log(()); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } @@ -92,6 +92,6 @@ fn main() { let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0); println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds); - + Ok(()) }).unwrap(); } diff --git a/timely/examples/loopdemo.rs b/timely/examples/loopdemo.rs index f0fcc934e..c0fda5e78 100644 --- a/timely/examples/loopdemo.rs +++ b/timely/examples/loopdemo.rs @@ -97,7 +97,7 @@ fn main() { inserted_ns = target_ns; } - worker.step(); + worker.step()?; } // Report observed latency measurements. @@ -120,6 +120,6 @@ fn main() { println!("{}\t{}", latency, fraction); } } - + Ok(()) }).unwrap(); -} \ No newline at end of file +} diff --git a/timely/examples/openloop.rs b/timely/examples/openloop.rs index b027d2cee..ca49ae4c7 100644 --- a/timely/examples/openloop.rs +++ b/timely/examples/openloop.rs @@ -16,7 +16,7 @@ fn main() { let peers = worker.peers(); // re-synchronize all workers (account for start-up). - timely::synchronization::Barrier::new(worker).wait(); + timely::synchronization::Barrier::new(worker).wait()?; let timer = std::time::Instant::now(); @@ -96,7 +96,7 @@ fn main() { inserted_ns = target_ns; } - worker.step(); + worker.step()?; } // Report observed latency measurements. @@ -119,6 +119,6 @@ fn main() { println!("{}\t{}", latency, fraction); } } - + Ok(()) }).unwrap(); } diff --git a/timely/examples/pagerank.rs b/timely/examples/pagerank.rs index 0a2da2351..fb26c219e 100644 --- a/timely/examples/pagerank.rs +++ b/timely/examples/pagerank.rs @@ -171,7 +171,7 @@ fn main() { input.advance_to(1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } for i in 1 .. 1000 { @@ -179,10 +179,10 @@ fn main() { input.send(((rng2.gen_range(0..nodes), rng2.gen_range(0..nodes)), -1)); input.advance_to(i + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } - + Ok(()) }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/pingpong.rs b/timely/examples/pingpong.rs index 91de9f87d..b0316af89 100644 --- a/timely/examples/pingpong.rs +++ b/timely/examples/pingpong.rs @@ -22,5 +22,6 @@ fn main() { .branch_when(move |t| t < &iterations).1 .connect_loop(helper); }); + Ok(()) }).unwrap(); } diff --git a/timely/examples/rc.rs b/timely/examples/rc.rs index 18f6fe972..f11bb2279 100644 --- a/timely/examples/rc.rs +++ b/timely/examples/rc.rs @@ -34,7 +34,8 @@ fn main() { for round in 0..10 { input.send(Test { field: Rc::new(round) } ); input.advance_to(round + 1); - worker.step_while(|| probe.less_than(input.time())); + worker.step_while(|| probe.less_than(input.time()))?; } + Ok(()) }).unwrap(); } diff --git a/timely/examples/sequence.rs b/timely/examples/sequence.rs index 7cf318933..f12a704e4 100644 --- a/timely/examples/sequence.rs +++ b/timely/examples/sequence.rs @@ -19,8 +19,8 @@ fn main() { while let Some(element) = sequencer.next() { println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element); } - worker.step(); + worker.step()?; } - + Ok(()) }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs index b5fc0da7b..b6f2e61b8 100644 --- a/timely/examples/threadless.rs +++ b/timely/examples/threadless.rs @@ -4,7 +4,7 @@ use timely::dataflow::{InputHandle, ProbeHandle}; use timely::dataflow::operators::{Inspect, Probe}; use timely::WorkerConfig; -fn main() { +fn main() -> timely::Result<()> { // create a naked single-threaded worker. let allocator = timely::communication::allocator::Thread::new(); @@ -27,7 +27,8 @@ fn main() { input.send(i); input.advance_to(i); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } -} \ No newline at end of file + Ok(()) +} diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index ab3cfa814..be52e5811 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -42,11 +42,11 @@ fn main() { let next = input.epoch() + 1; input.advance_to(next); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } } - + Ok(()) }).unwrap(); // asserts error-free execution; } diff --git a/timely/examples/unordered_input.rs b/timely/examples/unordered_input.rs index fb48fc1bb..2e4e80488 100644 --- a/timely/examples/unordered_input.rs +++ b/timely/examples/unordered_input.rs @@ -16,7 +16,8 @@ fn main() { for round in 0..10 { input.session(cap.clone()).give(round); cap = cap.delayed(&(round + 1)); - worker.step(); + worker.step()?; } + Ok(()) }).unwrap(); } diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index a92ee3b6d..0c2171a32 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -60,8 +60,9 @@ fn main() { input.send(("round".to_owned(), 1)); input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step(); + worker.step()?; } } + Ok(()) }).unwrap(); } diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 84303a039..74c11b11e 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -46,13 +46,13 @@ impl Message { /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher /// leaves in place, or the container's default element. #[inline] - pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) -> crate::Result<()> { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); let mut bundle = Some(BundleCore::from_typed(message)); - pusher.push(&mut bundle); + pusher.push(&mut bundle)?; if let Some(message) = bundle { if let Some(message) = message.if_typed() { @@ -60,5 +60,6 @@ impl Message { buffer.clear(); } } + Ok(()) } } diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 976023a19..aab9005cd 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -118,7 +118,7 @@ impl>> LogPusher { impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) -> crate::Result<()> { if let Some(bundle) = pair { self.counter += 1; @@ -141,7 +141,7 @@ impl>> Push> for LogP } } - self.pusher.push(pair); + self.pusher.push(pair) } } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 6f92dabb0..23609624b 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -1,6 +1,9 @@ //! Buffering and session mechanisms to provide the appearance of record-at-a-time sending, //! with the performance of batched sends. +use std::cell::RefCell; +use std::rc::Rc; +use timely_communication::err::CommError; use crate::dataflow::channels::{Bundle, BundleCore, Message}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; @@ -18,6 +21,8 @@ pub struct BufferCore>> { /// a buffer for records, to send at self.time buffer: D, pusher: P, + /// An error produced by a pusher. + error: Rc>>, } /// A buffer specialized to vector-based containers. @@ -26,11 +31,12 @@ pub type Buffer = BufferCore, P>; impl>> BufferCore where T: Eq+Clone { /// Creates a new `Buffer`. - pub fn new(pusher: P) -> Self { + pub fn new(pusher: P, error: Rc>>) -> Self { Self { time: None, buffer: Default::default(), pusher, + error, } } @@ -58,14 +64,18 @@ impl>> BufferCore where T: Eq /// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush. pub fn cease(&mut self) { self.flush(); - self.pusher.push(&mut None); + if let Err(e) = self.pusher.push(&mut None) { + *self.error.borrow_mut() = Some(e); + } } /// moves the contents of fn flush(&mut self) { if !self.buffer.is_empty() { let time = self.time.as_ref().unwrap().clone(); - Message::push_at(&mut self.buffer, time, &mut self.pusher); + if let Err(e) = Message::push_at(&mut self.buffer, time, &mut self.pusher) { + *self.error.borrow_mut() = Some(e); + } } } @@ -75,7 +85,9 @@ impl>> BufferCore where T: Eq self.flush(); let time = self.time.as_ref().expect("Buffer::give_container(): time is None.").clone(); - Message::push_at(vector, time, &mut self.pusher); + if let Err(e) = Message::push_at(vector, time, &mut self.pusher) { + *self.error.borrow_mut() = Some(e); + } } } @@ -99,7 +111,9 @@ impl>> Buffer where T: Eq+Clone { self.flush(); let time = self.time.as_ref().expect("Buffer::give_vec(): time is None.").clone(); - Message::push_at(vector, time, &mut self.pusher); + if let Err(e) = Message::push_at(vector, time, &mut self.pusher) { + *self.error.borrow_mut() = Some(e); + } } } diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 59ccacf32..13739d4b1 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -22,15 +22,16 @@ pub type Counter = CounterCore, P>; impl Push> for CounterCore where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) -> crate::Result<()>{ if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } // only propagate `None` if dirty (indicates flush) if message.is_some() || !self.produced.borrow_mut().is_empty() { - self.pushee.push(message); + self.pushee.push(message)?; } + Ok(()) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9ea271d31..c38c542b6 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -31,12 +31,13 @@ impl>, H: FnMut(&D) -> } } #[inline] - fn flush(&mut self, index: usize) { + fn flush(&mut self, index: usize) -> crate::Result<()> { if !self.buffers[index].is_empty() { if let Some(ref time) = self.current { - Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index]); + Message::push_at(&mut self.buffers[index], time.clone(), &mut self.pushers[index])?; } } + Ok(()) } } @@ -45,10 +46,10 @@ where C: PushPartitioned { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) -> crate::Result<()> { // if only one pusher, no exchange if self.pushers.len() == 1 { - self.pushers[0].push(message); + self.pushers[0].push(message)?; } else if let Some(message) = message { @@ -59,7 +60,7 @@ where // if the time isn't right, flush everything. if self.current.as_ref().map_or(false, |x| x != time) { for index in 0..self.pushers.len() { - self.flush(index); + self.flush(index)?; } } self.current = Some(time.clone()); @@ -74,9 +75,9 @@ where &mut self.buffers, move |datum| ((hash_func)(datum) & mask) as usize, |index, buffer| { - Message::push_at(buffer, time.clone(), &mut pushers[index]); + Message::push_at(buffer, time.clone(), &mut pushers[index]) } - ); + )?; } // as a last resort, use mod (%) else { @@ -86,18 +87,19 @@ where &mut self.buffers, move |datum| ((hash_func)(datum) % num_pushers) as usize, |index, buffer| { - Message::push_at(buffer, time.clone(), &mut pushers[index]); + Message::push_at(buffer, time.clone(), &mut pushers[index]) } - ); + )?; } } else { // flush for index in 0..self.pushers.len() { - self.flush(index); - self.pushers[index].push(&mut None); + self.flush(index)?; + self.pushers[index].push(&mut None)?; } } + Ok(()) } } diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 1ec05e4e7..d83e32c40 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -22,23 +22,24 @@ pub type Tee = TeeCore>; impl Push> for TeeCore { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) -> crate::Result<()> { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { self.buffer.clone_from(&message.data); - Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1]); + Message::push_at(&mut self.buffer, message.time.clone(), &mut pushers[index-1])?; } } else { for index in 1..pushers.len() { - pushers[index-1].push(&mut None); + pushers[index-1].push(&mut None)?; } } if pushers.len() > 0 { let last = pushers.len() - 1; - pushers[last].push(message); + pushers[last].push(message)?; } + Ok(()) } } diff --git a/timely/src/dataflow/operators/capture/capture.rs b/timely/src/dataflow/operators/capture/capture.rs index c62b95417..65c71c9b2 100644 --- a/timely/src/dataflow/operators/capture/capture.rs +++ b/timely/src/dataflow/operators/capture/capture.rs @@ -54,6 +54,7 @@ pub trait Capture { /// handle2.replay_into(scope2) /// .capture_into(send) /// }); + /// Ok(()) /// }).unwrap(); /// /// assert_eq!(recv.extract()[0].1, (0..10).collect::>()); @@ -99,6 +100,7 @@ pub trait Capture { /// .replay_into(scope2) /// .capture_into(send0) /// }); + /// Ok(()) /// }).unwrap(); /// /// assert_eq!(recv0.extract()[0].1, (0..10).collect::>()); @@ -131,7 +133,7 @@ impl Capture for StreamCore { if !progress.frontiers[0].is_empty() { // transmit any frontier progress. let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new()); - event_pusher.push(EventCore::Progress(to_send.into_inner())); + event_pusher.push(EventCore::Progress(to_send.into_inner()))?; } use crate::communication::message::RefOrMut; @@ -143,10 +145,10 @@ impl Capture for StreamCore { RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)), }; let vector = data.replace(Default::default()); - event_pusher.push(EventCore::Messages(time.clone(), vector)); + event_pusher.push(EventCore::Messages(time.clone(), vector))?; } input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); - false + Ok(false) } ); } diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index 252b7361c..2650b190f 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -43,7 +43,7 @@ impl>> EventIterator for E { /// Receives `EventCore` events. pub trait EventPusherCore { /// Provides a new `Event` to the pusher. - fn push(&mut self, event: EventCore); + fn push(&mut self, event: EventCore) -> crate::Result<()>; } /// A [EventPusherCore] specialized to vector-based containers. @@ -54,10 +54,11 @@ impl>> EventPusher for E {} // implementation for the linked list behind a `Handle`. impl EventPusherCore for ::std::sync::mpsc::Sender> { - fn push(&mut self, event: EventCore) { + fn push(&mut self, event: EventCore) -> crate::Result<()> { // NOTE: An Err(x) result just means "data not accepted" most likely // because the receiver is gone. No need to panic. let _ = self.send(event); + Ok(()) } } @@ -92,10 +93,11 @@ pub mod link { // implementation for the linked list behind a `Handle`. impl EventPusherCore for Rc> { - fn push(&mut self, event: EventCore) { + fn push(&mut self, event: EventCore) -> crate::Result<()> { *self.next.borrow_mut() = Some(Rc::new(EventLinkCore { event: Some(event), next: RefCell::new(None) })); let next = self.next.borrow().as_ref().unwrap().clone(); *self = next; + Ok(()) } } @@ -131,12 +133,13 @@ pub mod link { } #[test] - fn avoid_stack_overflow_in_drop() { + fn avoid_stack_overflow_in_drop() -> crate::Result<()> { let mut event1 = Rc::new(EventLinkCore::<(),()>::new()); let _event2 = event1.clone(); for _ in 0 .. 1_000_000 { - event1.push(EventCore::Progress(vec![])); + event1.push(EventCore::Progress(vec![]))?; } + Ok(()) } } @@ -167,9 +170,9 @@ pub mod binary { } impl EventPusherCore for EventWriterCore { - fn push(&mut self, event: EventCore) { - // TODO: `push` has no mechanism to report errors, so we `unwrap`. - unsafe { ::abomonation::encode(&event, &mut self.stream).expect("Event abomonation/write failed"); } + fn push(&mut self, event: EventCore) -> crate::Result<()> { + unsafe { ::abomonation::encode(&event, &mut self.stream)? }; + Ok(()) } } diff --git a/timely/src/dataflow/operators/capture/extract.rs b/timely/src/dataflow/operators/capture/extract.rs index dcf57ae3b..90ed5f767 100644 --- a/timely/src/dataflow/operators/capture/extract.rs +++ b/timely/src/dataflow/operators/capture/extract.rs @@ -42,6 +42,7 @@ pub trait Extract { /// handle2.replay_into(scope2) /// .capture_into(send) /// }); + /// Ok(()) /// }).unwrap(); /// /// assert_eq!(recv.extract()[0].1, (0..10).collect::>()); @@ -110,6 +111,7 @@ pub trait ExtractCore { /// handle2.replay_into(scope2) /// .capture_into(send) /// }); + /// Ok(()) /// }).unwrap(); /// /// assert_eq!(recv.extract_core().into_iter().flat_map(|x| x.1).collect::>(), (0..10).collect::>()); diff --git a/timely/src/dataflow/operators/capture/mod.rs b/timely/src/dataflow/operators/capture/mod.rs index 22d332ea0..83d0b8e35 100644 --- a/timely/src/dataflow/operators/capture/mod.rs +++ b/timely/src/dataflow/operators/capture/mod.rs @@ -36,6 +36,7 @@ //! worker.dataflow(|scope2| { //! handle2.replay_into(scope2) //! .inspect(|x| println!("replayed: {:?}", x)); +//! Ok(()) //! }) //! }).unwrap(); //! ``` @@ -69,6 +70,7 @@ //! Some(EventReader::<_,u64,_>::new(recv)) //! .replay_into(scope2) //! .inspect(|x| println!("replayed: {:?}", x)); +//! Ok(()) //! }) //! }).unwrap(); //! ``` diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 2cc325419..739152cc1 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -38,6 +38,8 @@ //! allowing the replay to occur in a timely dataflow computation with more or fewer workers //! than that in which the stream was captured. +use std::cell::RefCell; +use std::rc::Rc; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::CounterCore as PushCounter; use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; @@ -74,13 +76,17 @@ where I : IntoIterator, let (targets, stream) = builder.new_output(); - let mut output = PushBuffer::new(PushCounter::new(targets)); + let error = Rc::new(RefCell::new(None)); + let mut output = PushBuffer::new(PushCounter::new(targets), Rc::clone(&error)); let mut event_streams = self.into_iter().collect::>(); let mut started = false; let mut allocation: C = Default::default(); builder.build( move |progress| { + if let Some(error) = error.borrow_mut().take() { + return Err(error); + } if !started { // The first thing we do is modify our capabilities to match the number of streams we manage. @@ -112,7 +118,7 @@ where I : IntoIterator, output.cease(); output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]); - false + Ok(false) } ); diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index bd9b1f86a..9e59e857b 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -25,7 +25,7 @@ use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; use crate::order::Product; -use crate::{Container, Data}; +use crate::{Container, Data, Result}; use crate::communication::Push; use crate::dataflow::channels::pushers::{CounterCore, TeeCore}; use crate::dataflow::channels::{BundleCore, Message}; @@ -150,12 +150,12 @@ struct IngressNub, TData: C } impl, TData: Container> Push> for IngressNub { - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option>) -> Result<()> { if let Some(message) = element { let outer_message = message.as_mut(); let data = ::std::mem::take(&mut outer_message.data); let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); - self.targets.push(&mut inner_message); + self.targets.push(&mut inner_message)?; if let Some(inner_message) = inner_message { if let Some(inner_message) = inner_message.if_typed() { outer_message.data = inner_message.data; @@ -168,8 +168,9 @@ impl, TData: Container> Pus self.activator.activate(); self.active = false; } - self.targets.done(); + self.targets.done()?; } + Ok(()) } } @@ -181,19 +182,20 @@ struct EgressNub, TData: Da impl Push> for EgressNub where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) -> Result<()> { if let Some(message) = message { let inner_message = message.as_mut(); let data = ::std::mem::take(&mut inner_message.data); let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); - self.targets.push(&mut outer_message); + self.targets.push(&mut outer_message)?; if let Some(outer_message) = outer_message { if let Some(outer_message) = outer_message.if_typed() { inner_message.data = outer_message.data; } } } - else { self.targets.done(); } + else { self.targets.done()?; } + Ok(()) } } @@ -239,9 +241,10 @@ mod test { } input.advance_to(round + 1); while probe.less_than(input.time()) { - worker.step_or_park(None); + worker.step_or_park(None)?; } } + Ok(()) }).unwrap(); } diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/flow_controlled.rs index 4f01bd802..34e856352 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/flow_controlled.rs @@ -69,6 +69,7 @@ pub struct IteratorSourceInput, I: I /// .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d)) /// .probe_with(&mut probe_handle); /// }); +/// Ok(()) /// }).unwrap(); /// } /// ``` diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8e97492af..0e5dc8598 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -156,7 +156,7 @@ impl OperatorBuilder { /// Creates an operator implementation from supplied logic constructor. pub fn build(mut self, logic: L) where - L: FnMut(&mut SharedProgress)->bool+'static + L: FnMut(&mut SharedProgress) -> crate::Result+'static { let inputs = self.shape.inputs; let outputs = self.shape.outputs; @@ -182,7 +182,7 @@ impl OperatorBuilder { struct OperatorCore where T: Timestamp, - L: FnMut(&mut SharedProgress)->bool+'static, + L: FnMut(&mut SharedProgress)->crate::Result+'static, { shape: OperatorShape, address: Vec, @@ -195,11 +195,11 @@ where impl Schedule for OperatorCore where T: Timestamp, - L: FnMut(&mut SharedProgress)->bool+'static, + L: FnMut(&mut SharedProgress)->crate::Result+'static, { fn name(&self) -> &str { &self.shape.name } fn path(&self) -> &[usize] { &self.address[..] } - fn schedule(&mut self) -> bool { + fn schedule(&mut self) -> crate::Result { let shared_progress = &mut *self.shared_progress.borrow_mut(); (self.logic)(shared_progress) } @@ -208,7 +208,7 @@ where impl Operate for OperatorCore where T: Timestamp, - L: FnMut(&mut SharedProgress)->bool+'static, + L: FnMut(&mut SharedProgress)->crate::Result+'static, { fn inputs(&self) -> usize { self.shape.inputs } fn outputs(&self) -> usize { self.shape.outputs } @@ -230,9 +230,10 @@ where } // initialize self.frontier antichains as indicated by hosting scope. - fn set_external_summary(&mut self) { + fn set_external_summary(&mut self) -> crate::Result<()> { // should we schedule the operator here, or just await the first invocation? - self.schedule(); + self.schedule()?; + Ok(()) } fn notify_me(&self) -> bool { self.shape.notify } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index ca80e3182..65dbcfdda 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -3,6 +3,7 @@ use std::rc::Rc; use std::cell::RefCell; use std::default::Default; +use timely_communication::err::CommError; use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::operate::SharedProgress; @@ -35,6 +36,7 @@ pub struct OperatorBuilder { summaries: Vec::Summary>>>>>, produced: Vec>>>, logging: Option, + error: Rc>>, } impl OperatorBuilder { @@ -50,6 +52,7 @@ impl OperatorBuilder { summaries: Vec::new(), produced: Vec::new(), logging, + error: Rc::new(RefCell::new(None)), } } @@ -112,7 +115,7 @@ impl OperatorBuilder { let internal = Rc::new(RefCell::new(ChangeBatch::new())); self.internal.borrow_mut().push(internal.clone()); - let mut buffer = PushBuffer::new(PushCounter::new(tee)); + let mut buffer = PushBuffer::new(PushCounter::new(tee), Rc::clone(&self.error)); self.produced.push(buffer.inner().produced().clone()); for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) { @@ -159,6 +162,7 @@ impl OperatorBuilder { let self_consumed = self.consumed; let self_internal = self.internal; let self_produced = self.produced; + let self_error = self.error; let raw_logic = move |progress: &mut SharedProgress| { @@ -188,7 +192,11 @@ impl OperatorBuilder { produced.borrow_mut().drain_into(progress); } - result + if let Some(err) = self_error.borrow_mut().take() { + Err(err) + } else { + Ok(result) + } }; self.builder.build(raw_logic); diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6c8dd8ea9..ff27f1e87 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -230,6 +230,7 @@ fn notificator_delivers_notifications_in_topo_order() { /// } /// in1.close(); /// in2.close(); +/// Ok(()) /// }).unwrap(); /// ``` #[derive(Debug)] diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index afb7a25d1..2f98d9ed2 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -183,6 +183,7 @@ pub trait Operator { /// in2.send(i - 1); /// in2.advance_to(i); /// } + /// Ok(()) /// }).unwrap(); /// ``` fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore @@ -239,6 +240,7 @@ pub trait Operator { /// in2.send(i - 1); /// in2.advance_to(i); /// } + /// Ok(()) /// }).unwrap(); /// ``` fn binary_notify(&mut self) -> (Handle<::Timestamp, D>, Stream); @@ -89,9 +91,10 @@ pub trait Input : Scope { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); @@ -121,9 +124,10 @@ pub trait Input : Scope { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; @@ -153,9 +157,10 @@ pub trait Input : Scope { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; } @@ -200,6 +205,7 @@ impl Input for G where ::Timestamp: TotalOrder { progress, messages: produced, copies, + error: Rc::clone(&handle.error), }), index); StreamCore::new(Source::new(index, 0), registrar, self.clone()) @@ -214,6 +220,7 @@ struct Operator { progress: Rc>>, // times closed since last asked messages: Rc>>, // messages sent since last asked copies: usize, + error: Rc>>, } impl Schedule for Operator { @@ -222,11 +229,15 @@ impl Schedule for Operator { fn path(&self) -> &[usize] { &self.address[..] } - fn schedule(&mut self) -> bool { + fn schedule(&mut self) -> Result { + // Report any pending error + if let Some(err) = self.error.borrow_mut().take() { + return Err(err); + } let shared_progress = &mut *self.shared_progress.borrow_mut(); self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]); self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]); - false + Ok(false) } } @@ -253,6 +264,7 @@ pub struct HandleCore { buffer1: C, buffer2: C, now_at: T, + error: Rc>>, } /// A handle specialized to vector-based containers. @@ -281,9 +293,10 @@ impl HandleCore { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn new() -> Self { Self { @@ -293,6 +306,7 @@ impl HandleCore { buffer1: Default::default(), buffer2: Default::default(), now_at: T::minimum(), + error: Rc::new(RefCell::new(None)), } } @@ -318,9 +332,10 @@ impl HandleCore { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn to_stream(&mut self, scope: &mut G) -> StreamCore where @@ -353,11 +368,15 @@ impl HandleCore { for index in 0 .. self.pushers.len() { if index < self.pushers.len() - 1 { self.buffer2.clone_from(&self.buffer1); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); + if let Err(e) = Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]) { + *self.error.borrow_mut() = Some(e); + } debug_assert!(self.buffer2.is_empty()); } else { - Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]); + if let Err(e) = Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]) { + *self.error.borrow_mut() = Some(e); + } debug_assert!(self.buffer1.is_empty()); } } @@ -368,7 +387,9 @@ impl HandleCore { fn close_epoch(&mut self) { if !self.buffer1.is_empty() { self.flush(); } for pusher in self.pushers.iter_mut() { - pusher.done(); + if let Err(e) = pusher.done() { + *self.error.borrow_mut() = Some(e); + } } for progress in self.progress.iter() { progress.borrow_mut().update(self.now_at.clone(), -1); @@ -403,9 +424,10 @@ impl HandleCore { /// for round in 0..10 { /// input.send_batch(&mut vec![format!("{}", round)]); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn send_batch(&mut self, buffer: &mut D) { @@ -417,11 +439,15 @@ impl HandleCore { for index in 0 .. self.pushers.len() { if index < self.pushers.len() - 1 { self.buffer2.clone_from(&buffer); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); + if let Err(e) = Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]) { + *self.error.borrow_mut() = Some(e); + } assert!(self.buffer2.is_empty()); } else { - Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); + if let Err(e) = Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]) { + *self.error.borrow_mut() = Some(e); + } assert!(buffer.is_empty()); } } @@ -487,9 +513,10 @@ impl Handle { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step(); + /// worker.step()?; /// } - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn send(&mut self, data: D) { // assert!(self.buffer1.capacity() == Message::::default_length()); diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..5499fa25a 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -40,8 +40,9 @@ pub trait Probe { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step_while(|| probe.less_than(input.time())); + /// worker.step_while(|| probe.less_than(input.time()))?; /// } + /// Ok(()) /// }).unwrap(); /// ``` fn probe(&self) -> Handle; @@ -72,8 +73,9 @@ pub trait Probe { /// for round in 0..10 { /// input.send(round); /// input.advance_to(round + 1); - /// worker.step_while(|| probe.less_than(input.time())); + /// worker.step_while(|| probe.less_than(input.time()))?; /// } + /// Ok(()) /// }).unwrap(); /// ``` fn probe_with(&self, handle: &mut Handle) -> StreamCore; @@ -92,7 +94,8 @@ impl Probe for StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); let (tee, stream) = builder.new_output(); - let mut output = PushBuffer::new(PushCounter::new(tee)); + let error = Rc::new(RefCell::new(None)); + let mut output = PushBuffer::new(PushCounter::new(tee), Rc::clone(&error)); let shared_frontier = handle.frontier.clone(); let mut started = false; @@ -128,7 +131,11 @@ impl Probe for StreamCore { input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]); output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]); - false + if let Some(err) = error.borrow_mut().take() { + Err(err) + } else { + Ok(false) + } }, ); @@ -212,18 +219,19 @@ mod tests { assert!(probe.less_equal(&round)); assert!(probe.less_than(&(round + 1))); input.advance_to(round + 1); - worker.step(); + worker.step()?; } // seal the input input.close(); // finish off any remaining work - worker.step(); - worker.step(); - worker.step(); - worker.step(); + worker.step()?; + worker.step()?; + worker.step()?; + worker.step()?; assert!(probe.done()); + Ok(()) }).unwrap(); } diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index c7e600234..1f88402b7 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -2,6 +2,7 @@ use std::rc::Rc; use std::cell::RefCell; +use timely_communication::err::CommError; use crate::Container; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -11,7 +12,7 @@ use crate::progress::{Operate, operate::SharedProgress, Timestamp}; use crate::progress::Source; use crate::progress::ChangeBatch; -use crate::Data; +use crate::{Data, Result}; use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore}; use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore}; @@ -65,8 +66,9 @@ pub trait UnorderedInput { /// for round in 0..10 { /// input.session(cap.clone()).give(round); /// cap = cap.delayed(&(round + 1)); - /// worker.step(); + /// worker.step()?; /// } + /// Ok(()) /// }).unwrap(); /// /// let extract = recv.extract(); @@ -133,8 +135,9 @@ pub trait UnorderedInputCore { /// for round in 0..10 { /// input.session(cap.clone()).give(round); /// cap = cap.delayed(&(round + 1)); - /// worker.step(); + /// worker.step()?; /// } + /// Ok(()) /// }).unwrap(); /// /// let extract = recv.extract(); @@ -162,8 +165,9 @@ impl UnorderedInputCore for G { address.push(index); let cap = ActivateCapability::new(cap, &address, self.activations()); + let error = Rc::new(RefCell::new(None)); - let helper = UnorderedHandleCore::new(counter); + let helper = UnorderedHandleCore::new(counter, Rc::clone(&error)); self.add_operator_with_index(Box::new(UnorderedOperator { name: "UnorderedInput".to_owned(), @@ -172,6 +176,7 @@ impl UnorderedInputCore for G { internal, produced, peers, + error, }), index); ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) @@ -185,16 +190,21 @@ struct UnorderedOperator { internal: Rc>>, produced: Rc>>, peers: usize, + error: Rc>>, } impl Schedule for UnorderedOperator { fn name(&self) -> &str { &self.name } fn path(&self) -> &[usize] { &self.address[..] } - fn schedule(&mut self) -> bool { + fn schedule(&mut self) -> Result { + // Report any pending error + if let Some(err) = self.error.borrow_mut().take() { + return Err(err); + } let shared_progress = &mut *self.shared_progress.borrow_mut(); self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]); self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]); - false + Ok(false) } } @@ -220,14 +230,14 @@ pub struct UnorderedHandleCore { } impl UnorderedHandleCore { - fn new(pusher: PushCounter>) -> UnorderedHandleCore { + fn new(pusher: PushCounter>, error: Rc>>) -> UnorderedHandleCore { UnorderedHandleCore { - buffer: PushBuffer::new(pusher), + buffer: PushBuffer::new(pusher, error), } } /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + pub fn session(&mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) } } diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs index 50ee7b8fa..28286adaa 100644 --- a/timely/src/dataflow/scopes/mod.rs +++ b/timely/src/dataflow/scopes/mod.rs @@ -89,7 +89,8 @@ pub trait Scope: ScopeParent { /// }); /// input /// }); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn scoped(&mut self, name: &str, func: F) -> R where @@ -116,7 +117,8 @@ pub trait Scope: ScopeParent { /// }); /// input /// }); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn iterative(&mut self, func: F) -> R where @@ -146,7 +148,8 @@ pub trait Scope: ScopeParent { /// }); /// input /// }); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn region(&mut self, func: F) -> R where @@ -178,7 +181,8 @@ pub trait Scope: ScopeParent { /// }); /// input /// }); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` fn region_named(&mut self, name: &str, func: F) -> R where diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 3564d3592..b26404a8e 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -3,7 +3,7 @@ use crate::communication::{initialize_from, Allocator, allocator::AllocateBuilder, WorkerGuards}; use crate::dataflow::scopes::Child; use crate::worker::Worker; -use crate::{CommunicationConfig, WorkerConfig}; +use crate::{CommunicationConfig, WorkerConfig, Result}; /// Configures the execution of a timely dataflow computation. pub struct Config { @@ -38,7 +38,7 @@ impl Config { /// This method is only available if the `getopts` feature is enabled, which /// it is by default. #[cfg(feature = "getopts")] - pub fn from_matches(matches: &getopts_dep::Matches) -> Result { + pub fn from_matches(matches: &getopts_dep::Matches) -> std::result::Result { Ok(Config { communication: CommunicationConfig::from_matches(matches)?, worker: WorkerConfig::from_matches(matches)?, @@ -49,7 +49,7 @@ impl Config { /// /// Most commonly, callers supply `std::env::args()` as the iterator. #[cfg(feature = "getopts")] - pub fn from_args>(args: I) -> Result { + pub fn from_args>(args: I) -> std::result::Result { let mut opts = getopts_dep::Options::new(); Config::install_options(&mut opts); let matches = opts.parse(args).map_err(|e| e.to_string())?; @@ -123,7 +123,7 @@ where T: Send+'static, F: FnOnce(&mut Child,u64>)->T+Send+Sync+'static { - crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope))) + execute_directly(|worker| worker.dataflow(|scope| func(scope))).unwrap() } @@ -145,9 +145,9 @@ where /// (0..10).to_stream(scope) /// .inspect(|x| println!("seen: {:?}", x)); /// }) -/// }); +/// }).unwrap(); /// ``` -pub fn execute_directly(func: F) -> T +pub fn execute_directly(func: F) -> Result where T: Send+'static, F: FnOnce(&mut Worker)->T+Send+Sync+'static @@ -156,9 +156,9 @@ where let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc); let result = func(&mut worker); while worker.has_dataflows() { - worker.step_or_park(None); + worker.step_or_park(None)?; } - result + Ok(result) } /// Executes a timely dataflow from a configuration and per-communicator logic. @@ -188,7 +188,8 @@ where /// worker.dataflow::<(),_,_>(|scope| { /// (0..10).to_stream(scope) /// .inspect(|x| println!("seen: {:?}", x)); -/// }) +/// }); +/// Ok(()) /// }).unwrap(); /// ``` /// @@ -213,6 +214,7 @@ where /// .inspect(|x| println!("seen: {:?}", x)) /// .capture_into(send); /// }); +/// Ok(()) /// }).unwrap(); /// /// // the extracted data should have data (0..10) thrice at timestamp 0. @@ -221,10 +223,10 @@ where pub fn execute( mut config: Config, func: F -) -> Result,String> +) -> Result>> where T:Send+'static, - F: Fn(&mut Worker)->T+Send+Sync+'static { + F: Fn(&mut Worker)->Result+Send+Sync+'static { if let CommunicationConfig::Cluster { ref mut log_fn, .. } = config.communication { @@ -286,7 +288,7 @@ where let result = func(&mut worker); while worker.has_dataflows() { - worker.step_or_park(None); + worker.step_or_park(None)?; } result }) @@ -331,7 +333,8 @@ where /// worker.dataflow::<(),_,_>(|scope| { /// (0..10).to_stream(scope) /// .inspect(|x| println!("seen: {:?}", x)); -/// }) +/// }); +/// Ok(()) /// }).unwrap(); /// ``` /// ```ignore @@ -348,10 +351,10 @@ where /// host3:port /// ``` #[cfg(feature = "getopts")] -pub fn execute_from_args(iter: I, func: F) -> Result,String> +pub fn execute_from_args(iter: I, func: F) -> Result>> where I: Iterator, T:Send+'static, - F: Fn(&mut Worker)->T+Send+Sync+'static, { + F: Fn(&mut Worker)->Result+Send+Sync+'static, { let config = Config::from_args(iter)?; execute(config, func) } @@ -378,7 +381,7 @@ pub fn execute_from( others: Box, worker_config: WorkerConfig, func: F, -) -> Result, String> +) -> Result>> where A: AllocateBuilder+'static, T: Send+'static, @@ -387,8 +390,8 @@ where let mut worker = Worker::new(worker_config.clone(), allocator); let result = func(&mut worker); while worker.has_dataflows() { - worker.step_or_park(None); + worker.step_or_park(None)?; } - result + Ok(result) }) } diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 96d7c7d99..2d0b22e23 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -40,8 +40,9 @@ //! for round in 0..10 { //! input.send(round); //! input.advance_to(round + 1); -//! worker.step(); +//! worker.step()?; //! } +//! Ok(()) //! }); //! ``` //! @@ -86,6 +87,7 @@ pub mod container { pub mod communication { pub use timely_communication::*; } +pub use communication::Result; /// Re-export of the `timely_bytes` crate. pub mod bytes { diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ca9868fbe..e4914c315 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -32,19 +32,22 @@ impl BatchLogger where P: EventPusher) { if !data.is_empty() { - self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect())); + // We unwrap because we don't have a good way to report errors. + self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect())).unwrap(); } if self.time < time { let new_frontier = time; let old_frontier = self.time; - self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)])); + // We unwrap because we don't have a good way to report errors. + self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)])).unwrap(); } self.time = time; } } impl Drop for BatchLogger where P: EventPusher { fn drop(&mut self) { - self.event_pusher.push(Event::Progress(vec![(self.time, -1)])); + // Ignore errors on drop. + let _ = self.event_pusher.push(Event::Progress(vec![(self.time, -1)])); } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 7c0b95dfe..8a03a62c2 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,5 +1,6 @@ //! Broadcasts progress information among workers. +use crate::Result; use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::{Location, Port}; use crate::communication::{Message, Push, Pull}; @@ -54,7 +55,7 @@ impl Progcaster { } /// Sends pointstamp changes to all workers. - pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) { + pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) -> Result<()> { changes.compact(); if !changes.is_empty() { @@ -108,13 +109,14 @@ impl Progcaster { } // TODO: This should probably use a broadcast channel. - pusher.push(&mut self.to_push); - pusher.done(); + pusher.push(&mut self.to_push)?; + pusher.done()?; } self.counter += 1; changes.clear(); } + Ok(()) } /// Receives pointstamp changes from all workers. diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 28fd10ae8..5c8bed84e 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -52,7 +52,7 @@ pub trait Operate : Schedule { /// of the shared progress state. An operator should be able to consult `frontiers` at any /// point and read out the current frontier information, or the changes from the last time /// that `frontiers` was drained. - fn set_external_summary(&mut self) { } + fn set_external_summary(&mut self) -> crate::Result<()> { Ok(()) } /// Indicates of whether the operator requires `push_external_progress` information or not. fn notify_me(&self) -> bool { true } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index f41fee190..4583dca98 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -10,6 +10,8 @@ use std::cell::RefCell; use std::collections::BinaryHeap; use std::cmp::Reverse; +use crate::Result; + use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; @@ -277,7 +279,7 @@ where fn path(&self) -> &[usize] { &self.path } - fn schedule(&mut self) -> bool { + fn schedule(&mut self) -> Result { // This method performs several actions related to progress tracking // and child operator scheduling. The actions have been broken apart @@ -309,13 +311,13 @@ where // De-duplicate, and don't revisit. if index > previous { // TODO: This is a moment where a scheduling decision happens. - self.activate_child(index); + self.activate_child(index)?; previous = index; } } // Transmit produced progress updates. - self.send_progress(); + self.send_progress()?; // If child scopes surface more final pointstamp updates we must re-execute. if !self.final_pointstamp.is_empty() { @@ -326,7 +328,7 @@ where let incomplete = self.incomplete_count > 0; let tracking = self.pointstamp_tracker.tracking_anything(); - incomplete || tracking + Ok(incomplete || tracking) } } @@ -339,11 +341,11 @@ where /// Schedules a child operator and collects progress statements. /// /// The return value indicates that the child task cannot yet shut down. - fn activate_child(&mut self, child_index: usize) -> bool { + fn activate_child(&mut self, child_index: usize) -> Result { let child = &mut self.children[child_index]; - let incomplete = child.schedule(); + let incomplete = child.schedule()?; if incomplete != self.incomplete[child_index] { if incomplete { self.incomplete_count += 1; } @@ -375,7 +377,7 @@ where child.extract_progress(&mut self.final_pointstamp, &mut self.temp_active); } - incomplete + Ok(incomplete) } /// Move frontier changes from parent into progress statements. @@ -507,7 +509,7 @@ where /// /// This method does not guarantee that all of `self.local_pointstamps` are /// sent, but that no blocking pointstamps remain - fn send_progress(&mut self) { + fn send_progress(&mut self) -> Result<()> { // If we are requested to eagerly send progress updates, or if there are // updates visible in the scope-wide frontier, we must send all updates. @@ -522,8 +524,9 @@ where }; if must_send { - self.progcaster.send(&mut self.local_pointstamp); + self.progcaster.send(&mut self.local_pointstamp)?; } + Ok(()) } } @@ -580,13 +583,15 @@ where (internal_summary, self.shared_progress.clone()) } - fn set_external_summary(&mut self) { + fn set_external_summary(&mut self) -> Result<()> { self.accept_frontier(); self.propagate_pointstamps(); // ensure propagation of input frontiers. - self.children + for op in self.children .iter_mut() - .flat_map(|child| child.operator.as_mut()) - .for_each(|op| op.set_external_summary()); + .flat_map(|child| child.operator.as_mut()) { + op.set_external_summary()?; + } + Ok(()) } } @@ -679,7 +684,7 @@ impl PerOperatorState { } } - pub fn schedule(&mut self) -> bool { + pub fn schedule(&mut self) -> Result { if let Some(ref mut operator) = self.operator { @@ -716,7 +721,7 @@ impl PerOperatorState { } // A closed operator shouldn't keep anything open. - false + Ok(false) } } diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs index dc2ea35a0..6ce7e48e8 100644 --- a/timely/src/scheduling/mod.rs +++ b/timely/src/scheduling/mod.rs @@ -5,6 +5,7 @@ use std::cell::RefCell; pub mod activate; +use crate::Result; pub use self::activate::{Activations, Activator, ActivateOnDrop, SyncActivator}; /// A type that can be scheduled. @@ -17,7 +18,7 @@ pub trait Schedule { /// /// The return value indicates whether `self` has outstanding /// work and would be upset if the computation terminated. - fn schedule(&mut self) -> bool; + fn schedule(&mut self) -> Result; } /// Methods for types which schedule fibers. diff --git a/timely/src/synchronization/barrier.rs b/timely/src/synchronization/barrier.rs index e7d1fe460..444bc0b53 100644 --- a/timely/src/synchronization/barrier.rs +++ b/timely/src/synchronization/barrier.rs @@ -1,5 +1,6 @@ //! Barrier synchronization. +use crate::Result; use crate::communication::Allocate; use crate::dataflow::{InputHandle, ProbeHandle}; use crate::worker::Worker; @@ -27,11 +28,12 @@ impl Barrier { /// /// This method does *not* block dataflow execution, which continues /// to execute while we await the arrival of the other workers. - pub fn wait(&mut self) { + pub fn wait(&mut self) -> Result<()> { self.advance(); while !self.reached() { - self.worker.step(); + self.worker.step()?; } + Ok(()) } /// Advances this worker to the next barrier stage. @@ -51,4 +53,3 @@ impl Barrier { !self.probe.less_than(self.input.time()) } } - diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 7525d3389..4d15c0a7d 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -79,13 +79,14 @@ impl Sequencer { /// sequencer.push(format!("worker {:?}, round {:?}", worker.index(), round)); /// /// // Ensures the pushed string is sent. - /// worker.step(); + /// worker.step()?; /// /// // Read out received announcements. /// while let Some(element) = sequencer.next() { /// println!("{:?}:\tWorker {:?}:\t recv'd: {:?}", timer.elapsed(), worker.index(), element); /// } /// } + /// Ok(()) /// }).expect("Timely computation did not complete correctly."); /// ``` pub fn new(worker: &mut Worker, timer: Instant) -> Self { @@ -233,4 +234,4 @@ impl Drop for Sequencer { .expect("Sequencer.activator unavailable") .activate() } -} \ No newline at end of file +} diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 153f5c775..c5ea57827 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -17,6 +17,7 @@ use crate::progress::SubgraphBuilder; use crate::progress::operate::Operate; use crate::dataflow::scopes::Child; use crate::logging::TimelyLogger; +use crate::Result; /// Different ways in which timely's progress tracking can work. /// @@ -73,11 +74,11 @@ impl Default for ProgressMode { impl FromStr for ProgressMode { type Err = String; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> std::result::Result { match s { "eager" => Ok(ProgressMode::Eager), "demand" => Ok(ProgressMode::Demand), - _ => Err(format!("unknown progress mode: {}", s)), + _ => Err(format!("unknown progress mode: {s}")), } } } @@ -115,7 +116,7 @@ impl Config { /// This method is only available if the `getopts` feature is enabled, which /// it is by default. #[cfg(feature = "getopts")] - pub fn from_matches(matches: &getopts_dep::Matches) -> Result { + pub fn from_matches(matches: &getopts_dep::Matches) -> std::result::Result { let progress_mode = matches .opt_get_default("progress-mode", ProgressMode::Eager)?; Ok(Config::default().progress_mode(progress_mode)) @@ -141,6 +142,7 @@ impl Config { /// timely::execute(config, |worker| { /// use crate::timely::worker::AsWorker; /// assert_eq!(worker.config().get::("example"), Some(&7)); + /// Ok(()) /// }).unwrap(); /// ``` pub fn set(&mut self, key: String, val: T) -> &mut Self @@ -164,6 +166,7 @@ impl Config { /// timely::execute(config, |worker| { /// use crate::timely::worker::AsWorker; /// assert_eq!(worker.config().get::("example"), Some(&7)); + /// Ok(()) /// }).unwrap(); /// ``` pub fn get(&self, key: &str) -> Option<&T> { @@ -296,10 +299,11 @@ impl Worker { /// .inspect(|x| println!("{:?}", x)); /// }); /// - /// worker.step(); - /// }); + /// worker.step()?; + /// Ok(()) + /// }).unwrap(); /// ``` - pub fn step(&mut self) -> bool { + pub fn step(&mut self) -> Result { self.step_or_park(Some(Duration::from_secs(0))) } @@ -327,14 +331,15 @@ impl Worker { /// .inspect(|x| println!("{:?}", x)); /// }); /// - /// worker.step_or_park(Some(Duration::from_secs(1))); - /// }); + /// worker.step_or_park(Some(Duration::from_secs(1)))?; + /// Ok(()) + /// }).unwrap(); /// ``` - pub fn step_or_park(&mut self, duration: Option) -> bool { + pub fn step_or_park(&mut self, duration: Option) -> Result { { // Process channel events. Activate responders. let mut allocator = self.allocator.borrow_mut(); - allocator.receive(); + allocator.receive()?; let events = allocator.events().clone(); let mut borrow = events.borrow_mut(); let paths = self.paths.borrow(); @@ -393,7 +398,7 @@ impl Worker { // Step dataflow if it exists, remove if not incomplete. if let Entry::Occupied(mut entry) = dataflows.entry(index) { // TODO: This is a moment at which a scheduling decision is being made. - let incomplete = entry.get_mut().step(); + let incomplete = entry.get_mut().step()?; if !incomplete { let mut paths = self.paths.borrow_mut(); for channel in entry.get_mut().channel_ids.drain(..) { @@ -407,8 +412,8 @@ impl Worker { // Clean up, indicate if dataflows remain. self.logging.borrow_mut().flush(); - self.allocator.borrow_mut().release(); - !self.dataflows.borrow().is_empty() + self.allocator.borrow_mut().release()?; + Ok(!self.dataflows.borrow().is_empty()) } /// Calls `self.step()` as long as `func` evaluates to true. @@ -433,10 +438,11 @@ impl Worker { /// .probe() /// }); /// - /// worker.step_while(|| probe.less_than(&0)); - /// }); + /// worker.step_while(|| probe.less_than(&0))?; + /// Ok(()) + /// }).unwrap(); /// ``` - pub fn step_whilebool>(&mut self, func: F) { + pub fn step_whilebool>(&mut self, func: F) -> Result<()> { self.step_or_park_while(Some(Duration::from_secs(0)), func) } @@ -462,11 +468,13 @@ impl Worker { /// .probe() /// }); /// - /// worker.step_or_park_while(None, || probe.less_than(&0)); - /// }); + /// worker.step_or_park_while(None, || probe.less_than(&0))?; + /// Ok(()) + /// }).unwrap(); /// ``` - pub fn step_or_park_whilebool>(&mut self, duration: Option, mut func: F) { - while func() { self.step_or_park(duration); } + pub fn step_or_park_whilebool>(&mut self, duration: Option, mut func: F) -> Result<()> { + while func() { self.step_or_park(duration)?; } + Ok(()) } /// The index of the worker out of its peers. @@ -480,8 +488,8 @@ impl Worker { /// let timer = worker.timer(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); - /// - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn index(&self) -> usize { self.allocator.borrow().index() } /// The total number of peer workers. @@ -495,8 +503,8 @@ impl Worker { /// let timer = worker.timer(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); - /// - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn peers(&self) -> usize { self.allocator.borrow().peers() } @@ -511,8 +519,8 @@ impl Worker { /// let timer = worker.timer(); /// /// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers); - /// - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn timer(&self) -> Instant { self.timer } @@ -536,7 +544,8 @@ impl Worker { /// .insert::("timely", |time, data| /// println!("{:?}\t{:?}", time, data) /// ); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn log_register(&self) -> ::std::cell::RefMut> { self.logging.borrow_mut() @@ -555,7 +564,8 @@ impl Worker { /// // uses of `scope` to build dataflow /// /// }); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn dataflow(&mut self, func: F) -> R where @@ -579,7 +589,8 @@ impl Worker { /// // uses of `scope` to build dataflow /// /// }); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn dataflow_named(&mut self, name: &str, func: F) -> R where @@ -613,7 +624,8 @@ impl Worker { /// /// } /// ); - /// }); + /// Ok(()) + /// }).unwrap(); /// ``` pub fn dataflow_core(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R where @@ -744,15 +756,15 @@ impl Wrapper { /// If the dataflow is incomplete, this call will drop it and its resources, /// dropping the dataflow first and then the resources (so that, e.g., shared /// library bindings will outlive the dataflow). - fn step(&mut self) -> bool { + fn step(&mut self) -> Result { // Perhaps log information about the start of the schedule call. if let Some(l) = self.logging.as_mut() { l.log(crate::logging::ScheduleEvent::start(self.identifier)); } - let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false); - if !incomplete { + let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(Ok(false)); + if !matches!(incomplete, Ok(true)) { self.operate = None; self.resources = None; } diff --git a/timely/tests/barrier.rs b/timely/tests/barrier.rs index 9e627762b..d6af0b676 100644 --- a/timely/tests/barrier.rs +++ b/timely/tests/barrier.rs @@ -36,5 +36,6 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) { ) .connect_loop(handle); }); + Ok(()) }).unwrap(); // asserts error-free execution; }