Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder};
use timely::container::CapacityContainerBuilder;
use timely::progress::reachability::logging::TrackerEventBuilder;

fn main() {
// initializes and runs a timely dataflow.
Expand All @@ -27,19 +28,19 @@ fn main() {
// Register timely progress logging.
// Less generally useful: intended for debugging advanced custom operators or timely
// internals.
worker.log_register().insert::<TimelyProgressEventBuilder,_>("timely/progress", |time, data|
worker.log_register().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| {
println!("PROGRESS: {:?}", x);
let (_, ev) = x;
print!("PROGRESS: TYPED MESSAGES: ");
for (n, p, t, d) in ev.messages.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
print!("{:?}, ", (n, p, t, d));
}
println!();
print!("PROGRESS: TYPED INTERNAL: ");
for (n, p, t, d) in ev.internal.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
print!("{:?}, ", (n, p, t, d));
}
println!();
})
Expand All @@ -49,6 +50,17 @@ fn main() {
}
);

worker.log_register().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
if let Some(data) = data {
data.iter().for_each(|x| {
println!("REACHABILITY: {:?}", x);
})
}
else {
println!("REACHABILITY: Flush {time:?}");
}
);

// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope
Expand Down
9 changes: 6 additions & 3 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where
/// The log writer for this scope.
pub logging: Option<Logger>,
/// The progress log writer for this scope.
pub progress_logging: Option<ProgressLogger>,
pub progress_logging: Option<ProgressLogger<T>>,
}

impl<G, T> Child<'_, G, T>
Expand Down Expand Up @@ -130,13 +130,16 @@ where
let index = self.subgraph.borrow_mut().allocate_child_id();
let path = self.addr_for_child(index);

let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), self.progress_logging.clone(), name));
let type_name = std::any::type_name::<T2>();
let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}"));

let subscope = RefCell::new(SubgraphBuilder::new_from(path, self.logging(), name));
let result = {
let mut builder = Child {
subgraph: &subscope,
parent: self.clone(),
logging: self.logging.clone(),
progress_logging: self.progress_logging.clone(),
progress_logging,
};
func(&mut builder)
};
Expand Down
87 changes: 6 additions & 81 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ pub type TimelyEventBuilder = CapacityContainerBuilder<Vec<(Duration, TimelyEven
/// Logger for timely dataflow system events.
pub type TimelyLogger = crate::logging_core::TypedLogger<TimelyEventBuilder, TimelyEvent>;
/// Container builder for timely dataflow progress events.
pub type TimelyProgressEventBuilder = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent)>>;
pub type TimelyProgressEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent<T>)>>;
/// Logger for timely dataflow progress events with timestamp type `T` (the "timely/progress/<type name of T>" log stream).
pub type TimelyProgressLogger = crate::logging_core::Logger<TimelyProgressEventBuilder>;
pub type TimelyProgressLogger<T> = crate::logging_core::Logger<TimelyProgressEventBuilder<T>>;

use std::time::Duration;
use columnar::Columnar;
Expand Down Expand Up @@ -78,70 +78,9 @@ pub struct ChannelsEvent {
pub target: (usize, usize),
}

/// Encapsulates Any and Debug for dynamically typed timestamps in logs
pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
/// Upcasts this `ProgressEventTimestamp` to `Any`.
///
/// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed
///
/// # Example
/// ```rust
/// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
/// let ts: &dyn timely::logging::ProgressEventTimestampVec = &ts;
/// for (n, p, t, d) in ts.iter() {
/// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
/// }
/// println!();
/// ```
fn as_any(&self) -> &dyn std::any::Any;

/// Returns the name of the concrete type of this object.
///
/// # Note
///
/// This is intended for diagnostic use. The exact contents and format of the
/// string returned are not specified, other than being a best-effort
/// description of the type. For example, amongst the strings
/// that `type_name::<Option<String>>()` might return are `"Option<String>"` and
/// `"std::option::Option<std::string::String>"`.
fn type_name(&self) -> &'static str;
}
impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
fn as_any(&self) -> &dyn std::any::Any { self }

fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
}

/// A vector of progress updates in logs
///
/// This exists to support upcasting of the concrete progress update vectors to
/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to
/// use a single allocation for the entire vector (as opposed to a `Box` allocation
/// for each dynamically typed element).
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
/// Iterate over the contents of the vector
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;

/// Clone self into a boxed trait object.
fn box_clone(&self) -> Box<dyn ProgressEventTimestampVec>;
}

