Skip to content

Commit 09994a8

Browse files
authored
Typed logging (#624)
Convert the progress and reachability logs to typed logs, which do not require casting at runtime. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent c429faf commit 09994a8

File tree

7 files changed

+68
-150
lines changed

7 files changed

+68
-150
lines changed

timely/examples/logging-send.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use timely::dataflow::{InputHandle, ProbeHandle};
44
use timely::dataflow::operators::{Input, Exchange, Probe};
55
use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder};
66
use timely::container::CapacityContainerBuilder;
7+
use timely::progress::reachability::logging::TrackerEventBuilder;
78

89
fn main() {
910
// initializes and runs a timely dataflow.
@@ -27,19 +28,19 @@ fn main() {
2728
// Register timely progress logging.
2829
// Less generally useful: intended for debugging advanced custom operators or timely
2930
// internals.
30-
worker.log_register().insert::<TimelyProgressEventBuilder,_>("timely/progress", |time, data|
31+
worker.log_register().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
3132
if let Some(data) = data {
3233
data.iter().for_each(|x| {
3334
println!("PROGRESS: {:?}", x);
3435
let (_, ev) = x;
3536
print!("PROGRESS: TYPED MESSAGES: ");
3637
for (n, p, t, d) in ev.messages.iter() {
37-
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
38+
print!("{:?}, ", (n, p, t, d));
3839
}
3940
println!();
4041
print!("PROGRESS: TYPED INTERNAL: ");
4142
for (n, p, t, d) in ev.internal.iter() {
42-
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
43+
print!("{:?}, ", (n, p, t, d));
4344
}
4445
println!();
4546
})
@@ -49,6 +50,17 @@ fn main() {
4950
}
5051
);
5152

53+
worker.log_register().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
54+
if let Some(data) = data {
55+
data.iter().for_each(|x| {
56+
println!("REACHABILITY: {:?}", x);
57+
})
58+
}
59+
else {
60+
println!("REACHABILITY: Flush {time:?}");
61+
}
62+
);
63+
5264
// create a new input, exchange data, and inspect its output
5365
worker.dataflow(|scope| {
5466
scope

timely/src/dataflow/scopes/child.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ where
3434
/// The log writer for this scope.
3535
pub logging: Option<Logger>,
3636
/// The progress log writer for this scope.
37-
pub progress_logging: Option<ProgressLogger>,
37+
pub progress_logging: Option<ProgressLogger<T>>,
3838
}
3939

4040
impl<G, T> Child<'_, G, T>
@@ -130,13 +130,16 @@ where
130130
let index = self.subgraph.borrow_mut().allocate_child_id();
131131
let path = self.addr_for_child(index);
132132

133-
let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), self.progress_logging.clone(), name));
133+
let type_name = std::any::type_name::<T2>();
134+
let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}"));
135+
136+
let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), name));
134137
let result = {
135138
let mut builder = Child {
136139
subgraph: &subscope,
137140
parent: self.clone(),
138141
logging: self.logging.clone(),
139-
progress_logging: self.progress_logging.clone(),
142+
progress_logging,
140143
};
141144
func(&mut builder)
142145
};

timely/src/logging.rs

Lines changed: 6 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ pub type TimelyEventBuilder = CapacityContainerBuilder<Vec<(Duration, TimelyEven
77
/// Logger for timely dataflow system events.
88
pub type TimelyLogger = crate::logging_core::TypedLogger<TimelyEventBuilder, TimelyEvent>;
99
/// Container builder for timely dataflow progress events.
10-
pub type TimelyProgressEventBuilder = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent)>>;
10+
pub type TimelyProgressEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent<T>)>>;
1111
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
12-
pub type TimelyProgressLogger = crate::logging_core::Logger<TimelyProgressEventBuilder>;
12+
pub type TimelyProgressLogger<T> = crate::logging_core::Logger<TimelyProgressEventBuilder<T>>;
1313

