Skip to content

Commit e0a1bf8

Browse files
committed
wip
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 78b9344 commit e0a1bf8

File tree

34 files changed

+164
-154
lines changed

34 files changed

+164
-154
lines changed

communication/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ license = "MIT"
1717
default = ["getopts"]
1818

1919
[dependencies]
20-
anyhow = "1.0.57"
21-
abomonation = "0.7"
22-
abomonation_derive = "0.5"
23-
bincode = { version = "1.0", optional = true }
24-
crossbeam-channel = "0.5.0"
20+
anyhow = "1.0.68"
2521
getopts = { version = "0.2.14", optional = true }
26-
serde = "1.0"
22+
bincode = { version = "1.0", optional = true }
2723
serde_derive = "1.0"
24+
serde = "1.0"
25+
abomonation = "0.7"
26+
abomonation_derive = "0.5"
2827
timely_bytes = { path = "../bytes", version = "0.12" }
2928
timely_logging = { path = "../logging", version = "0.12" }
29+
crossbeam-channel = "0.5.0"

communication/examples/comm_hello.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
extern crate timely_communication;
22

33
use std::ops::Deref;
4-
use timely_communication::{Message, Allocate};
4+
use timely_communication::{Message, Allocate, Result};
55

66
fn main() {
77

@@ -35,7 +35,7 @@ fn main() {
3535
allocator.release()?;
3636
}
3737

38-
Result::<_, anyhow::Error>::Ok(allocator.index())
38+
Result::Ok(allocator.index())
3939
});
4040

4141
// computation runs until guards are joined or dropped.

communication/src/allocator/counters.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::VecDeque;
66

7-
use crate::{Push, Pull, Result};
7+
use crate::{Push, Pull};
88
use crate::allocator::Event;
99

