Skip to content

Commit fb9cb0f

Browse files
committed
wip
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a466fd0 commit fb9cb0f

File tree

30 files changed

+128
-93
lines changed

30 files changed

+128
-93
lines changed

communication/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ license = "MIT"
1717
default = ["getopts"]
1818

1919
[dependencies]
20-
anyhow = "1.0.68"
2120
getopts = { version = "0.2.14", optional = true }
2221
bincode = { version = "1.0", optional = true }
2322
serde_derive = "1.0"

communication/examples/comm_hello.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ fn main() {
3535
allocator.release()?;
3636
}
3737

38-
Result::Ok(allocator.index())
38+
Ok(allocator.index())
3939
});
4040

4141
// computation runs until guards are joined or dropped.

communication/src/allocator/counters.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl<T, P: Push<T>> Pusher<T, P> {
3131

3232
impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
3333
#[inline]
34-
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
34+
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
3535
// if element.is_none() {
3636
// if self.count != 0 {
3737
// self.events
@@ -81,7 +81,7 @@ impl<T, P: Push<T>> ArcPusher<T, P> {
8181

8282
impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
8383
#[inline]
84-
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
84+
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
8585
// if element.is_none() {
8686
// if self.count != 0 {
8787
// self.events

communication/src/allocator/process.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl Allocate for Process {
182182
self.inner.await_events(duration);
183183
}
184184

185-
fn receive(&mut self) -> crate::Result<()>{
185+
fn receive(&mut self) -> crate::Result<()> {
186186
let mut events = self.inner.events().borrow_mut();
187187
while let Ok((index, event)) = self.counters_recv.try_recv() {
188188
events.push_back((index, event));
@@ -205,7 +205,7 @@ impl<T> Clone for Pusher<T> {
205205
}
206206

207207
impl<T> Push<T> for Pusher<T> {
208-
#[inline] fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
208+
#[inline] fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
209209
if let Some(element) = element.take() {
210210
// The remote endpoint could be shut down, and so
211211
// it is not fundamentally an error to fail to send.

communication/src/allocator/thread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub struct Pusher<T> {
8181

8282
impl<T> Push<T> for Pusher<T> {
8383
#[inline]
84-
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()>{
84+
fn push(&mut self, element: &mut Option<T>) -> crate::Result<()> {
8585
let mut borrow = self.target.borrow_mut();
8686
if let Some(element) = element.take() {
8787
borrow.0.push_back(element);

communication/src/allocator/zero_copy/bytes_exchange.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
33
use std::sync::{Arc, Mutex};
44
use std::collections::VecDeque;
5-
use anyhow::bail;
65

76
use bytes::arc::Bytes;
87
use super::bytes_slab::BytesSlab;
8+
use crate::err::CommError;
99

1010
/// A target for `Bytes`.
1111
pub trait BytesPush {
@@ -45,8 +45,8 @@ impl MergeQueue {
4545
}
4646
/// Indicates that all input handles to the queue have dropped.
4747
pub fn is_complete(&self) -> crate::Result<bool> {
48-
if self.panic.load(Ordering::SeqCst) { bail!("MergeQueue poisoned."); }
49-
Ok(Arc::strong_count(&self.queue) == 1 && self.queue.lock().map_err(|e| anyhow::anyhow!("MergeQueue mutex poisoned: {e}"))?.is_empty())
48+
if self.panic.load(Ordering::SeqCst) { return Err(CommError::Poison); }
49+
Ok(Arc::strong_count(&self.queue) == 1 && self.queue.lock()?.is_empty())
5050
}
5151

5252
/// Mark self as poisoned, which causes all subsequent operations to error.
@@ -58,15 +58,14 @@ impl MergeQueue {
5858
impl BytesPush for MergeQueue {
5959
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) -> crate::Result<()> {
6060

61-
if self.panic.load(Ordering::SeqCst) { bail!("MergeQueue poisoned."); }
61+
if self.panic.load(Ordering::SeqCst) { return Err(CommError::Poison); }
6262

6363
// try to acquire lock without going to sleep (Rust's lock() might yield)
6464
let mut lock_ok = self.queue.try_lock();
6565
while let Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
6666
lock_ok = self.queue.try_lock();
6767
}
68-
let mut queue = lock_ok
69-
.map_err(|e| anyhow::anyhow!("MergeQueue mutex poisoned: {e}"))?;
68+
let mut queue = lock_ok?;
7069

7170
let mut iterator = iterator.into_iter();
7271
let mut should_ping = false;
@@ -101,15 +100,14 @@ impl BytesPush for MergeQueue {
101100

102101
impl BytesPull for MergeQueue {
103102
fn drain_into(&mut self, vec: &mut Vec<Bytes>) -> crate::Result<()> {
104-
if self.panic.load(Ordering::SeqCst) { bail!("MergeQueue poisoned."); }
103+
if self.panic.load(Ordering::SeqCst) { return Err(CommError::Poison); }
105104

106105
// try to acquire lock without going to sleep (Rust's lock() might yield)
107106
let mut lock_ok = self.queue.try_lock();
108107
while let Err(::std::sync::TryLockError::WouldBlock) = lock_ok {
109108
lock_ok = self.queue.try_lock();
110109
}
111-
let mut queue = lock_ok
112-
.map_err(|e| anyhow::anyhow!("MergeQueue mutex poisoned: {e}"))?;
110+
let mut queue = lock_ok?;
113111

114112
vec.extend(queue.drain(..));
115113
Ok(())

communication/src/allocator/zero_copy/initialize.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use std::sync::Arc;
44
// use crate::allocator::Process;
55
use crate::allocator::process::ProcessBuilder;
66
use crate::networking::create_sockets;
7+
use super::tcp::{send_loop, recv_loop};
78
use super::allocator::{TcpBuilder, new_vector};
89
use super::stream::Stream;
9-
use super::tcp::{send_loop, recv_loop};
1010

1111
/// Join handles for send and receive threads.
1212
///

communication/src/allocator/zero_copy/push_pull.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::VecDeque;
6-
use anyhow::Context;
76

87
use bytes::arc::Bytes;
98

@@ -38,7 +37,7 @@ impl<T, P: BytesPush> Pusher<T, P> {
3837

3938
impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
4039
#[inline]
41-
fn push(&mut self, element: &mut Option<Message<T>>) -> crate::Result<()>{
40+
fn push(&mut self, element: &mut Option<Message<T>>) -> crate::Result<()> {
4241
if let Some(ref mut element) = *element {
4342

4443
// determine byte lengths and build header.
@@ -53,7 +52,7 @@ impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
5352
let mut bytes = borrow.reserve(header.required_bytes())?;
5453
assert!(bytes.len() >= header.required_bytes());
5554
let writer = &mut bytes;
56-
header.write_to(writer).context("writing header")?;
55+
header.write_to(writer)?;
5756
element.into_bytes(writer);
5857
}
5958
borrow.make_valid(header.required_bytes())?;

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//!
22
33
use std::io::Write;
4-
use anyhow::{bail, Context};
54
use crossbeam_channel::{Sender, Receiver};
65

76
use crate::networking::MessageHeader;
@@ -12,6 +11,7 @@ use super::stream::Stream;
1211

1312
use logging_core::Logger;
1413

14+
use crate::err::CommError;
1515
use crate::logging::{CommunicationEvent, CommunicationSetup, MessageEvent, StateEvent};
1616

1717
/// Repeatedly reads from a TcpStream and carves out messages.
@@ -83,8 +83,8 @@ where
8383
assert!(!buffer.empty().is_empty());
8484

8585
// Attempt to read some more bytes into self.buffer.
86-
let read = match reader.read(&mut buffer.empty()).context("reading data")? {
87-
0 => bail!("reading data: Unexpected EOF"),
86+
let read = match reader.read(&mut buffer.empty())? {
87+
0 => { return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into()); },
8888
n => n,
8989
};
9090

@@ -109,11 +109,11 @@ where
109109
// Shutting down; confirm absence of subsequent data.
110110
active = false;
111111
if !buffer.valid().is_empty() {
112-
bail!("Clean shutdown followed by data.");
112+
return Err(CommError::UnexpectedData);
113113
}
114114
buffer.ensure_capacity(1);
115-
if reader.read(&mut buffer.empty()).context("reading data")? > 0 {
116-
bail!("Clean shutdown followed by data.");
115+
if reader.read(&mut buffer.empty())? > 0 {
116+
return Err(CommError::UnexpectedData);
117117
}
118118
}
119119
}
@@ -174,7 +174,7 @@ pub fn send_loop<S: Stream>(
174174
// still be a signal incoming.
175175
//
176176
// We could get awoken by more data, a channel closing, or spuriously perhaps.
177-
writer.flush().context("Flushing writer")?;
177+
writer.flush()?;
178178
for source in sources.iter_mut() {
179179
if let Some(s) = source {
180180
if s.is_complete()? {
@@ -200,7 +200,7 @@ pub fn send_loop<S: Stream>(
200200
}
201201
});
202202

203-
writer.write_all(&bytes[..]).context("writing data")?;
203+
writer.write_all(&bytes[..])?;
204204
}
205205
}
206206
}
@@ -215,9 +215,9 @@ pub fn send_loop<S: Stream>(
215215
length: 0,
216216
seqno: 0,
217217
};
218-
header.write_to(&mut writer).context("writing data")?;
219-
writer.flush().context("flushing writer")?;
220-
writer.get_mut().shutdown(::std::net::Shutdown::Write).context("Write shutdown failed")?;
218+
header.write_to(&mut writer)?;
219+
writer.flush()?;
220+
writer.get_mut().shutdown(::std::net::Shutdown::Write)?;
221221
logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
222222

223223
// Log the send thread's end.

communication/src/initialize.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use crate::allocator::zero_copy::initialize::initialize_networking;
1717
use crate::logging::{CommunicationSetup, CommunicationEvent};
1818
use logging_core::Logger;
1919
use std::fmt::{Debug, Formatter};
20-
use anyhow::{bail, Context};
2120

2221

2322
/// Possible configurations for the communication infrastructure.
@@ -90,23 +89,23 @@ impl Config {
9089
/// This method is only available if the `getopts` feature is enabled, which
9190
/// it is by default.
9291
#[cfg(feature = "getopts")]
93-
pub fn from_matches(matches: &getopts::Matches) -> crate::Result<Config> {
94-
let threads = matches.opt_get_default("w", 1_usize)?;
95-
let process = matches.opt_get_default("p", 0_usize)?;
96-
let processes = matches.opt_get_default("n", 1_usize)?;
92+
pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
93+
let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
94+
let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
95+
let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
9796
let report = matches.opt_present("report");
9897
let zerocopy = matches.opt_present("zerocopy");
9998

10099
if processes > 1 {
101100
let mut addresses = Vec::new();
102101
if let Some(hosts) = matches.opt_str("h") {
103-
let file = ::std::fs::File::open(hosts.clone())?;
102+
let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
104103
let reader = ::std::io::BufReader::new(file);
105104
for line in reader.lines().take(processes) {
106-
addresses.push(line?);
105+
addresses.push(line.map_err(|e| e.to_string())?);
107106
}
108107
if addresses.len() < processes {
109-
bail!("could only read {} addresses from {hosts}, but -n: {processes}", addresses.len());
108+
return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
110109
}
111110
}
112111
else {
@@ -141,10 +140,10 @@ impl Config {
141140
/// This method is only available if the `getopts` feature is enabled, which
142141
/// it is by default.
143142
#[cfg(feature = "getopts")]
144-
pub fn from_args<I: Iterator<Item=String>>(args: I) -> crate::Result<Config> {
143+
pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
145144
let mut opts = getopts::Options::new();
146145
Config::install_options(&mut opts);
147-
let matches = opts.parse(args)?;
146+
let matches = opts.parse(args).map_err(|e| e.to_string())?;
148147
Config::from_matches(&matches)
149148
}
150149

@@ -161,8 +160,7 @@ impl Config {
161160
Ok((ProcessBuilder::new_vector(threads).into_iter().map(|x| GenericBuilder::ProcessBinary(x)).collect(), Box::new(())))
162161
},
163162
Config::Cluster { threads, process, addresses, report, log_fn } => {
164-
let (stuff, guard) = initialize_networking(addresses, process, threads, report, log_fn)
165-
.context("initializing network")?;
163+
let (stuff, guard) = initialize_networking(addresses, process, threads, report, log_fn)?;
166164
Ok((stuff.into_iter().map(|x| GenericBuilder::ZeroCopy(x)).collect(), Box::new(guard)))
167165
},
168166
}
@@ -337,10 +335,10 @@ impl<T:Send+'static> WorkerGuards<T> {
337335
}
338336

339337
/// Waits on the worker threads and returns the results they produce.
340-
pub fn join(mut self) -> Vec<crate::Result<T>> {
338+
pub fn join(mut self) -> Vec<Result<T, String>> {
341339
self.guards
342340
.drain(..)
343-
.map(|guard| guard.join().map_err(|e| anyhow::anyhow!("{e:?}")))
341+
.map(|guard| guard.join().map_err(|e| format!("{:?}", e)))
344342
.collect()
345343
}
346344
}

0 commit comments

Comments
 (0)