1414
use std::time::Duration;
1515
use columnar::Columnar;
@@ -78,70 +78,9 @@ pub struct ChannelsEvent {
7878
pub target: (usize, usize),
7979
}
8080

81-
/// Encapsulates Any and Debug for dynamically typed timestamps in logs
82-
pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
83-
/// Upcasts this `ProgressEventTimestamp` to `Any`.
84-
///
85-
/// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed
86-
///
87-
/// # Example
88-
/// ```rust
89-
/// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
90-
/// let ts: &dyn timely::logging::ProgressEventTimestampVec = &ts;
91-
/// for (n, p, t, d) in ts.iter() {
92-
/// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
93-
/// }
94-
/// println!();
95-
/// ```
96-
fn as_any(&self) -> &dyn std::any::Any;
97-
98-
/// Returns the name of the concrete type of this object.
99-
///
100-
/// # Note
101-
///
102-
/// This is intended for diagnostic use. The exact contents and format of the
103-
/// string returned are not specified, other than being a best-effort
104-
/// description of the type. For example, amongst the strings
105-
/// that `type_name::<Option<String>>()` might return are `"Option<String>"` and
106-
/// `"std::option::Option<std::string::String>"`.
107-
fn type_name(&self) -> &'static str;
108-
}
109-
impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
110-
fn as_any(&self) -> &dyn std::any::Any { self }
111-
112-
fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
113-
}
114-
115-
/// A vector of progress updates in logs
116-
///
117-
/// This exists to support upcasting of the concrecte progress update vectors to
118-
/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to
119-
/// use a single allocation for the entire vector (as opposed to a `Box` allocation
120-
/// for each dynamically typed element).
121-
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
122-
/// Iterate over the contents of the vector
123-
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
124-
125-
/// Clone self into a boxed trait object.
126-
fn box_clone(&self) -> Box<dyn ProgressEventTimestampVec>;
127-
}
128-
129-
impl<T: ProgressEventTimestamp + Clone> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
130-
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
131-
Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
132-
let t: &dyn ProgressEventTimestamp = t;
133-
(n, p, t, d)
134-
}))
135-
}
136-
137-
fn box_clone(&self) -> Box<dyn ProgressEventTimestampVec> {
138-
Box::new(self.clone())
139-
}
140-
}
141-
142-
#[derive(Debug)]
81+
#[derive(Debug, Clone)]
14382
/// Send or receive of progress information.
144-
pub struct TimelyProgressEvent {
83+
pub struct TimelyProgressEvent<T> {
14584
/// `true` if the event is a send, and `false` if it is a receive.
14685
pub is_send: bool,
14786
/// Source worker index.
@@ -153,23 +92,9 @@ pub struct TimelyProgressEvent {
15392
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
15493
pub addr: Vec<usize>,
15594
/// List of message updates, containing Target descriptor, timestamp as string, and delta.
156-
pub messages: Box<dyn ProgressEventTimestampVec>,
95+
pub messages: Vec<(usize, usize, T, i64)>,
15796
/// List of capability updates, containing Source descriptor, timestamp as string, and delta.
158-
pub internal: Box<dyn ProgressEventTimestampVec>,
159-
}
160-
161-
impl Clone for TimelyProgressEvent {
162-
fn clone(&self) -> Self {
163-
Self {
164-
is_send: self.is_send,
165-
source: self.source,
166-
channel: self.channel,
167-
seq_no: self.seq_no,
168-
addr: self.addr.clone(),
169-
messages: self.messages.box_clone(),
170-
internal: self.internal.box_clone(),
171-
}
172-
}
97+
pub internal: Vec<(usize, usize, T, i64)>,
17398
}
17499

175100
#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]

timely/src/progress/broadcast.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ pub struct Progcaster<T:Timestamp> {
2828
/// Communication channel identifier
2929
channel_identifier: usize,
3030

31-
progress_logging: Option<ProgressLogger>,
31+
progress_logging: Option<ProgressLogger<T>>,
3232
}
3333

3434
impl<T:Timestamp+Send> Progcaster<T> {
3535
/// Creates a new `Progcaster` using a channel from the supplied worker.
36-
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
36+
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {
3737

3838
let channel_identifier = worker.new_identifier();
3939
let (pushers, puller) = worker.allocate(channel_identifier, addr.clone());
@@ -65,8 +65,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
6565
// Pre-allocate enough space; we transfer ownership, so there is not
6666
// an opportunity to re-use allocations (w/o changing the logging
6767
// interface to accept references).
68-
let mut messages = Box::new(Vec::with_capacity(changes.len()));
69-
let mut internal = Box::new(Vec::with_capacity(changes.len()));
68+
let mut messages = Vec::with_capacity(changes.len());
69+
let mut internal = Vec::with_capacity(changes.len());
7070

7171
for ((location, time), diff) in changes.iter() {
7272
match location.port {
@@ -134,8 +134,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
134134
// options for improving it if performance limits users who want other logging.
135135
self.progress_logging.as_ref().map(|l| {
136136

137-
let mut messages = Box::new(Vec::with_capacity(changes.len()));
138-
let mut internal = Box::new(Vec::with_capacity(changes.len()));
137+
let mut messages = Vec::with_capacity(changes.len());
138+
let mut internal = Vec::with_capacity(changes.len());
139139

140140
for ((location, time), diff) in recv_changes.iter() {
141141

timely/src/progress/reachability.rs

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ impl<T: Timestamp> Builder<T> {
195195
/// default summaries (a serious liveness issue).
196196
///
197197
/// The optional logger information is baked into the resulting tracker.
198-
pub fn build(self, logger: Option<logging::TrackerLogger>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
198+
pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
199199

200200
if !self.is_acyclic() {
201201
println!("Cycle detected without timestamp increment");
@@ -404,7 +404,7 @@ pub struct Tracker<T:Timestamp> {
404404
total_counts: i64,
405405

406406
/// Optionally, a unique logging identifier and logging for tracking events.
407-
logger: Option<logging::TrackerLogger>,
407+
logger: Option<logging::TrackerLogger<T>>,
408408
}
409409

410410
/// Target and source information for each operator.
@@ -503,7 +503,7 @@ impl<T:Timestamp> Tracker<T> {
503503
/// output port.
504504
///
505505
/// If the optional logger is provided, it will be used to log various tracker events.
506-
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
506+
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
507507

508508
// Allocate buffer space for each input and input port.
509509
let mut per_operator =
@@ -577,7 +577,7 @@ impl<T:Timestamp> Tracker<T> {
577577
.collect::<Vec<_>>();
578578

579579
if !target_changes.is_empty() {
580-
logger.log_target_updates(Box::new(target_changes));
580+
logger.log_target_updates(target_changes);
581581
}
582582

583583
let source_changes =
@@ -587,7 +587,7 @@ impl<T:Timestamp> Tracker<T> {
587587
.collect::<Vec<_>>();
588588

589589
if !source_changes.is_empty() {
590-
logger.log_source_updates(Box::new(source_changes));
590+
logger.log_source_updates(source_changes);
591591
}
592592
}
593593

@@ -837,25 +837,24 @@ pub mod logging {
837837
use timely_container::CapacityContainerBuilder;
838838
use timely_logging::TypedLogger;
839839
use crate::logging_core::Logger;
840-
use crate::logging::ProgressEventTimestampVec;
841840

842841
/// A container builder for tracker events.
843-
pub type TrackerEventBuilder = CapacityContainerBuilder<Vec<(Duration, TrackerEvent)>>;
842+
pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;
844843

845844
/// A logger with additional identifying information about the tracker.
846-
pub struct TrackerLogger {
845+
pub struct TrackerLogger<T: Clone + 'static> {
847846
path: Rc<[usize]>,
848-
logger: TypedLogger<TrackerEventBuilder, TrackerEvent>,
847+
logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
849848
}
850849

851-
impl TrackerLogger {
850+
impl<T: Clone + 'static> TrackerLogger<T> {
852851
/// Create a new tracker logger from its fields.
853-
pub fn new(path: Rc<[usize]>, logger: Logger<TrackerEventBuilder>) -> Self {
852+
pub fn new(path: Rc<[usize]>, logger: Logger<TrackerEventBuilder<T>>) -> Self {
854853
Self { path, logger: logger.into() }
855854
}
856855

857856
/// Log source update events with additional identifying information.
858-
pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
857+
pub fn log_source_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
859858
self.logger.log({
860859
SourceUpdate {
861860
tracker_id: self.path.to_vec(),
@@ -864,7 +863,7 @@ pub mod logging {
864863
})
865864
}
866865
/// Log target update events with additional identifying information.
867-
pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
866+
pub fn log_target_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
868867
self.logger.log({
869868
TargetUpdate {
870869
tracker_id: self.path.to_vec(),
@@ -876,55 +875,37 @@ pub mod logging {
876875

877876
/// Events that the tracker may record.
878877
#[derive(Debug, Clone)]
879-
pub enum TrackerEvent {
878+
pub enum TrackerEvent<T> {
880879
/// Updates made at a source of data.
881-
SourceUpdate(SourceUpdate),
880+
SourceUpdate(SourceUpdate<T>),
882881
/// Updates made at a target of data.
883-
TargetUpdate(TargetUpdate),
882+
TargetUpdate(TargetUpdate<T>),
884883
}
885884

886885
/// An update made at a source of data.
887-
#[derive(Debug)]
888-
pub struct SourceUpdate {
886+
#[derive(Debug, Clone)]
887+
pub struct SourceUpdate<T> {
889888
/// An identifier for the tracker.
890889
pub tracker_id: Vec<usize>,
891890
/// Updates themselves, as `(node, port, time, diff)`.
892-
pub updates: Box<dyn ProgressEventTimestampVec>,
893-
}
894-
895-
impl Clone for SourceUpdate {
896-
fn clone(&self) -> Self {
897-
Self {
898-
tracker_id: self.tracker_id.clone(),
899-
updates: self.updates.box_clone(),
900-
}
901-
}
891+
pub updates: Vec<(usize, usize, T, i64)>,
902892
}
903893

904894
/// An update made at a target of data.
905-
#[derive(Debug)]
906-
pub struct TargetUpdate {
895+
#[derive(Debug, Clone)]
896+
pub struct TargetUpdate<T> {
907897
/// An identifier for the tracker.
908898
pub tracker_id: Vec<usize>,
909899
/// Updates themselves, as `(node, port, time, diff)`.
910-
pub updates: Box<dyn ProgressEventTimestampVec>,
911-
}
912-
913-
impl Clone for TargetUpdate {
914-
fn clone(&self) -> Self {
915-
Self {
916-
tracker_id: self.tracker_id.clone(),
917-
updates: self.updates.box_clone(),
918-
}
919-
}
900+
pub updates: Vec<(usize, usize, T, i64)>,
920901
}
921902

922-
impl From<SourceUpdate> for TrackerEvent {
923-
fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) }
903+
impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
904+
fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
924905
}
925906

926-
impl From<TargetUpdate> for TrackerEvent {
927-
fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
907+
impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
908+
fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
928909
}
929910
}
930911

@@ -953,7 +934,7 @@ impl<T: Timestamp> Drop for Tracker<T> {
953934
})
954935
.collect::<Vec<_>>();
955936
if !target_changes.is_empty() {
956-
logger.log_target_updates(Box::new(target_changes));
937+
logger.log_target_updates(target_changes);
957938
}
958939

959940
let source_changes = per_operator.sources
@@ -966,7 +947,7 @@ impl<T: Timestamp> Drop for Tracker<T> {
966947
})
967948
.collect::<Vec<_>>();
968949
if !source_changes.is_empty() {
969-
logger.log_source_updates(Box::new(source_changes));
950+
logger.log_source_updates(source_changes);
970951
}
971952
}
972953
}

0 commit comments

Comments
 (0)