Skip to content

Commit 98e8bc4

Browse files
Update timely dependence to 0.21. (#599)
* Update to timely 0.21 * Tidy some logging uses
1 parent 3af2ba4 commit 98e8bc4

File tree

11 files changed

+15
-59
lines changed

11 files changed

+15
-59
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ resolver = "2"
1717

1818
[workspace.dependencies]
1919
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.14.2" }
20-
timely = { version = "0.20", default-features = false }
20+
timely = { version = "0.21", default-features = false }
2121
#timely = { path = "../timely-dataflow/timely/", default-features = false }
2222

2323
[profile.release]

differential-dataflow/examples/bfs.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use differential_dataflow::input::Input;
77
use differential_dataflow::Collection;
88
use differential_dataflow::operators::*;
99
use differential_dataflow::lattice::Lattice;
10-
use differential_dataflow::logging::DifferentialEventBuilder;
1110

1211
type Node = u32;
1312
type Edge = (Node, Node);
@@ -23,22 +22,6 @@ fn main() {
2322
// define a new computational scope, in which to run BFS
2423
timely::execute_from_args(std::env::args(), move |worker| {
2524

26-
if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {
27-
28-
eprintln!("enabled DIFFERENTIAL logging to {}", addr);
29-
30-
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
31-
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
32-
let mut logger = ::timely::logging::BatchLogger::new(writer);
33-
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
34-
logger.publish_batch(time, data)
35-
);
36-
}
37-
else {
38-
panic!("Could not connect to differential log address: {:?}", addr);
39-
}
40-
}
41-
4225
let timer = ::std::time::Instant::now();
4326

4427
// define BFS dataflow; return handles to roots and edges inputs

differential-dataflow/examples/dynamic.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use differential_dataflow::input::Input;
77
use differential_dataflow::Collection;
88
use differential_dataflow::operators::*;
99
use differential_dataflow::lattice::Lattice;
10-
use differential_dataflow::logging::DifferentialEventBuilder;
1110

1211
type Node = u32;
1312
type Edge = (Node, Node);
@@ -23,22 +22,6 @@ fn main() {
2322
// define a new computational scope, in which to run BFS
2423
timely::execute_from_args(std::env::args(), move |worker| {
2524

26-
if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {
27-
28-
eprintln!("enabled DIFFERENTIAL logging to {}", addr);
29-
30-
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
31-
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
32-
let mut logger = ::timely::logging::BatchLogger::new(writer);
33-
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
34-
logger.publish_batch(time, data)
35-
);
36-
}
37-
else {
38-
panic!("Could not connect to differential log address: {:?}", addr);
39-
}
40-
}
41-
4225
let timer = ::std::time::Instant::now();
4326

4427
// define BFS dataflow; return handles to roots and edges inputs

differential-dataflow/examples/itembased_cf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ fn main() {
9494
.map(|(user, item)| (*user, *item))
9595
.collect();
9696

97-
let timer = worker.timer();
97+
let timer = worker.timer().unwrap();
9898

9999

100100
for (user, item) in synthetic_interactions.iter() {

differential-dataflow/examples/pagerank.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ fn main() {
2626

2727
let peers = worker.peers();
2828
let index = worker.index();
29-
let timer = worker.timer();
29+
let timer = worker.timer().unwrap();
3030

3131
let mut input = InputSession::new();
3232
let mut probe = ProbeHandle::new();

differential-dataflow/examples/progress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ fn main() {
1717

1818
timely::execute_from_args(std::env::args(), move |worker| {
1919

20-
let timer = worker.timer();
20+
let timer = worker.timer().unwrap();
2121
let mut probe = Handle::new();
2222

2323
let (mut nodes, mut edges, mut times) = worker.dataflow::<usize,_,_>(|scope| {

differential-dataflow/src/logging.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ where
1515
A: timely::communication::Allocate,
1616
W: std::io::Write+'static,
1717
{
18-
let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
19-
let mut logger = ::timely::logging::BatchLogger::new(writer);
20-
worker
21-
.log_register()
22-
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
18+
worker.log_register().and_then(|mut log_register| {
19+
let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
20+
let mut logger = ::timely::logging::BatchLogger::new(writer);
21+
log_register.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
22+
})
2323
}
2424

2525
/// Possible different differential events.

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,10 +433,7 @@ where
433433
let stream = stream.unary_frontier(pact, name, move |_capability, info| {
434434

435435
// Acquire a logger for arrange events.
436-
let logger = {
437-
let register = scope.log_register();
438-
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
439-
};
436+
let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
440437

441438
// Where we will deposit received updates, and from which we extract batches.
442439
let mut batcher = Ba::new(logger.clone(), info.global_id);

differential-dataflow/src/operators/arrange/upsert.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,11 +152,7 @@ where
152152
stream.unary_frontier(exchange, name, move |_capability, info| {
153153

154154
// Acquire a logger for arrange events.
155-
let logger = {
156-
let scope = stream.scope();
157-
let register = scope.log_register();
158-
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
159-
};
155+
let logger = stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
160156

161157
// Tracks the lower envelope of times in `priority_queue`.
162158
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

differential-dataflow/src/operators/reduce.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,11 +331,8 @@ where
331331
let result_trace = &mut result_trace;
332332
trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {
333333

334-
let logger = {
335-
let scope = trace.stream.scope();
336-
let register = scope.log_register();
337-
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
338-
};
334+
// Acquire a logger for arrange events.
335+
let logger = trace.stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
339336

340337
let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
341338
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);

0 commit comments

Comments
 (0)