impl<T: ProgressEventTimestamp + Clone> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
let t: &dyn ProgressEventTimestamp = t;
(n, p, t, d)
}))
}

fn box_clone(&self) -> Box<dyn ProgressEventTimestampVec> {
Box::new(self.clone())
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
/// Send or receive of progress information.
pub struct TimelyProgressEvent {
pub struct TimelyProgressEvent<T> {
/// `true` if the event is a send, and `false` if it is a receive.
pub is_send: bool,
/// Source worker index.
Expand All @@ -153,23 +92,9 @@ pub struct TimelyProgressEvent {
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
pub addr: Vec<usize>,
/// List of message updates, containing Target descriptor, timestamp, and delta.
pub messages: Box<dyn ProgressEventTimestampVec>,
pub messages: Vec<(usize, usize, T, i64)>,
/// List of capability updates, containing Source descriptor, timestamp, and delta.
pub internal: Box<dyn ProgressEventTimestampVec>,
}

impl Clone for TimelyProgressEvent {
fn clone(&self) -> Self {
Self {
is_send: self.is_send,
source: self.source,
channel: self.channel,
seq_no: self.seq_no,
addr: self.addr.clone(),
messages: self.messages.box_clone(),
internal: self.internal.box_clone(),
}
}
pub internal: Vec<(usize, usize, T, i64)>,
}

#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
Expand Down
12 changes: 6 additions & 6 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ pub struct Progcaster<T:Timestamp> {
/// Communication channel identifier
channel_identifier: usize,

progress_logging: Option<ProgressLogger>,
progress_logging: Option<ProgressLogger<T>>,
}

impl<T:Timestamp+Send> Progcaster<T> {
/// Creates a new `Progcaster` using a channel from the supplied worker.
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger>) -> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, addr.clone());
Expand Down Expand Up @@ -65,8 +65,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
// Pre-allocate enough space; we transfer ownership, so there is not
// an opportunity to re-use allocations (w/o changing the logging
// interface to accept references).
let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));
let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());

