Skip to content

Commit 6ab96d5

Browse files
committed
Optimize merge sort to do fewer comparisons
1 parent a5a34e9 commit 6ab96d5

File tree

1 file changed

+37
-18
lines changed

1 file changed

+37
-18
lines changed

rust/datafusion/src/physical_plan/merge_sort.rs

Lines changed: 37 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -473,36 +473,55 @@ fn merge_sort(
473473
));
474474
}
475475
while let Some(Reverse(c)) = candidates.pop() {
476+
let mut len = 1;
477+
if let Some(next) = candidates.peek() {
478+
loop {
479+
if num_result_rows + len == max_batch_rows
480+
|| c.row + len == sort_keys[c.index][0].len()
481+
{
482+
break;
483+
}
484+
assert!(
485+
lexcmp_array_rows(
486+
sort_keys[c.index].iter().map(|a| *a),
487+
c.row + len - 1,
488+
c.row + len
489+
) <= Ordering::Equal,
490+
"unsorted data in merge. row {}. data: {:?}",
491+
c.row + len,
492+
sort_keys[c.index]
493+
.iter()
494+
.map(|a| a.slice(pos[c.index] + len - 1, 2))
495+
);
496+
let k = Key {
497+
values: &sort_keys[c.index],
498+
index: c.index,
499+
row: c.row + len,
500+
};
501+
if k.cmp(&next.0) <= Ordering::Equal {
502+
len += 1;
503+
} else {
504+
break;
505+
}
506+
}
507+
}
476508
for i in 0..num_cols {
477-
result_cols[i].extend(c.index, c.row, c.row + 1);
509+
result_cols[i].extend(c.index, c.row, c.row + len);
478510
}
479-
num_result_rows += 1;
511+
num_result_rows += len;
480512

481513
assert_eq!(pos[c.index], c.row);
482-
pos[c.index] += 1;
514+
pos[c.index] += len;
483515
if num_result_rows == max_batch_rows
484516
|| pos[c.index] == sort_keys[c.index][0].len()
485517
{
486518
break;
487519
}
488-
assert!(
489-
lexcmp_array_rows(
490-
sort_keys[c.index].iter().map(|a| *a),
491-
pos[c.index] - 1,
492-
pos[c.index]
493-
) <= Ordering::Equal,
494-
"unsorted data in merge. row {}. data: {:?}",
495-
pos[c.index],
496-
sort_keys[c.index]
497-
.iter()
498-
.map(|a| a.slice(pos[c.index] - 1, 2))
499-
);
500-
let k = Key {
520+
candidates.push(Reverse(Key {
501521
values: &sort_keys[c.index],
502522
index: c.index,
503523
row: pos[c.index],
504-
};
505-
candidates.push(Reverse(k));
524+
}));
506525
}
507526

508527
let result_cols: Vec<ArrayRef> = result_cols

0 commit comments

Comments
 (0)