Skip to content

Commit ad6e22a

Browse files
committed
Cleanup
1 parent 1540baa commit ad6e22a

File tree

6 files changed

+34
-52
lines changed

6 files changed

+34
-52
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ serde_derive = "1"
2222
serde_json = "1"
2323
rmp-serde = "0.13.7"
2424
mio = "0.6.16"
25+
mio-extras = "2.0.5"
2526
slab = "0.4.1"
2627
ws = { git = "https://github.com/comnik/ws-rs" }
2728
log = "0.4"

src/bin/server.rs

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,12 @@
11
#[global_allocator]
22
static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc;
33

4-
extern crate declarative_dataflow;
5-
extern crate differential_dataflow;
6-
extern crate getopts;
7-
extern crate mio;
84
#[macro_use]
95
extern crate serde_derive;
10-
extern crate serde_json;
11-
extern crate slab;
12-
extern crate timely;
13-
extern crate ws;
14-
15-
#[macro_use]
16-
extern crate log;
17-
extern crate env_logger;
18-
196
#[macro_use]
207
extern crate abomonation_derive;
21-
extern crate abomonation;
8+
#[macro_use]
9+
extern crate log;
2210

2311
use std::collections::{HashSet, VecDeque};
2412
use std::io::BufRead;
@@ -91,8 +79,9 @@ fn main() {
9179

9280
let args: Vec<String> = std::env::args().collect();
9381
let timely_args = std::env::args().take_while(|ref arg| *arg != "--");
82+
let timely_config = timely::Configuration::from_args(timely_args).unwrap();
9483

95-
timely::execute_from_args(timely_args, move |worker| {
84+
timely::execute(timely_config, move |worker| {
9685
// read configuration
9786
let server_args = args.iter().rev().take_while(|arg| *arg != "--");
9887
let default_config: Config = Default::default();
@@ -139,13 +128,13 @@ fn main() {
139128
};
140129

141130
// setup CLI channel
142-
let (send_cli, recv_cli) = mio::channel::channel();
131+
let (send_cli, recv_cli) = mio_extras::channel::channel();
143132

144133
// setup results channel
145-
let (send_results, recv_results) = mio::channel::channel::<(String, Vec<ResultDiff<T>>)>();
134+
let (send_results, recv_results) = mio_extras::channel::channel::<(String, Vec<ResultDiff<T>>)>();
146135

147136
// setup errors channel
148-
let (send_errors, recv_errors) = mio::channel::channel::<(Vec<Token>, Vec<(Error, TxId)>)>();
137+
let (send_errors, recv_errors) = mio_extras::channel::channel::<(Vec<Token>, Vec<(Error, TxId)>)>();
149138

150139
// setup server socket
151140
// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), config.port);
@@ -416,14 +405,12 @@ fn main() {
416405
_ => {
417406
let token = event.token();
418407
let active = {
419-
let readiness = event.readiness();
420-
let conn_events = connections[token.into()].events();
408+
let event_readiness = event.readiness();
409+
let conn_readiness = connections[token.into()].events();
421410

422-
// @TODO refactor connection to accept a
423-
// vector in which to place events and
424-
// rename conn_events to avoid name clash
411+
// @TODO refactor connection to accept a vector in which to place events
425412

426-
if (readiness & conn_events).is_readable() {
413+
if (event_readiness & conn_readiness).is_readable() {
427414
match connections[token.into()].read() {
428415
Err(err) => {
429416
trace!(
@@ -493,9 +480,9 @@ fn main() {
493480
}
494481
}
495482

496-
let conn_events = connections[token.into()].events();
483+
let conn_readiness = connections[token.into()].events();
497484

498-
if (readiness & conn_events).is_writable() {
485+
if (event_readiness & conn_readiness).is_writable() {
499486
if let Err(err) = connections[token.into()].write() {
500487
trace!(
501488
"[WORKER {}] error while writing: {}",
@@ -589,7 +576,7 @@ fn main() {
589576
.consolidate()
590577
}
591578
#[cfg(not(feature = "real-time"))]
592-
Some(delay) => {
579+
Some(granularity) => {
593580
relation
594581
.delay(move |t| (t/granularity + 1) * granularity)
595582
.consolidate()

src/domain/mod.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -243,18 +243,16 @@ where
243243

244244
self.forward
245245
.get_mut(aid)
246-
.expect(&format!(
247-
"Configuration available for unknown attribute {}",
248-
aid
249-
))
246+
.unwrap_or_else(|| {
247+
panic!("Configuration available for unknown attribute {}", aid)
248+
})
250249
.advance_by(frontier);
251250

252251
self.reverse
253252
.get_mut(aid)
254-
.expect(&format!(
255-
"Configuration available for unknown attribute {}",
256-
aid
257-
))
253+
.unwrap_or_else(|| {
254+
panic!("Configuration available for unknown attribute {}", aid)
255+
})
258256
.advance_by(frontier);
259257
}
260258
}
@@ -265,10 +263,9 @@ where
265263

266264
self.arrangements
267265
.get_mut(name)
268-
.expect(&format!(
269-
"Configuration available for unknown relation {}",
270-
name
271-
))
266+
.unwrap_or_else(|| {
267+
panic!("Configuration available for unknown relation {}", name)
268+
})
272269
.advance_by(frontier);
273270
}
274271
}

src/server/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::collections::{HashMap, HashSet};
55
use std::hash::Hash;
66
use std::ops::Sub;
77
use std::rc::Rc;
8-
use std::time::{Duration, Instant};
8+
use std::time::Instant;
99

1010
use timely::dataflow::{ProbeHandle, Scope};
1111
use timely::order::TotalOrder;
@@ -420,9 +420,9 @@ impl<Token: Hash> Server<u64, Token> {
420420
}
421421

422422
#[cfg(feature = "real-time")]
423-
impl<Token: Hash> Server<Duration, Token> {
423+
impl<Token: Hash> Server<std::time::Duration, Token> {
424424
/// Handle a RegisterSource request.
425-
pub fn register_source<S: Scope<Timestamp = Duration>>(
425+
pub fn register_source<S: Scope<Timestamp = std::time::Duration>>(
426426
&mut self,
427427
source: Source,
428428
scope: &mut S,
@@ -437,7 +437,7 @@ impl<Token: Hash> Server<Duration, Token> {
437437
}
438438

439439
/// Handle a RegisterSink request.
440-
pub fn register_sink<S: Scope<Timestamp = Duration>>(
440+
pub fn register_sink<S: Scope<Timestamp = std::time::Duration>>(
441441
&mut self,
442442
req: RegisterSink,
443443
scope: &mut S,

src/server/scheduler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use timely::scheduling::activate::Activator;
1212
/// (optionally) block for input from within the event loop without
1313
/// unnecessarily delaying sources that have run out of fuel during
1414
/// the current step.
15+
#[derive(Default)]
1516
pub struct Scheduler {
1617
activator_queue: BinaryHeap<TimedActivator>,
1718
}
@@ -28,11 +29,7 @@ impl Scheduler {
2829
/// scheduled.
2930
pub fn has_pending(&self) -> bool {
3031
if let Some(ref timed_activator) = self.activator_queue.peek() {
31-
if Instant::now() <= timed_activator.at {
32-
true
33-
} else {
34-
false
35-
}
32+
Instant::now() <= timed_activator.at
3633
} else {
3734
false
3835
}

src/sources/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use std::cell::RefCell;
44
use std::rc::Weak;
5-
use std::time::{Duration, Instant};
5+
use std::time::Instant;
66

77
use timely::dataflow::{Scope, Stream};
88
use timely::order::TotalOrder;
@@ -53,13 +53,13 @@ pub enum Source {
5353
}
5454

5555
#[cfg(feature = "real-time")]
56-
impl Sourceable<Duration> for Source {
57-
fn source<S: Scope<Timestamp = Duration>>(
56+
impl Sourceable<std::time::Duration> for Source {
57+
fn source<S: Scope<Timestamp = std::time::Duration>>(
5858
&self,
5959
scope: &mut S,
6060
t0: Instant,
6161
scheduler: Weak<RefCell<Scheduler>>,
62-
) -> Vec<(Aid, Stream<S, ((Value, Value), Duration, isize)>)> {
62+
) -> Vec<(Aid, Stream<S, ((Value, Value), std::time::Duration, isize)>)> {
6363
match *self {
6464
Source::TimelyLogging(ref source) => source.source(scope, t0, scheduler),
6565
Source::DifferentialLogging(ref source) => source.source(scope, t0, scheduler),

0 commit comments

Comments
 (0)