Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::time::Duration;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder};
use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder, TimelySummaryEventBuilder};
use timely::container::CapacityContainerBuilder;
use timely::progress::reachability::logging::TrackerEventBuilder;

Expand Down Expand Up @@ -61,6 +61,17 @@ fn main() {
}
);

worker.log_register().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
if let Some(data) = data {
data.iter().for_each(|(_, x)| {
println!("SUMMARY: {:?}", x);
})
}
else {
println!("SUMMARY: Flush {time:?}");
}
);

// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope
Expand Down
3 changes: 2 additions & 1 deletion timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ where

let type_name = std::any::type_name::<T2>();
let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}"));
let summary_logging = self.log_register().get(&format!("timely/summary/{type_name}"));

let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), name));
let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), summary_logging, name));
let result = {
let mut builder = Child {
subgraph: &subscope,
Expand Down
14 changes: 14 additions & 0 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ pub type TimelyLogger = crate::logging_core::TypedLogger<TimelyEventBuilder, Tim
pub type TimelyProgressEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent<T>)>>;
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
pub type TimelyProgressLogger<T> = crate::logging_core::Logger<TimelyProgressEventBuilder<T>>;
/// Container builder for timely dataflow progress events.
pub type TimelySummaryEventBuilder<TS> = CapacityContainerBuilder<Vec<(Duration, OperatesSummaryEvent<TS>)>>;
/// Logger for timely dataflow progress events (the "timely/progress" log stream).
pub type TimelySummaryLogger<TS> = crate::logging_core::Logger<TimelySummaryEventBuilder<TS>>;

use std::time::Duration;
use columnar::Columnar;
Expand Down Expand Up @@ -65,6 +69,16 @@ pub struct OperatesEvent {
pub name: String,
}


#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Eq, PartialEq)]
/// The summary of internal connectivity of an `Operate` implementor.
pub struct OperatesSummaryEvent<TS> {
/// Worker-unique identifier for the operator.
pub id: usize,
/// Timestamp action summaries for (input, output) pairs.
pub summary: Vec<Vec<crate::progress::Antichain<TS>>>,
}

#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// The creation of a channel between operators.
pub struct ChannelsEvent {
Expand Down
17 changes: 15 additions & 2 deletions timely/src/progress/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::collections::BinaryHeap;
use std::cmp::Reverse;

use crate::logging::{TimelyLogger as Logger, TimelyProgressEventBuilder};
use crate::logging::TimelySummaryLogger as SummaryLogger;

use crate::scheduling::Schedule;
use crate::scheduling::activate::Activations;
Expand Down Expand Up @@ -63,6 +64,8 @@ where

/// Logging handle
logging: Option<Logger>,
/// Typed logging handle for operator summaries.
summary_logging: Option<SummaryLogger<TInner::Summary>>,
}

impl<TOuter, TInner> SubgraphBuilder<TOuter, TInner>
Expand Down Expand Up @@ -95,6 +98,7 @@ where
pub fn new_from(
path: Rc<[usize]>,
logging: Option<Logger>,
summary_logging: Option<SummaryLogger<TInner::Summary>>,
name: &str,
)
-> SubgraphBuilder<TOuter, TInner>
Expand All @@ -113,6 +117,7 @@ where
input_messages: Vec::new(),
output_capabilities: Vec::new(),
logging,
summary_logging,
}
}

Expand All @@ -135,7 +140,7 @@ where
name: child.name().to_owned(),
});
}
self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone()))
self.children.push(PerOperatorState::new(child, index, identifier, self.logging.clone(), &mut self.summary_logging));
}

/// Now that initialization is complete, actually build a subgraph.
Expand Down Expand Up @@ -636,7 +641,8 @@ impl<T: Timestamp> PerOperatorState<T> {
mut scope: Box<dyn Operate<T>>,
index: usize,
identifier: usize,
logging: Option<Logger>
logging: Option<Logger>,
summary_logging: &mut Option<SummaryLogger<T::Summary>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively Option<&mut SummaryLogger<...>>? I can't seem to ever know which one to use where, but here we don't want to change the option... Also, logging mightn't need mutable access iirc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will address in a follow-up!

) -> PerOperatorState<T>
{
let local = scope.local();
Expand All @@ -646,6 +652,13 @@ impl<T: Timestamp> PerOperatorState<T> {

let (internal_summary, shared_progress) = scope.get_internal_summary();

if let Some(l) = summary_logging {
l.log(crate::logging::OperatesSummaryEvent {
id: identifier,
summary: internal_summary.clone(),
})
}

assert_eq!(
internal_summary.len(),
inputs,
Expand Down
3 changes: 2 additions & 1 deletion timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,8 @@ impl<A: Allocate> Worker<A> {

let type_name = std::any::type_name::<T>();
let progress_logging = self.logging.borrow_mut().get(&format!("timely/progress/{type_name}"));
let subscope = SubgraphBuilder::new_from(addr, logging.clone(), name);
let summary_logging = self.logging.borrow_mut().get(&format!("timely/summary/{type_name}"));
let subscope = SubgraphBuilder::new_from(addr, logging.clone(), summary_logging, name);
let subscope = RefCell::new(subscope);

let result = {
Expand Down
Loading