Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ resolver = "2"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.14.2" }
timely = { version = "0.20", default-features = false }
timely = { version = "0.21", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[profile.release]
Expand Down
17 changes: 0 additions & 17 deletions differential-dataflow/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEventBuilder;

type Node = u32;
type Edge = (Node, Node);
Expand All @@ -23,22 +22,6 @@ fn main() {
// define a new computational scope, in which to run BFS
timely::execute_from_args(std::env::args(), move |worker| {

if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {

eprintln!("enabled DIFFERENTIAL logging to {}", addr);

if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
else {
panic!("Could not connect to differential log address: {:?}", addr);
}
}

let timer = ::std::time::Instant::now();

// define BFS dataflow; return handles to roots and edges inputs
Expand Down
17 changes: 0 additions & 17 deletions differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEventBuilder;

type Node = u32;
type Edge = (Node, Node);
Expand All @@ -23,22 +22,6 @@ fn main() {
// define a new computational scope, in which to run BFS
timely::execute_from_args(std::env::args(), move |worker| {

if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {

eprintln!("enabled DIFFERENTIAL logging to {}", addr);

if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
else {
panic!("Could not connect to differential log address: {:?}", addr);
}
}

let timer = ::std::time::Instant::now();

// define BFS dataflow; return handles to roots and edges inputs
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/itembased_cf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn main() {
.map(|(user, item)| (*user, *item))
.collect();

let timer = worker.timer();
let timer = worker.timer().unwrap();


for (user, item) in synthetic_interactions.iter() {
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {

let peers = worker.peers();
let index = worker.index();
let timer = worker.timer();
let timer = worker.timer().unwrap();

let mut input = InputSession::new();
let mut probe = ProbeHandle::new();
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {

timely::execute_from_args(std::env::args(), move |worker| {

let timer = worker.timer();
let timer = worker.timer().unwrap();
let mut probe = Handle::new();

let (mut nodes, mut edges, mut times) = worker.dataflow::<usize,_,_>(|scope| {
Expand Down
10 changes: 5 additions & 5 deletions differential-dataflow/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ where
A: timely::communication::Allocate,
W: std::io::Write+'static,
{
let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker
.log_register()
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
worker.log_register().and_then(|mut log_register| {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
let mut logger = ::timely::logging::BatchLogger::new(writer);
log_register.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
})
}

/// Possible different differential events.
Expand Down
5 changes: 1 addition & 4 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,7 @@ where
let stream = stream.unary_frontier(pact, name, move |_capability, info| {

// Acquire a logger for arrange events.
let logger = {
let register = scope.log_register();
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
};
let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = Ba::new(logger.clone(), info.global_id);
Expand Down
6 changes: 1 addition & 5 deletions differential-dataflow/src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ where
stream.unary_frontier(exchange, name, move |_capability, info| {

// Acquire a logger for arrange events.
let logger = {
let scope = stream.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
};
let logger = stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down
7 changes: 2 additions & 5 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,8 @@ where
let result_trace = &mut result_trace;
trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

let logger = {
let scope = trace.stream.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
};
// Acquire a logger for arrange events.
let logger = trace.stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
Expand Down
4 changes: 2 additions & 2 deletions interactive/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ impl<V: ExchangeData+Datum> Manager<V>
// Deregister loggers, so that the logging dataflows can shut down.
worker
.log_register()
.insert::<TimelyEventBuilder,_>("timely", move |_time, _data| { });
.map(|mut log_register| log_register.insert::<TimelyEventBuilder,_>("timely", move |_time, _data| { }));

worker
.log_register()
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |_time, _data| { });
.map(|mut log_register| log_register.insert::<DifferentialEventBuilder,_>("differential/arrange", move |_time, _data| { }));
}

/// Inserts a new input session by name.
Expand Down