diff --git a/differential-dataflow/src/consolidation.rs b/differential-dataflow/src/consolidation.rs index 292f1a63c..7ed931ee5 100644 --- a/differential-dataflow/src/consolidation.rs +++ b/differential-dataflow/src/consolidation.rs @@ -22,6 +22,7 @@ use crate::difference::{IsZero, Semigroup}; /// This method will sort `vec` and then consolidate runs of more than one entry with /// identical first elements by accumulating the second elements of the pairs. Should the final /// accumulation be zero, the element is discarded. +#[inline] pub fn consolidate(vec: &mut Vec<(T, R)>) { consolidate_from(vec, 0); } @@ -31,48 +32,53 @@ pub fn consolidate(vec: &mut Vec<(T, R)>) { /// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with /// identical first elements by accumulating the second elements of the pairs. Should the final /// accumulation be zero, the element is discarded. +#[inline] pub fn consolidate_from(vec: &mut Vec<(T, R)>, offset: usize) { let length = consolidate_slice(&mut vec[offset..]); vec.truncate(offset + length); } /// Sorts and consolidates a slice, returning the valid prefix length. +#[inline] pub fn consolidate_slice(slice: &mut [(T, R)]) -> usize { - if slice.len() > 1 { + consolidate_slice_slow(slice) + } + else { + slice.iter().filter(|x| !x.1.is_zero()).count() + } +} - // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs. - // In a world where there are not many results, we may never even need to call in to merge sort. - slice.sort_by(|x,y| x.0.cmp(&y.0)); +/// Part of `consolidate_slice` that handles slices of length greater than 1. +fn consolidate_slice_slow(slice: &mut [(T, R)]) -> usize { + // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs. + // In a world where there are not many results, we may never even need to call in to merge sort. + slice.sort_by(|x,y| x.0.cmp(&y.0)); - // Counts the number of distinct known-non-zero accumulations. Indexes the write location. - let mut offset = 0; - let mut accum = slice[offset].1.clone(); + // Counts the number of distinct known-non-zero accumulations. Indexes the write location. + let mut offset = 0; + let mut accum = slice[offset].1.clone(); - for index in 1 .. slice.len() { - if slice[index].0 == slice[index-1].0 { - accum.plus_equals(&slice[index].1); - } - else { - if !accum.is_zero() { - slice.swap(offset, index-1); - slice[offset].1.clone_from(&accum); - offset += 1; - } - accum.clone_from(&slice[index].1); - } + for index in 1 .. slice.len() { + if slice[index].0 == slice[index-1].0 { + accum.plus_equals(&slice[index].1); } - if !accum.is_zero() { - slice.swap(offset, slice.len()-1); - slice[offset].1 = accum; - offset += 1; + else { + if !accum.is_zero() { + slice.swap(offset, index-1); + slice[offset].1.clone_from(&accum); + offset += 1; + } + accum.clone_from(&slice[index].1); } - - offset } - else { - slice.iter().filter(|x| !x.1.is_zero()).count() + if !accum.is_zero() { + slice.swap(offset, slice.len()-1); + slice[offset].1 = accum; + offset += 1; } + + offset } /// Sorts and consolidates `vec`. @@ -80,6 +86,7 @@ pub fn consolidate_slice(slice: &mut [(T, R)]) -> usize { /// This method will sort `vec` and then consolidate runs of more than one entry with /// identical first two elements by accumulating the third elements of the triples. Should the final /// accumulation be zero, the element is discarded. +#[inline] pub fn consolidate_updates(vec: &mut Vec<(D, T, R)>) { consolidate_updates_from(vec, 0); } @@ -89,48 +96,54 @@ pub fn consolidate_updates(vec: &mut Vec<(D, T, R) /// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with /// identical first two elements by accumulating the third elements of the triples. Should the final /// accumulation be zero, the element is discarded. +#[inline] pub fn consolidate_updates_from(vec: &mut Vec<(D, T, R)>, offset: usize) { let length = consolidate_updates_slice(&mut vec[offset..]); vec.truncate(offset + length); } /// Sorts and consolidates a slice, returning the valid prefix length. +#[inline] pub fn consolidate_updates_slice(slice: &mut [(D, T, R)]) -> usize { if slice.len() > 1 { + consolidate_updates_slice_slow(slice) + } + else { + slice.iter().filter(|x| !x.2.is_zero()).count() + } +} - // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs. - // In a world where there are not many results, we may never even need to call in to merge sort. - slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1))); +/// Part of `consolidate_updates_slice` that handles slices of length greater than 1. +fn consolidate_updates_slice_slow(slice: &mut [(D, T, R)]) -> usize { + // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs. + // In a world where there are not many results, we may never even need to call in to merge sort. + slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1))); - // Counts the number of distinct known-non-zero accumulations. Indexes the write location. - let mut offset = 0; - let mut accum = slice[offset].2.clone(); + // Counts the number of distinct known-non-zero accumulations. Indexes the write location. + let mut offset = 0; + let mut accum = slice[offset].2.clone(); - for index in 1 .. slice.len() { - if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) { - accum.plus_equals(&slice[index].2); - } - else { - if !accum.is_zero() { - slice.swap(offset, index-1); - slice[offset].2.clone_from(&accum); - offset += 1; - } - accum.clone_from(&slice[index].2); - } + for index in 1 .. slice.len() { + if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) { + accum.plus_equals(&slice[index].2); } - if !accum.is_zero() { - slice.swap(offset, slice.len()-1); - slice[offset].2 = accum; - offset += 1; + else { + if !accum.is_zero() { + slice.swap(offset, index-1); + slice[offset].2.clone_from(&accum); + offset += 1; + } + accum.clone_from(&slice[index].2); } - - offset } - else { - slice.iter().filter(|x| !x.2.is_zero()).count() + if !accum.is_zero() { + slice.swap(offset, slice.len()-1); + slice[offset].2 = accum; + offset += 1; } + + offset }