for ((location, time), diff) in changes.iter() {
match location.port {
Expand Down Expand Up @@ -134,8 +134,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
// options for improving it if performance limits users who want other logging.
self.progress_logging.as_ref().map(|l| {

let mut messages = Box::new(Vec::with_capacity(changes.len()));
let mut internal = Box::new(Vec::with_capacity(changes.len()));
let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());
Comment on lines +137 to +138
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we should be using `recv_changes.len()` here, because `changes` has no relation to what we're receiving. But then we'd over-allocate :/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, could you explain that more? I agree that recv_changes.len() looks more appropriate, but what is the downside (seems like we fill these up with exactly that many elements, and then ship them in the TimelyProgressEvent)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, because we have to demux the two types of events, probably?


for ((location, time), diff) in recv_changes.iter() {

Expand Down
73 changes: 27 additions & 46 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<T: Timestamp> Builder<T> {
/// default summaries (a serious liveness issue).
///
/// The optional logger information is baked into the resulting tracker.
pub fn build(self, logger: Option<logging::TrackerLogger>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {
pub fn build(self, logger: Option<logging::TrackerLogger<T>>) -> (Tracker<T>, Vec<Vec<Antichain<T::Summary>>>) {

if !self.is_acyclic() {
println!("Cycle detected without timestamp increment");
Expand Down Expand Up @@ -404,7 +404,7 @@ pub struct Tracker<T:Timestamp> {
total_counts: i64,

/// Optionally, a unique logging identifier and logging for tracking events.
logger: Option<logging::TrackerLogger>,
logger: Option<logging::TrackerLogger<T>>,
}

/// Target and source information for each operator.
Expand Down Expand Up @@ -503,7 +503,7 @@ impl<T:Timestamp> Tracker<T> {
/// output port.
///
/// If the optional logger is provided, it will be used to log various tracker events.
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Vec<Vec<Antichain<T::Summary>>>) {

// Allocate buffer space for each input and input port.
let mut per_operator =
Expand Down Expand Up @@ -577,7 +577,7 @@ impl<T:Timestamp> Tracker<T> {
.collect::<Vec<_>>();

if !target_changes.is_empty() {
logger.log_target_updates(Box::new(target_changes));
logger.log_target_updates(target_changes);
}

let source_changes =
Expand All @@ -587,7 +587,7 @@ impl<T:Timestamp> Tracker<T> {
.collect::<Vec<_>>();

if !source_changes.is_empty() {
logger.log_source_updates(Box::new(source_changes));
logger.log_source_updates(source_changes);
}
}

Expand Down Expand Up @@ -837,25 +837,24 @@ pub mod logging {
use timely_container::CapacityContainerBuilder;
use timely_logging::TypedLogger;
use crate::logging_core::Logger;
use crate::logging::ProgressEventTimestampVec;

/// A container builder for tracker events.
pub type TrackerEventBuilder = CapacityContainerBuilder<Vec<(Duration, TrackerEvent)>>;
pub type TrackerEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TrackerEvent<T>)>>;

/// A logger with additional identifying information about the tracker.
pub struct TrackerLogger {
pub struct TrackerLogger<T: Clone + 'static> {
path: Rc<[usize]>,
logger: TypedLogger<TrackerEventBuilder, TrackerEvent>,
logger: TypedLogger<TrackerEventBuilder<T>, TrackerEvent<T>>,
}

impl TrackerLogger {
impl<T: Clone + 'static> TrackerLogger<T> {
/// Create a new tracker logger from its fields.
pub fn new(path: Rc<[usize]>, logger: Logger<TrackerEventBuilder>) -> Self {
pub fn new(path: Rc<[usize]>, logger: Logger<TrackerEventBuilder<T>>) -> Self {
Self { path, logger: logger.into() }
}

/// Log source update events with additional identifying information.
pub fn log_source_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
pub fn log_source_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
self.logger.log({
SourceUpdate {
tracker_id: self.path.to_vec(),
Expand All @@ -864,7 +863,7 @@ pub mod logging {
})
}
/// Log target update events with additional identifying information.
pub fn log_target_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
pub fn log_target_updates(&mut self, updates: Vec<(usize, usize, T, i64)>) {
self.logger.log({
TargetUpdate {
tracker_id: self.path.to_vec(),
Expand All @@ -876,55 +875,37 @@ pub mod logging {

/// Events that the tracker may record.
#[derive(Debug, Clone)]
pub enum TrackerEvent {
pub enum TrackerEvent<T> {
/// Updates made at a source of data.
SourceUpdate(SourceUpdate),
SourceUpdate(SourceUpdate<T>),
/// Updates made at a target of data.
TargetUpdate(TargetUpdate),
TargetUpdate(TargetUpdate<T>),
}

/// An update made at a source of data.
#[derive(Debug)]
pub struct SourceUpdate {
#[derive(Debug, Clone)]
pub struct SourceUpdate<T> {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

impl Clone for SourceUpdate {
fn clone(&self) -> Self {
Self {
tracker_id: self.tracker_id.clone(),
updates: self.updates.box_clone(),
}
}
pub updates: Vec<(usize, usize, T, i64)>,
}

/// An update made at a target of data.
#[derive(Debug)]
pub struct TargetUpdate {
#[derive(Debug, Clone)]
pub struct TargetUpdate<T> {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

impl Clone for TargetUpdate {
fn clone(&self) -> Self {
Self {
tracker_id: self.tracker_id.clone(),
updates: self.updates.box_clone(),
}
}
pub updates: Vec<(usize, usize, T, i64)>,
}

impl From<SourceUpdate> for TrackerEvent {
fn from(v: SourceUpdate) -> TrackerEvent { TrackerEvent::SourceUpdate(v) }
impl<T> From<SourceUpdate<T>> for TrackerEvent<T> {
fn from(v: SourceUpdate<T>) -> TrackerEvent<T> { TrackerEvent::SourceUpdate(v) }
}

impl From<TargetUpdate> for TrackerEvent {
fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
impl<T> From<TargetUpdate<T>> for TrackerEvent<T> {
fn from(v: TargetUpdate<T>) -> TrackerEvent<T> { TrackerEvent::TargetUpdate(v) }
}
}

Expand Down Expand Up @@ -953,7 +934,7 @@ impl<T: Timestamp> Drop for Tracker<T> {
})
.collect::<Vec<_>>();
if !target_changes.is_empty() {
logger.log_target_updates(Box::new(target_changes));
logger.log_target_updates(target_changes);
}

let source_changes = per_operator.sources
Expand All @@ -966,7 +947,7 @@ impl<T: Timestamp> Drop for Tracker<T> {
})
.collect::<Vec<_>>();
if !source_changes.is_empty() {
logger.log_source_updates(Box::new(source_changes));
logger.log_source_updates(source_changes);
}
}
}
Expand Down
Loading
Loading