diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 0356ddf41..4ac2acf30 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -270,14 +270,14 @@ pub mod val_batch { fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. - let starting_updates = self.result.times.len(); + let starting_updates = self.result.times.len() + self.singletons; let mut effort = 0isize; // While both mergees are still active, perform single-key merges. while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.merge_key(&source1.storage, &source2.storage); // An (incomplete) accounting of the work we've done. - effort = (self.result.times.len() - starting_updates) as isize; + effort = (self.result.times.len() + self.singletons - starting_updates) as isize; } // Merging is complete, and only copying remains. @@ -285,12 +285,12 @@ pub mod val_batch { while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { self.copy_key(&source1.storage, self.key_cursor1); self.key_cursor1 += 1; - effort = (self.result.times.len() - starting_updates) as isize; + effort = (self.result.times.len() + self.singletons - starting_updates) as isize; } while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.copy_key(&source2.storage, self.key_cursor2); self.key_cursor2 += 1; - effort = (self.result.times.len() - starting_updates) as isize; + effort = (self.result.times.len() + self.singletons - starting_updates) as isize; } *fuel -= effort; @@ -864,14 +864,14 @@ pub mod key_batch { fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { // An (incomplete) indication of the amount of work we've done so far. - let starting_updates = self.result.times.len(); + let starting_updates = self.result.times.len() + self.singletons; let mut effort = 0isize; // While both mergees are still active, perform single-key merges. while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.merge_key(&source1.storage, &source2.storage); // An (incomplete) accounting of the work we've done. - effort = (self.result.times.len() - starting_updates) as isize; + effort = (self.result.times.len() + self.singletons - starting_updates) as isize; } // Merging is complete, and only copying remains. @@ -879,12 +879,12 @@ pub mod key_batch { while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { self.copy_key(&source1.storage, self.key_cursor1); self.key_cursor1 += 1; - effort = (self.result.times.len() - starting_updates) as isize; + effort = (self.result.times.len() + self.singletons - starting_updates) as isize; } while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { self.copy_key(&source2.storage, self.key_cursor2); self.key_cursor2 += 1; - effort = (self.result.times.len() - starting_updates) as isize; + effort = (self.result.times.len() + self.singletons - starting_updates) as isize; } *fuel -= effort;