Skip to content

Commit 8c5027b

Browse files
committed
Retire target and source changes in bulk
If we have many updates per source or target, `propagate_all` will update an operator's pointstamps as many times as there are updates. Updating the mutable antichain behind pointstamps can be expensive. This change extracts the updates from the target and source changes, and applies them in bulk on the pointstamps in the hope of reducing the cost of maintaining the antichain. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent c06a057 commit 8c5027b

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

timely/src/progress/change_batch.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,24 @@ impl<T, const X: usize> ChangeBatch<T, X> {
5656
}
5757
}
5858

59+
/// Constructs a `ChangeBatch` from updates.
60+
///
61+
/// # Examples
62+
///
63+
///```
64+
/// use timely::progress::ChangeBatch;
65+
///
66+
/// let updates = [(5, 1), (5, -1)].into();
67+
/// let mut batch = ChangeBatch::from_updates(updates);
68+
/// assert!(batch.is_empty());
69+
///```
70+
pub fn from_updates(updates: SmallVec<[(T, i64); X]>) -> Self {
71+
Self {
72+
updates,
73+
clean: 0,
74+
}
75+
}
76+
5977
/// Returns `true` if the change batch is not guaranteed compact.
6078
pub fn is_dirty(&self) -> bool {
6179
self.updates.len() > self.clean

timely/src/progress/reachability.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -591,10 +591,16 @@ impl<T:Timestamp> Tracker<T> {
591591
// By filtering the changes through `self.pointstamps` we react only to discrete
592592
// changes in the frontier, rather than changes in the pointstamp counts that
593593
// witness that frontier.
594-
for ((target, time), diff) in self.target_changes.drain() {
594+
595+
// For both target and source changes, we try to retire updates per target or source in
596+
// bulk to avoid repeatedly updating the operator's mutable antichain.
597+
let mut target_changes = std::mem::take(&mut self.target_changes).into_inner();
598+
while !target_changes.is_empty() {
599+
let target = target_changes[0].0.0;
600+
let update_count = target_changes.iter().position(|((t,_),_)| *t != target).unwrap_or(target_changes.len());
595601

596602
let operator = &mut self.per_operator[target.node].targets[target.port];
597-
let changes = operator.pointstamps.update_iter(Some((time, diff)));
603+
let changes = operator.pointstamps.update_iter(target_changes.drain(..update_count).map(|((_,time),diff)| (time, diff)));
598604

599605
for (time, diff) in changes {
600606
self.total_counts += diff;
@@ -609,11 +615,17 @@ impl<T:Timestamp> Tracker<T> {
609615
self.worklist.push(Reverse((time, Location::from(target), diff)));
610616
}
611617
}
618+
// Recycle `target_changes` allocation.
619+
debug_assert!(target_changes.is_empty());
620+
self.target_changes = ChangeBatch::from_updates(target_changes);
612621

613-
for ((source, time), diff) in self.source_changes.drain() {
622+
let mut source_changes = std::mem::take(&mut self.source_changes).into_inner();
623+
while !source_changes.is_empty() {
624+
let source = source_changes[0].0.0;
625+
let update_count = source_changes.iter().position(|((s,_),_)| *s != source).unwrap_or(source_changes.len());
614626

615627
let operator = &mut self.per_operator[source.node].sources[source.port];
616-
let changes = operator.pointstamps.update_iter(Some((time, diff)));
628+
let changes = operator.pointstamps.update_iter(source_changes.drain(..update_count).map(|((_,time),diff)| (time, diff)));
617629

618630
for (time, diff) in changes {
619631
self.total_counts += diff;
@@ -628,6 +640,9 @@ impl<T:Timestamp> Tracker<T> {
628640
self.worklist.push(Reverse((time, Location::from(source), diff)));
629641
}
630642
}
643+
// Recycle `source_changes` allocation.
644+
debug_assert!(source_changes.is_empty());
645+
self.source_changes = ChangeBatch::from_updates(source_changes);
631646

632647
// Step 2: Circulate implications of changes to `self.pointstamps`.
633648
//

0 commit comments

Comments
 (0)