1010
/// The push half of an intra-thread channel.
@@ -31,7 +31,7 @@ impl<T, P: Push<T>> Pusher<T, P> {
3131

3232
impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
3333
#[inline]
34-
fn push(&mut self, element: &mut Option<T>) -> Result<()>{
34+
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
3535
// if element.is_none() {
3636
// if self.count != 0 {
3737
// self.events
@@ -81,7 +81,7 @@ impl<T, P: Push<T>> ArcPusher<T, P> {
8181

8282
impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
8383
#[inline]
84-
fn push(&mut self, element: &mut Option<T>) -> Result<()>{
84+
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
8585
// if element.is_none() {
8686
// if self.count != 0 {
8787
// self.events

communication/src/allocator/generic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl Generic {
5757
}
5858
}
5959
/// Perform work before scheduling operators.
60-
fn receive(&mut self) -> anyhow::Result<()> {
60+
fn receive(&mut self) -> crate::Result<()> {
6161
match self {
6262
Generic::Thread(t) => t.receive(),
6363
Generic::Process(p) => p.receive(),
@@ -66,7 +66,7 @@ impl Generic {
6666
}
6767
}
6868
/// Perform work after scheduling operators.
69-
pub fn release(&mut self) -> anyhow::Result<()> {
69+
pub fn release(&mut self) -> crate::Result<()> {
7070
match self {
7171
Generic::Thread(t) => t.release(),
7272
Generic::Process(p) => p.release(),
@@ -91,8 +91,8 @@ impl Allocate for Generic {
9191
self.allocate(identifier)
9292
}
9393

94-
fn receive(&mut self) -> anyhow::Result<()> { self.receive() }
95-
fn release(&mut self) -> anyhow::Result<()> { self.release() }
94+
fn receive(&mut self) -> crate::Result<()> { self.receive() }
95+
fn release(&mut self) -> crate::Result<()> { self.release() }
9696
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
9797
fn await_events(&self, _duration: Option<std::time::Duration>) {
9898
match self {

communication/src/allocator/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod counters;
1818

1919
pub mod zero_copy;
2020

21-
use crate::{Data, Push, Pull, Message, Result};
21+
use crate::{Data, Push, Pull, Message};
2222

2323
/// A proto-allocator, which implements `Send` and can be completed with `build`.
2424
///
@@ -70,7 +70,7 @@ pub trait Allocate {
7070
/// present messages contained in otherwise scarce resources (for example
7171
/// network buffers), under the premise that someone is about to consume
7272
/// the messages and release the resources.
73-
fn receive(&mut self) -> Result<()> { Ok(()) }
73+
fn receive(&mut self) -> crate::Result<()> { Ok(()) }
7474

7575
/// Signal the completion of a batch of reads from channels.
7676
///
@@ -79,7 +79,7 @@ pub trait Allocate {
7979
/// the fabric should consider re-acquiring scarce resources. This can
8080
/// lead to the fabric performing defensive copies out of un-consumed
8181
/// buffers, and can be a performance problem if invoked casually.
82-
fn release(&mut self) -> Result<()> { Ok(()) }
82+
fn release(&mut self) -> crate::Result<()> { Ok(()) }
8383

8484
/// Constructs a pipeline channel from the worker to itself.
8585
///

communication/src/allocator/process.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crossbeam_channel::{Sender, Receiver};
1010

1111
use crate::allocator::thread::{ThreadBuilder};
1212
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread};
13-
use crate::{Push, Pull, Message, Result};
13+
use crate::{Push, Pull, Message};
1414
use crate::buzzer::Buzzer;
1515

1616
/// An allocator for inter-thread, intra-process communication
@@ -182,7 +182,7 @@ impl Allocate for Process {
182182
self.inner.await_events(duration);
183183
}
184184

185-
fn receive(&mut self) -> anyhow::Result<()>{
185+
fn receive(&mut self) -> crate::Result<()>{
186186
let mut events = self.inner.events().borrow_mut();
187187
while let Ok((index, event)) = self.counters_recv.try_recv() {
188188
events.push_back((index, event));
@@ -205,7 +205,7 @@ impl<T> Clone for Pusher<T> {
205205
}
206206

207207
impl<T> Push<T> for Pusher<T> {
208-
#[inline] fn push(&mut self, element: &mut Option<T>) -> Result<()>{
208+
#[inline] fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
209209
if let Some(element) = element.take() {
210210
// The remote endpoint could be shut down, and so
211211
// it is not fundamentally an error to fail to send.

communication/src/allocator/thread.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::collections::VecDeque;
88
use crate::allocator::{Allocate, AllocateBuilder, Event};
99
use crate::allocator::counters::Pusher as CountPusher;
1010
use crate::allocator::counters::Puller as CountPuller;
11-
use crate::{Push, Pull, Message, Result};
11+
use crate::{Push, Pull, Message};
1212

1313
/// Builder for single-threaded allocator.
1414
pub struct ThreadBuilder;
@@ -81,7 +81,7 @@ pub struct Pusher<T> {
8181

8282
impl<T> Push<T> for Pusher<T> {
8383
#[inline]
84-
fn push(&mut self, element: &mut Option<T>) -> Result<()>{
84+
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
8585
let mut borrow = self.target.borrow_mut();
8686
if let Some(element) = element.take() {
8787
borrow.0.push_back(element);

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
190190

191191
// Perform preparatory work, most likely reading binary buffers from self.recv.
192192
#[inline(never)]
193-
fn receive(&mut self) -> anyhow::Result<()> {
193+
fn receive(&mut self) -> crate::Result<()> {
194194

195195
// Check for channels whose `Puller` has been dropped.
196196
let mut canaries = self.canaries.borrow_mut();
@@ -205,7 +205,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
205205
// on events from it.
206206
// assert!(dropped.borrow().is_empty());
207207
}
208-
drop(canaries);
208+
::std::mem::drop(canaries);
209209

210210
self.inner.receive()?;
211211

@@ -255,7 +255,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
255255
}
256256

257257
// Perform postparatory work, most likely sending un-full binary buffers.
258-
fn release(&mut self) -> anyhow::Result<()> {
258+
fn release(&mut self) -> crate::Result<()> {
259259
// Publish outgoing byte ledgers.
260260
for send in self.sends.iter_mut() {
261261
send.borrow_mut().publish()?;

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl Allocate for ProcessAllocator {
159159

160160
// Perform preparatory work, most likely reading binary buffers from self.recv.
161161
#[inline(never)]
162-
fn receive(&mut self) -> anyhow::Result<()> {
162+
fn receive(&mut self) -> crate::Result<()> {
163163

164164
// Check for channels whose `Puller` has been dropped.
165165
let mut canaries = self.canaries.borrow_mut();
@@ -222,7 +222,7 @@ impl Allocate for ProcessAllocator {
222222
}
223223

224224
// Perform postparatory work, most likely sending un-full binary buffers.
225-
fn release(&mut self) -> anyhow::Result<()> {
225+
fn release(&mut self) -> crate::Result<()> {
226226
// Publish outgoing byte ledgers.
227227
for send in self.sends.iter_mut() {
228228
send.borrow_mut().publish()?;

communication/src/allocator/zero_copy/bytes_exchange.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,28 @@
11
//! Types and traits for sharing `Bytes`.
22
33
use std::sync::{Arc, Mutex};
4-
use std::sync::atomic::{AtomicBool, Ordering};
54
use std::collections::VecDeque;
65
use anyhow::bail;
76

87
use bytes::arc::Bytes;
98
use super::bytes_slab::BytesSlab;
109

11-
use crate::Result;
12-
1310
/// A target for `Bytes`.
1411
pub trait BytesPush {
1512
// /// Pushes bytes at the instance.
1613
// fn push(&mut self, bytes: Bytes);
1714
/// Pushes many bytes at the instance.
18-
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I) -> Result<()>;
15+
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I) -> crate::Result<()>;
1916
}
2017
/// A source for `Bytes`.
2118
pub trait BytesPull {
2219
// /// Pulls bytes from the instance.
2320
// fn pull(&mut self) -> Option<Bytes>;
2421
/// Drains many bytes from the instance.
25-
fn drain_into(&mut self, vec: &mut Vec<Bytes>) -> Result<()>;
22+
fn drain_into(&mut self, vec: &mut Vec<Bytes>) -> crate::Result<()>;
2623
}
2724

25+
use std::sync::atomic::{AtomicBool, Ordering};
2826
/// An unbounded queue of bytes intended for point-to-point communication
2927
/// between threads. Cloning returns another handle to the same queue.
3028
///
@@ -46,19 +44,19 @@ impl MergeQueue {
4644
}
4745
}
4846
/// Indicates that all input handles to the queue have dropped.
49-
pub fn is_complete(&self) -> Result<bool> {
47+
pub fn is_complete(&self) -> crate::Result<bool> {
5048
if self.panic.load(Ordering::SeqCst) { bail!("MergeQueue poisoned."); }
51-
Ok(Arc::strong_count(&self.queue) == 1 && self.queue.lock().map_err(|_| anyhow::anyhow!("MergeQueue mutex poisoned."))?.is_empty())
49+
Ok(Arc::strong_count(&self.queue) == 1 && self.queue.lock().map_err(|e| anyhow::anyhow!("MergeQueue mutex poisoned: {e}"))?.is_empty())
5250
}
5351

54-
/// TODO
52+
/// Mark self as poisoned, which causes all subsequent operations to error.
5553
pub fn poison(&mut self) {
5654
self.panic.store(true, Ordering::SeqCst);
5755
}
5856
}
5957

6058
impl BytesPush for MergeQueue {
61-
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) -> Result<()> {
59+
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) -> crate::Result<()> {
6260

6361
if self.panic.load(Ordering::SeqCst) { bail!("MergeQueue poisoned."); }
6462

@@ -68,7 +66,7 @@ impl BytesPush for MergeQueue {
6866
lock_ok = self.queue.try_lock();
6967
}
7068
let mut queue = lock_ok
71-
.map_err(|_| anyhow::anyhow!("MergeQueue mutex poisoned."))?;
69+
.map_err(|e| anyhow::anyhow!("MergeQueue mutex poisoned: {e}"))?;
7270

7371
let mut iterator = iterator.into_iter();
7472
let mut should_ping = false;
@@ -102,7 +100,7 @@ impl BytesPush for MergeQueue {
102100
}
103101

104102
impl BytesPull for MergeQueue {
105-
fn drain_into(&mut self, vec: &mut Vec<Bytes>) -> Result<()> {
103+
fn drain_into(&mut self, vec: &mut Vec<Bytes>) -> crate::Result<()> {
106104
if self.panic.load(Ordering::SeqCst) { bail!("MergeQueue poisoned."); }
107105

108106
// try to acquire lock without going to sleep (Rust's lock() might yield)
@@ -111,7 +109,7 @@ impl BytesPull for MergeQueue {
111109
lock_ok = self.queue.try_lock();
112110
}
113111
let mut queue = lock_ok
114-
.map_err(|_| anyhow::anyhow!("MergeQueue mutex poisoned."))?;
112+
.map_err(|e| anyhow::anyhow!("MergeQueue mutex poisoned: {e}"))?;
115113

116114
vec.extend(queue.drain(..));
117115
Ok(())
@@ -128,7 +126,7 @@ impl Drop for MergeQueue {
128126
}
129127
else {
130128
// TODO: Perhaps this aggressive ordering can relax orderings elsewhere.
131-
// if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
129+
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
132130
}
133131
// Drop the queue before pinging.
134132
self.queue = Arc::new(Mutex::new(VecDeque::new()));
@@ -146,7 +144,7 @@ pub struct SendEndpoint<P: BytesPush> {
146144
impl<P: BytesPush> SendEndpoint<P> {
147145

148146
/// Moves `self.buffer` into `self.send`, replaces with empty buffer.
149-
fn send_buffer(&mut self) -> Result<()> {
147+
fn send_buffer(&mut self) -> crate::Result<()> {
150148
let valid_len = self.buffer.valid().len();
151149
if valid_len > 0 {
152150
self.send.extend(Some(self.buffer.extract(valid_len)))?;
@@ -164,12 +162,12 @@ impl<P: BytesPush> SendEndpoint<P> {
164162
/// Makes the next `bytes` bytes valid.
165163
///
166164
/// The current implementation also sends the bytes, to ensure early visibility.
167-
pub fn make_valid(&mut self, bytes: usize) -> Result<()> {
165+
pub fn make_valid(&mut self, bytes: usize) -> crate::Result<()> {
168166
self.buffer.make_valid(bytes);
169167
self.send_buffer()
170168
}
171169
/// Acquires a prefix of `self.empty()` of length at least `capacity`.
172-
pub fn reserve(&mut self, capacity: usize) -> Result<&mut [u8]> {
170+
pub fn reserve(&mut self, capacity: usize) -> crate::Result<&mut [u8]> {
173171

174172
if self.buffer.empty().len() < capacity {
175173
self.send_buffer()?;
@@ -180,7 +178,7 @@ impl<P: BytesPush> SendEndpoint<P> {
180178
Ok(self.buffer.empty())
181179
}
182180
/// Marks all written data as valid, makes visible.
183-
pub fn publish(&mut self) -> Result<()> {
181+
pub fn publish(&mut self) -> crate::Result<()> {
184182
self.send_buffer()
185183
}
186184
}

0 commit comments

Comments
 (0)