diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 20f5406d9..149b32a9f 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -27,6 +27,7 @@ columnation = "0.1" getopts = { version = "0.2.21", optional = true } bincode = { version = "1.0" } byteorder = "1.5" +itertools = "0.14.0" serde = { version = "1.0", features = ["derive"] } timely_bytes = { path = "../bytes", version = "0.22" } timely_logging = { path = "../logging", version = "0.22" } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 921886c53..b5909b23a 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -591,10 +591,14 @@ impl Tracker { // By filtering the changes through `self.pointstamps` we react only to discrete // changes in the frontier, rather than changes in the pointstamp counts that // witness that frontier. - for ((target, time), diff) in self.target_changes.drain() { + use itertools::Itertools; + let mut target_changes = self.target_changes.drain().peekable(); + while let Some(((target, _), _)) = target_changes.peek() { + let target = *target; let operator = &mut self.per_operator[target.node].targets[target.port]; - let changes = operator.pointstamps.update_iter(Some((time, diff))); + let target_updates = target_changes.peeking_take_while(|((t, _),_)| t == &target).map(|((_,time),diff)| (time,diff)); + let changes = operator.pointstamps.update_iter(target_updates); for (time, diff) in changes { self.total_counts += diff; @@ -610,10 +614,13 @@ impl Tracker { } } - for ((source, time), diff) in self.source_changes.drain() { + let mut source_changes = self.source_changes.drain().peekable(); + while let Some(((source, _), _)) = source_changes.peek() { + let source = *source; let operator = &mut self.per_operator[source.node].sources[source.port]; - let changes = operator.pointstamps.update_iter(Some((time, diff))); + let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff)); + let changes = operator.pointstamps.update_iter(source_updates); for (time, diff) in changes { self.total_counts += diff; @@ -761,7 +768,7 @@ fn summarize_outputs( } } } - + let mut results: HashMap> = HashMap::new(); let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();