diff --git a/src/consolidation.rs b/src/consolidation.rs index a361e7e45..812207552 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -255,6 +255,41 @@ pub trait ConsolidateLayout: Container { /// Compare two items by key to sort containers. fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering; + + /// Consolidate the supplied container. + fn consolidate_into(&mut self, target: &mut Self) { + // Sort input data + let mut permutation = Vec::with_capacity(self.len()); + permutation.extend(self.drain()); + permutation.sort_by(|a, b| Self::cmp(a, b)); + + // Iterate over the data, accumulating diffs for like keys. + let mut iter = permutation.drain(..); + if let Some(item) = iter.next() { + + let (k, d) = Self::into_parts(item); + let mut prev_key = k; + let mut prev_diff = d.into_owned(); + + for item in iter { + let (next_key, next_diff) = Self::into_parts(item); + if next_key == prev_key { + prev_diff.plus_equals(&next_diff); + } + else { + if !prev_diff.is_zero() { + target.push_with_diff(prev_key, prev_diff); + } + prev_key = next_key; + prev_diff = next_diff.into_owned(); + } + } + + if !prev_diff.is_zero() { + target.push_with_diff(prev_key, prev_diff); + } + } + } } impl ConsolidateLayout for Vec<(D, T, R)> @@ -278,6 +313,12 @@ where fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) { self.push((data, time, diff)); } + + /// Consolidate the supplied container. + fn consolidate_into(&mut self, target: &mut Self) { + consolidate_updates(self); + std::mem::swap(self, target); + } } impl ConsolidateLayout for FlatStack, T, R>> @@ -308,49 +349,6 @@ where } } -/// Consolidate the supplied container. -pub fn consolidate_container(container: &mut C, target: &mut C) { - // Sort input data - let mut permutation = Vec::with_capacity(container.len()); - permutation.extend(container.drain()); - permutation.sort_by(|a, b| C::cmp(a, b)); - - // Consolidate sorted data. - let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None; - // TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't - // offer a suitable API. - for item in permutation.drain(..) { - let (key, diff) = C::into_parts(item); - match &mut previous { - // Initial iteration, remember key and diff. - // TODO: Opportunity for GatCow for diff. - None => previous = Some((key, diff.into_owned())), - Some((prevkey, d)) => { - // Second and following iteration, compare and accumulate or emit. - if key == *prevkey { - // Keys match, keep accumulating. - d.plus_equals(&diff); - } else { - // Keys don't match, write down result if non-zero. - if !d.is_zero() { - // Unwrap because we checked for `Some` above. - let (prevkey, diff) = previous.take().unwrap(); - target.push_with_diff(prevkey, diff); - } - // Remember current key and diff as `previous` - previous = Some((key, diff.into_owned())); - } - } - } - } - // Write any residual data, if non-zero. - if let Some((previtem, d)) = previous { - if !d.is_zero() { - target.push_with_diff(previtem, d); - } - } -} - #[cfg(test)] @@ -445,11 +443,11 @@ mod tests { } #[test] - fn test_consolidate_container() { + fn test_consolidate_into() { let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)]; let mut target = Vec::default(); data.sort(); - consolidate_container(&mut data, &mut target); + data.consolidate_into(&mut target); assert_eq!(target, [(2, 1, 1)]); } @@ -477,7 +475,7 @@ mod tests { data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize)))); data.sort_by(|x,y| x.0.cmp(&y.0)); let start = std::time::Instant::now(); - consolidate_container(&mut data, &mut target); + data.consolidate_into(&mut target); duration += start.elapsed(); consolidate_updates(&mut data2); diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 0b76e9012..98a8a81ac 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -4,7 +4,7 @@ use std::collections::VecDeque; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; -use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout}; +use crate::consolidation::{consolidate_updates, ConsolidateLayout}; use crate::difference::Semigroup; /// Chunk a stream of vectors into chains of vectors. @@ -269,7 +269,7 @@ where self.pending.push(item); if self.pending.at_capacity() { let starting_len = self.pending.len(); - consolidate_container(&mut self.pending, &mut self.empty); + self.pending.consolidate_into(&mut self.empty); std::mem::swap(&mut self.pending, &mut self.empty); self.empty.clear(); if self.pending.len() > starting_len / 2 { @@ -300,7 +300,7 @@ where fn finish(&mut self) -> Option<&mut Self::Container> { if !self.pending.is_empty() { - consolidate_container(&mut self.pending, &mut self.empty); + self.pending.consolidate_into(&mut self.empty); std::mem::swap(&mut self.pending, &mut self.empty); self.empty.clear(); if !self.pending.is_empty() {