Skip to content

Commit 17a053b

Browse files
authored
Incorporate breaking changes from Timely's logging update (#558)
* Incorporate breaking changes from Timely's logging update TimelyDataflow/timely-dataflow#615 Signed-off-by: Moritz Hoffmann <[email protected]> * Back out of some changes Signed-off-by: Moritz Hoffmann <[email protected]> * Upgrade timely Signed-off-by: Moritz Hoffmann <[email protected]> * Fix compile errors Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent c3871fb commit 17a053b

File tree

10 files changed

+23
-22
lines changed

10 files changed

+23
-22
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ timely = {workspace = true}
4848
columnar = "0.2"
4949

5050
[workspace.dependencies]
51-
timely = { version = "0.15", default-features = false }
51+
timely = { version = "0.16", default-features = false }
5252
#timely = { path = "../timely-dataflow/timely/", default-features = false }
5353

5454
[features]

examples/bfs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use differential_dataflow::input::Input;
77
use differential_dataflow::Collection;
88
use differential_dataflow::operators::*;
99
use differential_dataflow::lattice::Lattice;
10-
use differential_dataflow::logging::DifferentialEvent;
10+
use differential_dataflow::logging::DifferentialEventBuilder;
1111

1212
type Node = u32;
1313
type Edge = (Node, Node);
@@ -30,7 +30,7 @@ fn main() {
3030
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
3131
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
3232
let mut logger = ::timely::logging::BatchLogger::new(writer);
33-
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
33+
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
3434
logger.publish_batch(time, data)
3535
);
3636
}

examples/dynamic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use differential_dataflow::input::Input;
77
use differential_dataflow::Collection;
88
use differential_dataflow::operators::*;
99
use differential_dataflow::lattice::Lattice;
10-
use differential_dataflow::logging::DifferentialEvent;
10+
use differential_dataflow::logging::DifferentialEventBuilder;
1111

1212
type Node = u32;
1313
type Edge = (Node, Node);
@@ -30,7 +30,7 @@ fn main() {
3030
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
3131
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
3232
let mut logger = ::timely::logging::BatchLogger::new(writer);
33-
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
33+
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
3434
logger.publish_batch(time, data)
3535
);
3636
}

interactive/src/manager.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::hash::Hash;
77
use timely::dataflow::ProbeHandle;
88
use timely::communication::Allocate;
99
use timely::worker::Worker;
10-
use timely::logging::TimelyEvent;
10+
use timely::logging::TimelyEventBuilder;
1111

1212
// use timely::dataflow::operators::capture::event::EventIterator;
1313

@@ -16,7 +16,7 @@ use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
1616
use differential_dataflow::operators::arrange::TraceAgent;
1717
use differential_dataflow::input::InputSession;
1818

19-
use differential_dataflow::logging::DifferentialEvent;
19+
use differential_dataflow::logging::DifferentialEventBuilder;
2020

2121
use crate::{Time, Diff, Plan, Datum};
2222

@@ -87,11 +87,11 @@ impl<V: ExchangeData+Datum> Manager<V>
8787
// Deregister loggers, so that the logging dataflows can shut down.
8888
worker
8989
.log_register()
90-
.insert::<TimelyEvent,_>("timely", move |_time, _data| { });
90+
.insert::<TimelyEventBuilder,_>("timely", move |_time, _data| { });
9191

9292
worker
9393
.log_register()
94-
.insert::<DifferentialEvent,_>("differential/arrange", move |_time, _data| { });
94+
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |_time, _data| { });
9595
}
9696

9797
/// Inserts a new input session by name.
@@ -226,4 +226,4 @@ impl<V: ExchangeData+Hash+Datum> TraceManager<V> {
226226
.insert(keys.to_vec(), handle.clone());
227227
}
228228

229-
}
229+
}

src/logging.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
use columnar::Columnar;
44
use serde::{Deserialize, Serialize};
55

6+
/// Container builder for differential log events.
7+
pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
8+
69
/// Logger for differential dataflow events.
7-
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
10+
pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
811

912
/// Enables logging of differential dataflow events.
1013
pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
@@ -16,7 +19,7 @@ where
1619
let mut logger = ::timely::logging::BatchLogger::new(writer);
1720
worker
1821
.log_register()
19-
.insert::<DifferentialEvent,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
22+
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
2023
}
2124

2225
/// Possible different differential events.

src/operators/arrange/arrangement.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ where
437437
// Acquire a logger for arrange events.
438438
let logger = {
439439
let register = scope.log_register();
440-
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
440+
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
441441
};
442442

443443
// Where we will deposit received updates, and from which we extract batches.

src/operators/arrange/upsert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ where
156156
let logger = {
157157
let scope = stream.scope();
158158
let register = scope.log_register();
159-
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
159+
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
160160
};
161161

162162
// Tracks the lower envelope of times in `priority_queue`.

src/operators/reduce.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ where
336336
let logger = {
337337
let scope = trace.stream.scope();
338338
let register = scope.log_register();
339-
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
339+
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
340340
};
341341

342342
let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));

src/trace/implementations/merge_batcher.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@
1212
1313
use std::marker::PhantomData;
1414

15-
use timely::logging_core::Logger;
1615
use timely::progress::frontier::AntichainRef;
1716
use timely::progress::{frontier::Antichain, Timestamp};
1817
use timely::Container;
1918
use timely::container::{ContainerBuilder, PushInto};
2019

21-
use crate::logging::{BatcherEvent, DifferentialEvent};
20+
use crate::logging::{BatcherEvent, Logger};
2221
use crate::trace::{Batcher, Builder, Description};
2322

2423
/// Creates batches from containers of unordered tuples.
@@ -41,7 +40,7 @@ pub struct MergeBatcher<Input, C, M: Merger> {
4140
/// The lower-bound frontier of the data, after the last call to seal.
4241
frontier: Antichain<M::Time>,
4342
/// Logger for size accounting.
44-
logger: Option<Logger<DifferentialEvent>>,
43+
logger: Option<Logger>,
4544
/// Timely operator ID.
4645
operator_id: usize,
4746
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
@@ -58,7 +57,7 @@ where
5857
type Time = M::Time;
5958
type Output = M::Chunk;
6059

61-
fn new(logger: Option<Logger<DifferentialEvent>>, operator_id: usize) -> Self {
60+
fn new(logger: Option<Logger>, operator_id: usize) -> Self {
6261
Self {
6362
logger,
6463
operator_id,

src/trace/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@ pub mod description;
1212
pub mod implementations;
1313
pub mod wrappers;
1414

15-
use timely::logging_core::Logger;
1615
use timely::progress::{Antichain, frontier::AntichainRef};
1716
use timely::progress::Timestamp;
1817

19-
use crate::logging::DifferentialEvent;
18+
use crate::logging::Logger;
2019
use crate::trace::cursor::IntoOwned;
2120
use crate::difference::Semigroup;
2221
use crate::lattice::Lattice;
@@ -309,7 +308,7 @@ pub trait Batcher {
309308
/// Times at which batches are formed.
310309
type Time: Timestamp;
311310
/// Allocates a new empty batcher.
312-
fn new(logger: Option<Logger<DifferentialEvent>>, operator_id: usize) -> Self;
311+
fn new(logger: Option<Logger>, operator_id: usize) -> Self;
313312
/// Adds an unordered container of elements to the batcher.
314313
fn push_container(&mut self, batch: &mut Self::Input);
315314
/// Returns all updates not greater or equal to an element of `upper`.

0 commit comments

Comments
 (0)