Skip to content

Commit df96291

Browse files
committed
Improved (?) error handling for communication and timely
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 6a73600 commit df96291

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+605
-400
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@ fn main() {
8282
}
8383
input.advance_to(round + 1);
8484
while probe.less_than(input.time()) {
85-
worker.step();
85+
worker.step()?;
8686
}
8787
}
88+
Ok(())
8889
}).unwrap();
8990
}
9091
```

communication/Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ license = "MIT"
1717
default = ["getopts"]
1818

1919
[dependencies]
20-
getopts = { version = "0.2.14", optional = true }
21-
bincode = { version = "1.0", optional = true }
22-
serde_derive = "1.0"
23-
serde = "1.0"
20+
anyhow = "1.0.57"
2421
abomonation = "0.7"
2522
abomonation_derive = "0.5"
23+
bincode = { version = "1.0", optional = true }
24+
crossbeam-channel = "0.5.0"
25+
getopts = { version = "0.2.14", optional = true }
26+
serde = "1.0"
27+
serde_derive = "1.0"
2628
timely_bytes = { path = "../bytes", version = "0.12" }
2729
timely_logging = { path = "../logging", version = "0.12" }
28-
crossbeam-channel = "0.5.0"

communication/examples/comm_hello.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,32 @@ fn main() {
1616

1717
// send typed data along each channel
1818
for i in 0 .. allocator.peers() {
19-
senders[i].send(Message::from_typed(format!("hello, {}", i)));
20-
senders[i].done();
19+
senders[i].send(Message::from_typed(format!("hello, {}", i)))?;
20+
senders[i].done()?;
2121
}
2222

2323
// no support for termination notification,
2424
// we have to count down ourselves.
2525
let mut received = 0;
2626
while received < allocator.peers() {
2727

28-
allocator.receive();
28+
allocator.receive()?;
2929

3030
if let Some(message) = receiver.recv() {
3131
println!("worker {}: received: <{}>", allocator.index(), message.deref());
3232
received += 1;
3333
}
3434

35-
allocator.release();
35+
allocator.release()?;
3636
}
3737

38-
allocator.index()
38+
Result::<_, anyhow::Error>::Ok(allocator.index())
3939
});
4040

4141
// computation runs until guards are joined or dropped.
4242
if let Ok(guards) = guards {
4343
for guard in guards.join() {
44-
println!("result: {:?}", guard);
44+
println!("result: {:?}", guard.unwrap().unwrap());
4545
}
4646
}
4747
else { println!("error in computation"); }

communication/src/allocator/counters.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::VecDeque;
66

7-
use crate::{Push, Pull};
7+
use crate::{Push, Pull, Result};
88
use crate::allocator::Event;
99

1010
/// The push half of an intra-thread channel.
@@ -31,7 +31,7 @@ impl<T, P: Push<T>> Pusher<T, P> {
3131

3232
impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
3333
#[inline]
34-
fn push(&mut self, element: &mut Option<T>) {
34+
fn push(&mut self, element: &mut Option<T>) -> Result<()>{
3535
// if element.is_none() {
3636
// if self.count != 0 {
3737
// self.events
@@ -81,7 +81,7 @@ impl<T, P: Push<T>> ArcPusher<T, P> {
8181

8282
impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
8383
#[inline]
84-
fn push(&mut self, element: &mut Option<T>) {
84+
fn push(&mut self, element: &mut Option<T>) -> Result<()>{
8585
// if element.is_none() {
8686
// if self.count != 0 {
8787
// self.events
@@ -98,11 +98,12 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
9898
// we first enqueue data, second enqueue interest in the channel,
9999
// and finally awaken the thread. Other orders are defective when
100100
// multiple threads are involved.
101-
self.pusher.push(element);
101+
self.pusher.push(element)?;
102102
let _ = self.events.send((self.index, Event::Pushed(1)));
103103
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
104104
// .expect("Failed to send message count");
105105
self.buzzer.buzz();
106+
Ok(())
106107
}
107108
}
108109

communication/src/allocator/generic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl Generic {
5757
}
5858
}
5959
/// Perform work before scheduling operators.
60-
fn receive(&mut self) {
60+
fn receive(&mut self) -> anyhow::Result<()> {
6161
match self {
6262
Generic::Thread(t) => t.receive(),
6363
Generic::Process(p) => p.receive(),
@@ -66,7 +66,7 @@ impl Generic {
6666
}
6767
}
6868
/// Perform work after scheduling operators.
69-
pub fn release(&mut self) {
69+
pub fn release(&mut self) -> anyhow::Result<()> {
7070
match self {
7171
Generic::Thread(t) => t.release(),
7272
Generic::Process(p) => p.release(),
@@ -91,8 +91,8 @@ impl Allocate for Generic {
9191
self.allocate(identifier)
9292
}
9393

94-
fn receive(&mut self) { self.receive(); }
95-
fn release(&mut self) { self.release(); }
94+
fn receive(&mut self) -> anyhow::Result<()> { self.receive() }
95+
fn release(&mut self) -> anyhow::Result<()> { self.release() }
9696
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
9797
fn await_events(&self, _duration: Option<std::time::Duration>) {
9898
match self {

communication/src/allocator/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod counters;
1818

1919
pub mod zero_copy;
2020

21-
use crate::{Data, Push, Pull, Message};
21+
use crate::{Data, Push, Pull, Message, Result};
2222

2323
/// A proto-allocator, which implements `Send` and can be completed with `build`.
2424
///
@@ -70,7 +70,7 @@ pub trait Allocate {
7070
/// present messages contained in otherwise scarce resources (for example
7171
/// network buffers), under the premise that someone is about to consume
7272
/// the messages and release the resources.
73-
fn receive(&mut self) { }
73+
fn receive(&mut self) -> Result<()> { Ok(()) }
7474

7575
/// Signal the completion of a batch of reads from channels.
7676
///
@@ -79,7 +79,7 @@ pub trait Allocate {
7979
/// the fabric should consider re-acquiring scarce resources. This can
8080
/// lead to the fabric performing defensive copies out of un-consumed
8181
/// buffers, and can be a performance problem if invoked casually.
82-
fn release(&mut self) { }
82+
fn release(&mut self) -> Result<()> { Ok(()) }
8383

8484
/// Constructs a pipeline channel from the worker to itself.
8585
///

communication/src/allocator/process.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crossbeam_channel::{Sender, Receiver};
1010

1111
use crate::allocator::thread::{ThreadBuilder};
1212
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
13-
use crate::{Push, Pull, Message};
13+
use crate::{Push, Pull, Message, Result};
1414
use crate::buzzer::Buzzer;
1515

1616
/// An allocator for inter-thread, intra-process communication
@@ -182,11 +182,12 @@ impl Allocate for Process {
182182
self.inner.await_events(duration);
183183
}
184184

185-
fn receive(&mut self) {
185+
fn receive(&mut self) -> anyhow::Result<()>{
186186
let mut events = self.inner.events().borrow_mut();
187187
while let Ok((index, event)) = self.counters_recv.try_recv() {
188188
events.push_back((index, event));
189189
}
190+
Ok(())
190191
}
191192
}
192193

@@ -204,12 +205,13 @@ impl<T> Clone for Pusher<T> {
204205
}
205206

206207
impl<T> Push<T> for Pusher<T> {
207-
#[inline] fn push(&mut self, element: &mut Option<T>) {
208+
#[inline] fn push(&mut self, element: &mut Option<T>) -> Result<()>{
208209
if let Some(element) = element.take() {
209210
// The remote endpoint could be shut down, and so
210211
// it is not fundamentally an error to fail to send.
211212
let _ = self.target.send(element);
212213
}
214+
Ok(())
213215
}
214216
}
215217

communication/src/allocator/thread.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::collections::VecDeque;
88
use crate::allocator::{Allocate, AllocateBuilder, Event};
99
use crate::allocator::counters::Pusher as CountPusher;
1010
use crate::allocator::counters::Puller as CountPuller;
11-
use crate::{Push, Pull, Message};
11+
use crate::{Push, Pull, Message, Result};
1212

1313
/// Builder for single-threaded allocator.
1414
pub struct ThreadBuilder;
@@ -81,12 +81,13 @@ pub struct Pusher<T> {
8181

8282
impl<T> Push<T> for Pusher<T> {
8383
#[inline]
84-
fn push(&mut self, element: &mut Option<T>) {
84+
fn push(&mut self, element: &mut Option<T>) -> Result<()>{
8585
let mut borrow = self.target.borrow_mut();
8686
if let Some(element) = element.take() {
8787
borrow.0.push_back(element);
8888
}
8989
*element = borrow.1.pop_front();
90+
Ok(())
9091
}
9192
}
9293

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
190190

191191
// Perform preparatory work, most likely reading binary buffers from self.recv.
192192
#[inline(never)]
193-
fn receive(&mut self) {
193+
fn receive(&mut self) -> anyhow::Result<()> {
194194

195195
// Check for channels whose `Puller` has been dropped.
196196
let mut canaries = self.canaries.borrow_mut();
@@ -205,12 +205,12 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
205205
// on events from it.
206206
// assert!(dropped.borrow().is_empty());
207207
}
208-
::std::mem::drop(canaries);
208+
drop(canaries);
209209

210-
self.inner.receive();
210+
self.inner.receive()?;
211211

212212
for recv in self.recvs.iter_mut() {
213-
recv.drain_into(&mut self.staged);
213+
recv.drain_into(&mut self.staged)?;
214214
}
215215

216216
let mut events = self.inner.events().borrow_mut();
@@ -251,13 +251,14 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
251251
}
252252
}
253253
}
254+
Ok(())
254255
}
255256

256257
// Perform postparatory work, most likely sending un-full binary buffers.
257-
fn release(&mut self) {
258+
fn release(&mut self) -> anyhow::Result<()> {
258259
// Publish outgoing byte ledgers.
259260
for send in self.sends.iter_mut() {
260-
send.borrow_mut().publish();
261+
send.borrow_mut().publish()?;
261262
}
262263

263264
// OPTIONAL: Tattle on channels sitting on borrowed data.
@@ -268,11 +269,12 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
268269
// eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
269270
// }
270271
// }
272+
Ok(())
271273
}
272274
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
273275
self.inner.events()
274276
}
275277
fn await_events(&self, duration: Option<std::time::Duration>) {
276278
self.inner.await_events(duration);
277279
}
278-
}
280+
}

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl Allocate for ProcessAllocator {
159159

160160
// Perform preparatory work, most likely reading binary buffers from self.recv.
161161
#[inline(never)]
162-
fn receive(&mut self) {
162+
fn receive(&mut self) -> anyhow::Result<()> {
163163

164164
// Check for channels whose `Puller` has been dropped.
165165
let mut canaries = self.canaries.borrow_mut();
@@ -179,7 +179,7 @@ impl Allocate for ProcessAllocator {
179179
let mut events = self.events.borrow_mut();
180180

181181
for recv in self.recvs.iter_mut() {
182-
recv.drain_into(&mut self.staged);
182+
recv.drain_into(&mut self.staged)?;
183183
}
184184

185185
for mut bytes in self.staged.drain(..) {
@@ -218,13 +218,14 @@ impl Allocate for ProcessAllocator {
218218
}
219219
}
220220
}
221+
Ok(())
221222
}
222223

223224
// Perform postparatory work, most likely sending un-full binary buffers.
224-
fn release(&mut self) {
225+
fn release(&mut self) -> anyhow::Result<()> {
225226
// Publish outgoing byte ledgers.
226227
for send in self.sends.iter_mut() {
227-
send.borrow_mut().publish();
228+
send.borrow_mut().publish()?;
228229
}
229230

230231
// OPTIONAL: Tattle on channels sitting on borrowed data.
@@ -235,6 +236,7 @@ impl Allocate for ProcessAllocator {
235236
// eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
236237
// }
237238
// }
239+
Ok(())
238240
}
239241

240242
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
@@ -250,4 +252,4 @@ impl Allocate for ProcessAllocator {
250252
}
251253
}
252254
}
253-
}
255+
}

0 commit comments

Comments
 (0)