Merged
119 changes: 66 additions & 53 deletions differential-dataflow/src/consolidation.rs
@@ -22,6 +22,7 @@ use crate::difference::{IsZero, Semigroup};
 /// This method will sort `vec` and then consolidate runs of more than one entry with
 /// identical first elements by accumulating the second elements of the pairs. Should the final
 /// accumulation be zero, the element is discarded.
+#[inline]
 pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
     consolidate_from(vec, 0);
 }
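
For context (not part of the diff): a minimal usage sketch of `consolidate`, assuming the crate's public `consolidation` module and the stock `Semigroup` implementation for `isize` diffs.

    use differential_dataflow::consolidation::consolidate;

    fn main() {
        // Updates for "a" (+1, +2, -3) accumulate to zero and are discarded; "b" survives.
        let mut updates = vec![("a", 1isize), ("b", 1), ("a", 2), ("a", -3)];
        consolidate(&mut updates);
        assert_eq!(updates, vec![("b", 1)]);
    }
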
@@ -31,55 +32,61 @@ pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
 /// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
 /// identical first elements by accumulating the second elements of the pairs. Should the final
 /// accumulation be zero, the element is discarded.
+#[inline]
 pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
     let length = consolidate_slice(&mut vec[offset..]);
     vec.truncate(offset + length);
 }

 /// Sorts and consolidates a slice, returning the valid prefix length.
+#[inline]
 pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
-
     if slice.len() > 1 {
-
-        // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
-        // In a world where there are not many results, we may never even need to call in to merge sort.
-        slice.sort_by(|x,y| x.0.cmp(&y.0));
-
-        // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
-        let mut offset = 0;
-        let mut accum = slice[offset].1.clone();
-
-        for index in 1 .. slice.len() {
-            if slice[index].0 == slice[index-1].0 {
-                accum.plus_equals(&slice[index].1);
-            }
-            else {
-                if !accum.is_zero() {
-                    slice.swap(offset, index-1);
-                    slice[offset].1.clone_from(&accum);
-                    offset += 1;
-                }
-                accum.clone_from(&slice[index].1);
-            }
-        }
-        if !accum.is_zero() {
-            slice.swap(offset, slice.len()-1);
-            slice[offset].1 = accum;
-            offset += 1;
-        }
-
-        offset
+        consolidate_slice_slow(slice)
     }
     else {
         slice.iter().filter(|x| !x.1.is_zero()).count()

Member: Probably not the most important thing, but I wonder if there is a way to write this that is more helpful to Rust. Like, rather than iter and hope Rust realizes the slices are either len 0 or len 1, we could get(0) and unwrap_or(0)?

Member: Ah, I see these were inherited from the previous implementation! We can explore in follow-up work, or .. ignore if no improvements!
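
A rough sketch (hypothetical, not code from the PR) of the alternative the first comment gestures at, for a slice already known to have length 0 or 1:

    use differential_dataflow::difference::Semigroup;

    // Inspect element 0 directly instead of iterating and counting.
    fn short_slice_length<T: Ord, R: Semigroup>(slice: &[(T, R)]) -> usize {
        slice.get(0).map(|x| if x.1.is_zero() { 0 } else { 1 }).unwrap_or(0)
    }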

     }
 }

+/// Part of `consolidate_slice` that handles slices of length greater than 1.
+fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {

Copilot (Jul 23, 2025): The function consolidate_slice_slow should be marked as pub(crate) or remain private. Making it public exposes an implementation detail that users shouldn't call directly.

Member: Naming nit but, it's not that it's slow it's just general. There isn't a fast version, there's just a less general version.

+    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
+    // In a world where there are not many results, we may never even need to call in to merge sort.
+    slice.sort_by(|x,y| x.0.cmp(&y.0));
+
+    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
+    let mut offset = 0;
+    let mut accum = slice[offset].1.clone();
+
+    for index in 1 .. slice.len() {
+        if slice[index].0 == slice[index-1].0 {
+            accum.plus_equals(&slice[index].1);
+        }
+        else {
+            if !accum.is_zero() {
+                slice.swap(offset, index-1);
+                slice[offset].1.clone_from(&accum);
+                offset += 1;
+            }
+            accum.clone_from(&slice[index].1);
+        }
+    }
+    if !accum.is_zero() {
+        slice.swap(offset, slice.len()-1);
+        slice[offset].1 = accum;
+        offset += 1;
+    }
+
+    offset
+}
+

/// Sorts and consolidates `vec`.
///
/// This method will sort `vec` and then consolidate runs of more than one entry with
/// identical first two elements by accumulating the third elements of the triples. Should the final
/// accumulation be zero, the element is discarded.
#[inline]
pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
consolidate_updates_from(vec, 0);
}
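
For context (not part of the diff): a small sketch of the slice-level return convention, assuming the public `consolidation` module. The return value is the length of the consolidated prefix; entries beyond it are unspecified leftovers.

    use differential_dataflow::consolidation::consolidate_slice;

    fn main() {
        let mut data = [("a", 1isize), ("a", 1), ("b", 2), ("b", -2)];
        // "a" accumulates to 2; "b" accumulates to zero and is dropped.
        let len = consolidate_slice(&mut data);
        assert_eq!(len, 1);
        assert_eq!(data[0], ("a", 2));
    }
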
@@ -89,48 +96,54 @@ pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)
 /// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
 /// identical first two elements by accumulating the third elements of the triples. Should the final
 /// accumulation be zero, the element is discarded.
+#[inline]
 pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
     let length = consolidate_updates_slice(&mut vec[offset..]);
     vec.truncate(offset + length);
 }

 /// Sorts and consolidates a slice, returning the valid prefix length.
+#[inline]
 pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
-
     if slice.len() > 1 {
-
-        // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
-        // In a world where there are not many results, we may never even need to call in to merge sort.
-        slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
-
-        // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
-        let mut offset = 0;
-        let mut accum = slice[offset].2.clone();
-
-        for index in 1 .. slice.len() {
-            if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
-                accum.plus_equals(&slice[index].2);
-            }
-            else {
-                if !accum.is_zero() {
-                    slice.swap(offset, index-1);
-                    slice[offset].2.clone_from(&accum);
-                    offset += 1;
-                }
-                accum.clone_from(&slice[index].2);
-            }
-        }
-        if !accum.is_zero() {
-            slice.swap(offset, slice.len()-1);
-            slice[offset].2 = accum;
-            offset += 1;
-        }
-
-        offset
+        consolidate_updates_slice_slow(slice)
     }
     else {
         slice.iter().filter(|x| !x.2.is_zero()).count()
     }
 }

+/// Part of `consolidate_updates_slice` that handles slices of length greater than 1.
+fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
+    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
+    // In a world where there are not many results, we may never even need to call in to merge sort.
+    slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
+
+    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
+    let mut offset = 0;
+    let mut accum = slice[offset].2.clone();
+
+    for index in 1 .. slice.len() {
+        if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
+            accum.plus_equals(&slice[index].2);
+        }
+        else {
+            if !accum.is_zero() {
+                slice.swap(offset, index-1);
+                slice[offset].2.clone_from(&accum);
+                offset += 1;
+            }
+            accum.clone_from(&slice[index].2);
+        }
+    }
+    if !accum.is_zero() {
+        slice.swap(offset, slice.len()-1);
+        slice[offset].2 = accum;
+        offset += 1;
+    }
+
+    offset
+}
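
For context (not part of the diff): the update-triple variant in use, assuming the public `consolidation` module. Identical (data, time) pairs are merged, and entries whose accumulated difference is zero are dropped.

    use differential_dataflow::consolidation::consolidate_updates;

    fn main() {
        let mut updates = vec![("a", 0u64, 1isize), ("a", 0, -1), ("b", 1, 3)];
        consolidate_updates(&mut updates);
        assert_eq!(updates, vec![("b", 1, 3)]);
    }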

