From 9e959f88db203b353c8ad07a8603efaf37ccb077 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Dec 2024 21:03:14 -0500 Subject: [PATCH 1/2] Make consolidate_into a trait method --- src/consolidation.rs | 98 +++++++++++++++------------- src/trace/implementations/chunker.rs | 6 +- 2 files changed, 55 insertions(+), 49 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index a361e7e45..ce57512ec 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -255,6 +255,49 @@ pub trait ConsolidateLayout: Container { /// Compare two items by key to sort containers. fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering; + + /// Consolidate the supplied container. + fn consolidate_into(&mut self, target: &mut Self) { + // Sort input data + let mut permutation = Vec::with_capacity(self.len()); + permutation.extend(self.drain()); + permutation.sort_by(|a, b| Self::cmp(a, b)); + + // Consolidate sorted data. + let mut previous: Option<(Self::Key<'_>, Self::DiffOwned)> = None; + // TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't + // offer a suitable API. + for item in permutation.drain(..) { + let (key, diff) = Self::into_parts(item); + match &mut previous { + // Initial iteration, remember key and diff. + // TODO: Opportunity for GatCow for diff. + None => previous = Some((key, diff.into_owned())), + Some((prevkey, d)) => { + // Second and following iteration, compare and accumulate or emit. + if key == *prevkey { + // Keys match, keep accumulating. + d.plus_equals(&diff); + } else { + // Keys don't match, write down result if non-zero. + if !d.is_zero() { + // Unwrap because we checked for `Some` above. + let (prevkey, diff) = previous.take().unwrap(); + target.push_with_diff(prevkey, diff); + } + // Remember current key and diff as `previous` + previous = Some((key, diff.into_owned())); + } + } + } + } + // Write any residual data, if non-zero. + if let Some((previtem, d)) = previous { + if !d.is_zero() { + target.push_with_diff(previtem, d); + } + } + } } impl ConsolidateLayout for Vec<(D, T, R)> @@ -278,6 +321,12 @@ where fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) { self.push((data, time, diff)); } + + /// Consolidate the supplied container. + fn consolidate_into(&mut self, target: &mut Self) { + consolidate_updates(self); + std::mem::swap(self, target); + } } impl ConsolidateLayout for FlatStack, T, R>> @@ -308,49 +357,6 @@ where } } -/// Consolidate the supplied container. -pub fn consolidate_container(container: &mut C, target: &mut C) { - // Sort input data - let mut permutation = Vec::with_capacity(container.len()); - permutation.extend(container.drain()); - permutation.sort_by(|a, b| C::cmp(a, b)); - - // Consolidate sorted data. - let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None; - // TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't - // offer a suitable API. - for item in permutation.drain(..) { - let (key, diff) = C::into_parts(item); - match &mut previous { - // Initial iteration, remember key and diff. - // TODO: Opportunity for GatCow for diff. - None => previous = Some((key, diff.into_owned())), - Some((prevkey, d)) => { - // Second and following iteration, compare and accumulate or emit. - if key == *prevkey { - // Keys match, keep accumulating. - d.plus_equals(&diff); - } else { - // Keys don't match, write down result if non-zero. - if !d.is_zero() { - // Unwrap because we checked for `Some` above. - let (prevkey, diff) = previous.take().unwrap(); - target.push_with_diff(prevkey, diff); - } - // Remember current key and diff as `previous` - previous = Some((key, diff.into_owned())); - } - } - } - } - // Write any residual data, if non-zero. - if let Some((previtem, d)) = previous { - if !d.is_zero() { - target.push_with_diff(previtem, d); - } - } -} - #[cfg(test)] @@ -445,11 +451,11 @@ mod tests { } #[test] - fn test_consolidate_container() { + fn test_consolidate_into() { let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)]; let mut target = Vec::default(); data.sort(); - consolidate_container(&mut data, &mut target); + data.consolidate_into(&mut target); assert_eq!(target, [(2, 1, 1)]); } @@ -477,7 +483,7 @@ mod tests { data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize)))); data.sort_by(|x,y| x.0.cmp(&y.0)); let start = std::time::Instant::now(); - consolidate_container(&mut data, &mut target); + data.consolidate_into(&mut target); duration += start.elapsed(); consolidate_updates(&mut data2); diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 0b76e9012..98a8a81ac 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -4,7 +4,7 @@ use std::collections::VecDeque; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; -use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout}; +use crate::consolidation::{consolidate_updates, ConsolidateLayout}; use crate::difference::Semigroup; /// Chunk a stream of vectors into chains of vectors. @@ -269,7 +269,7 @@ where self.pending.push(item); if self.pending.at_capacity() { let starting_len = self.pending.len(); - consolidate_container(&mut self.pending, &mut self.empty); + self.pending.consolidate_into(&mut self.empty); std::mem::swap(&mut self.pending, &mut self.empty); self.empty.clear(); if self.pending.len() > starting_len / 2 { @@ -300,7 +300,7 @@ where fn finish(&mut self) -> Option<&mut Self::Container> { if !self.pending.is_empty() { - consolidate_container(&mut self.pending, &mut self.empty); + self.pending.consolidate_into(&mut self.empty); std::mem::swap(&mut self.pending, &mut self.empty); self.empty.clear(); if !self.pending.is_empty() { From 00c20832ffda8e0421753c5139f52a22a8569801 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Dec 2024 21:14:12 -0500 Subject: [PATCH 2/2] Streaming consolidate_into --- src/consolidation.rs | 50 +++++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/src/consolidation.rs b/src/consolidation.rs index ce57512ec..812207552 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -263,38 +263,30 @@ pub trait ConsolidateLayout: Container { permutation.extend(self.drain()); permutation.sort_by(|a, b| Self::cmp(a, b)); - // Consolidate sorted data. - let mut previous: Option<(Self::Key<'_>, Self::DiffOwned)> = None; - // TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't - // offer a suitable API. - for item in permutation.drain(..) { - let (key, diff) = Self::into_parts(item); - match &mut previous { - // Initial iteration, remember key and diff. - // TODO: Opportunity for GatCow for diff. - None => previous = Some((key, diff.into_owned())), - Some((prevkey, d)) => { - // Second and following iteration, compare and accumulate or emit. - if key == *prevkey { - // Keys match, keep accumulating. - d.plus_equals(&diff); - } else { - // Keys don't match, write down result if non-zero. - if !d.is_zero() { - // Unwrap because we checked for `Some` above. - let (prevkey, diff) = previous.take().unwrap(); - target.push_with_diff(prevkey, diff); - } - // Remember current key and diff as `previous` - previous = Some((key, diff.into_owned())); + // Iterate over the data, accumulating diffs for like keys. + let mut iter = permutation.drain(..); + if let Some(item) = iter.next() { + + let (k, d) = Self::into_parts(item); + let mut prev_key = k; + let mut prev_diff = d.into_owned(); + + for item in iter { + let (next_key, next_diff) = Self::into_parts(item); + if next_key == prev_key { + prev_diff.plus_equals(&next_diff); + } + else { + if !prev_diff.is_zero() { + target.push_with_diff(prev_key, prev_diff); } + prev_key = next_key; + prev_diff = next_diff.into_owned(); } } - } - // Write any residual data, if non-zero. - if let Some((previtem, d)) = previous { - if !d.is_zero() { - target.push_with_diff(previtem, d); + + if !prev_diff.is_zero() { + target.push_with_diff(prev_key, prev_diff); } } }