Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ fn main() {
}
input.advance_to(round + 1);
while probe.less_than(input.time()) {
worker.step();
worker.step()?;
}
}
Ok(())
}).unwrap();
}
```
Expand Down
14 changes: 7 additions & 7 deletions communication/examples/comm_hello.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate timely_communication;

use std::ops::Deref;
use timely_communication::{Message, Allocate};
use timely_communication::{Message, Allocate, Result};

fn main() {

Expand All @@ -16,32 +16,32 @@ fn main() {

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message::from_typed(format!("hello, {}", i)));
senders[i].done();
senders[i].send(Message::from_typed(format!("hello, {}", i)))?;
senders[i].done()?;
}

// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {

allocator.receive();
allocator.receive()?;

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.deref());
received += 1;
}

allocator.release();
allocator.release()?;
}

allocator.index()
Result::Ok(allocator.index())
});

// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
println!("result: {:?}", guard.unwrap().unwrap());
}
}
else { println!("error in computation"); }
Expand Down
7 changes: 4 additions & 3 deletions communication/src/allocator/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<T, P: Push<T>> Pusher<T, P> {

impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
#[inline]
fn push(&mut self, element: &mut Option<T>) {
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
// if element.is_none() {
// if self.count != 0 {
// self.events
Expand Down Expand Up @@ -81,7 +81,7 @@ impl<T, P: Push<T>> ArcPusher<T, P> {

impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
#[inline]
fn push(&mut self, element: &mut Option<T>) {
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
// if element.is_none() {
// if self.count != 0 {
// self.events
Expand All @@ -98,11 +98,12 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// we first enqueue data, second enqueue interest in the channel,
// and finally awaken the thread. Other orders are defective when
// multiple threads are involved.
self.pusher.push(element);
self.pusher.push(element)?;
let _ = self.events.send((self.index, Event::Pushed(1)));
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
// .expect("Failed to send message count");
self.buzzer.buzz();
Ok(())
}
}

Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Generic {
}
}
/// Perform work before scheduling operators.
fn receive(&mut self) {
fn receive(&mut self) -> crate::Result<()> {
match self {
Generic::Thread(t) => t.receive(),
Generic::Process(p) => p.receive(),
Expand All @@ -66,7 +66,7 @@ impl Generic {
}
}
/// Perform work after scheduling operators.
pub fn release(&mut self) {
pub fn release(&mut self) -> crate::Result<()> {
match self {
Generic::Thread(t) => t.release(),
Generic::Process(p) => p.release(),
Expand All @@ -91,8 +91,8 @@ impl Allocate for Generic {
self.allocate(identifier)
}

fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn receive(&mut self) -> crate::Result<()> { self.receive() }
fn release(&mut self) -> crate::Result<()> { self.release() }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub trait Allocate {
/// present messages contained in otherwise scarce resources (for example
/// network buffers), under the premise that someone is about to consume
/// the messages and release the resources.
fn receive(&mut self) { }
fn receive(&mut self) -> crate::Result<()> { Ok(()) }

/// Signal the completion of a batch of reads from channels.
///
Expand All @@ -79,7 +79,7 @@ pub trait Allocate {
/// the fabric should consider re-acquiring scarce resources. This can
/// lead to the fabric performing defensive copies out of un-consumed
/// buffers, and can be a performance problem if invoked casually.
fn release(&mut self) { }
fn release(&mut self) -> crate::Result<()> { Ok(()) }

/// Constructs a pipeline channel from the worker to itself.
///
Expand Down
6 changes: 4 additions & 2 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ impl Allocate for Process {
self.inner.await_events(duration);
}

fn receive(&mut self) {
fn receive(&mut self) -> crate::Result<()> {
let mut events = self.inner.events().borrow_mut();
while let Ok((index, event)) = self.counters_recv.try_recv() {
events.push_back((index, event));
}
Ok(())
}
}

Expand All @@ -204,12 +205,13 @@ impl<T> Clone for Pusher<T> {
}

impl<T> Push<T> for Pusher<T> {
#[inline] fn push(&mut self, element: &mut Option<T>) {
#[inline] fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
if let Some(element) = element.take() {
// The remote endpoint could be shut down, and so
// it is not fundamentally an error to fail to send.
let _ = self.target.send(element);
}
Ok(())
}
}

Expand Down
3 changes: 2 additions & 1 deletion communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ pub struct Pusher<T> {

impl<T> Push<T> for Pusher<T> {
#[inline]
fn push(&mut self, element: &mut Option<T>) {
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
let mut borrow = self.target.borrow_mut();
if let Some(element) = element.take() {
borrow.0.push_back(element);
}
*element = borrow.1.pop_front();
Ok(())
}
}

Expand Down
14 changes: 8 additions & 6 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {

// Perform preparatory work, most likely reading binary buffers from self.recv.
#[inline(never)]
fn receive(&mut self) {
fn receive(&mut self) -> crate::Result<()> {

// Check for channels whose `Puller` has been dropped.
let mut canaries = self.canaries.borrow_mut();
Expand All @@ -207,10 +207,10 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
}
::std::mem::drop(canaries);

self.inner.receive();
self.inner.receive()?;

for recv in self.recvs.iter_mut() {
recv.drain_into(&mut self.staged);
recv.drain_into(&mut self.staged)?;
}

let mut events = self.inner.events().borrow_mut();
Expand Down Expand Up @@ -251,13 +251,14 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
}
}
}
Ok(())
}

// Perform postparatory work, most likely sending un-full binary buffers.
fn release(&mut self) {
fn release(&mut self) -> crate::Result<()> {
// Publish outgoing byte ledgers.
for send in self.sends.iter_mut() {
send.borrow_mut().publish();
send.borrow_mut().publish()?;
}

// OPTIONAL: Tattle on channels sitting on borrowed data.
Expand All @@ -268,11 +269,12 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
// eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
// }
// }
Ok(())
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
self.inner.events()
}
fn await_events(&self, duration: Option<std::time::Duration>) {
self.inner.await_events(duration);
}
}
}
12 changes: 7 additions & 5 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl Allocate for ProcessAllocator {

// Perform preparatory work, most likely reading binary buffers from self.recv.
#[inline(never)]
fn receive(&mut self) {
fn receive(&mut self) -> crate::Result<()> {

// Check for channels whose `Puller` has been dropped.
let mut canaries = self.canaries.borrow_mut();
Expand All @@ -179,7 +179,7 @@ impl Allocate for ProcessAllocator {
let mut events = self.events.borrow_mut();

for recv in self.recvs.iter_mut() {
recv.drain_into(&mut self.staged);
recv.drain_into(&mut self.staged)?;
}

for mut bytes in self.staged.drain(..) {
Expand Down Expand Up @@ -218,13 +218,14 @@ impl Allocate for ProcessAllocator {
}
}
}
Ok(())
}

// Perform postparatory work, most likely sending un-full binary buffers.
fn release(&mut self) {
fn release(&mut self) -> crate::Result<()> {
// Publish outgoing byte ledgers.
for send in self.sends.iter_mut() {
send.borrow_mut().publish();
send.borrow_mut().publish()?;
}

// OPTIONAL: Tattle on channels sitting on borrowed data.
Expand All @@ -235,6 +236,7 @@ impl Allocate for ProcessAllocator {
// eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
// }
// }
Ok(())
}

fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
Expand All @@ -250,4 +252,4 @@ impl Allocate for ProcessAllocator {
}
}
}
}
}
Loading