Skip to content

Commit b990fab

Browse files
authored
Drop implementation for Tracker (#517)
1 parent b4bce96 commit b990fab

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

timely/src/progress/frontier.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,17 @@ impl<T> MutableAntichain<T> {
528528
.map(|td| td.1)
529529
.sum()
530530
}
531+
532+
/// Reports the updates that form the frontier. Returns an iterator of timestamps and their frequency.
533+
///
534+
/// Rebuilds the internal representation before revealing times and frequencies.
535+
pub fn updates(&mut self) -> impl Iterator<Item=&(T, i64)>
536+
where
537+
T: Clone + PartialOrder + Ord,
538+
{
539+
self.rebuild();
540+
self.updates.iter()
541+
}
531542
}
532543

533544
impl<T> Default for MutableAntichain<T> {

timely/src/progress/reachability.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,4 +897,48 @@ pub mod logging {
897897
impl From<TargetUpdate> for TrackerEvent {
898898
fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) }
899899
}
900-
}
900+
}
901+
902+
// The Drop implementation for `Tracker` makes sure that reachability logging is correct for
903+
// prematurely dropped dataflows. At the moment, this is only possible through `drop_dataflow`,
904+
// because in all other cases the tracker stays alive while it has outstanding work, leaving no
905+
// remaining work for this Drop implementation.
906+
impl<T: Timestamp> Drop for Tracker<T> {
907+
fn drop(&mut self) {
908+
let logger = if let Some(logger) = &mut self.logger {
909+
logger
910+
} else {
911+
// No cleanup necessary when there is no logger.
912+
return;
913+
};
914+
915+
// Retract pending data that `propagate_all` would normally log.
916+
for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
917+
let target_changes = per_operator.targets
918+
.iter_mut()
919+
.enumerate()
920+
.flat_map(|(port, target)| {
921+
target.pointstamps
922+
.updates()
923+
.map(move |(time, diff)| (index, port, time.clone(), -diff))
924+
})
925+
.collect::<Vec<_>>();
926+
if !target_changes.is_empty() {
927+
logger.log_target_updates(Box::new(target_changes));
928+
}
929+
930+
let source_changes = per_operator.sources
931+
.iter_mut()
932+
.enumerate()
933+
.flat_map(|(port, source)| {
934+
source.pointstamps
935+
.updates()
936+
.map(move |(time, diff)| (index, port, time.clone(), -diff))
937+
})
938+
.collect::<Vec<_>>();
939+
if !source_changes.is_empty() {
940+
logger.log_source_updates(Box::new(source_changes));
941+
}
942+
}
943+
}
944+
}

0 commit comments

Comments
 (0)