Skip to content

Commit 8091cc1

Browse files
committed
Re-implement merge operator
Aim to minimize memory allocations. This produces a big performance win, especially when merging multiple streams.
1 parent e760fb6 commit 8091cc1

File tree

3 files changed

+323
-310
lines changed

3 files changed

+323
-310
lines changed

rust/arrow/src/compute/kernels/merge.rs

Lines changed: 2 additions & 239 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ use crate::error::{ArrowError, Result};
2525

2626
use crate::compute::{total_cmp_32, total_cmp_64};
2727
use core::fmt;
28-
use std::cmp::{Ordering, Reverse};
29-
use std::collections::BinaryHeap;
28+
use std::cmp::Ordering;
3029
use std::fmt::{Debug, Display, Formatter};
3130

3231
type JoinCursorAndIndices = (usize, bool, Arc<UInt32Array>);
@@ -169,86 +168,6 @@ pub fn merge_join_indices<'a>(
169168
))
170169
}
171170

172-
pub fn merge_sort_indices(
173-
arrays: Vec<&[ArrayRef]>,
174-
cursors: Vec<usize>,
175-
last_array: Vec<bool>,
176-
) -> Result<Vec<CursorAndIndices>> {
177-
let arrays_count = cursors.len();
178-
179-
if arrays_count != cursors.len() {
180-
return Err(ArrowError::InvalidArgumentError(format!(
181-
"Arrays doesn't match cursors len: {} != {}",
182-
arrays_count,
183-
cursors.len()
184-
)));
185-
}
186-
187-
let comparators = comparators_for(arrays)?;
188-
189-
let mut indices = Vec::new();
190-
191-
for _ in 0..cursors.len() {
192-
indices.push(Vec::<Option<u32>>::new());
193-
}
194-
195-
let (merge_cursors, mut finished_cursors) = cursors
196-
.into_iter()
197-
.enumerate()
198-
.map(|(array_index, row_index)| {
199-
MergeRowCursor::new(&comparators, array_index, row_index)
200-
})
201-
.partition::<Vec<_>, _>(|c| c.within_range());
202-
203-
let mut merge_cursors = merge_cursors
204-
.into_iter()
205-
.map(Reverse)
206-
.collect::<BinaryHeap<Reverse<MergeRowCursor>>>();
207-
208-
while merge_cursors.iter().all(|Reverse(c)| c.within_range())
209-
&& !merge_cursors.is_empty()
210-
{
211-
if let Some(Reverse(current)) = merge_cursors.pop() {
212-
current.check_consistency()?;
213-
for (array_index, indices_array) in indices.iter_mut().enumerate() {
214-
indices_array.push(if current.array_index == array_index {
215-
Some(current.row_index as u32)
216-
} else {
217-
None
218-
})
219-
}
220-
let next = current.next();
221-
let within_range = next.within_range();
222-
if within_range {
223-
merge_cursors.push(Reverse(next));
224-
} else {
225-
let array_index = next.array_index;
226-
finished_cursors.push(next);
227-
if !last_array[array_index] {
228-
break;
229-
}
230-
}
231-
}
232-
}
233-
234-
let mut merge_cursors_vec = merge_cursors
235-
.into_iter()
236-
.map(|Reverse(c)| c)
237-
.collect::<Vec<_>>();
238-
239-
merge_cursors_vec.append(&mut finished_cursors);
240-
241-
merge_cursors_vec.sort_by_key(|c| c.array_index);
242-
243-
Ok(merge_cursors_vec
244-
.into_iter()
245-
.zip(indices.into_iter())
246-
.map(|(c, indices_array)| {
247-
(c.row_index, Arc::new(UInt32Array::from(indices_array)))
248-
})
249-
.collect())
250-
}
251-
252171
fn comparators_for<'a>(
253172
arrays: Vec<&'a [ArrayRef]>,
254173
) -> Result<Vec<Box<dyn ArrayComparator + 'a>>> {
@@ -729,95 +648,11 @@ where
729648
mod tests {
730649
use crate::array::{ArrayRef, UInt32Array, UInt64Array};
731650
use crate::compute::kernels::if_op::if_primitive;
732-
use crate::compute::kernels::merge::{
733-
merge_join_indices, merge_sort_indices, MergeJoinType,
734-
};
651+
use crate::compute::kernels::merge::{merge_join_indices, MergeJoinType};
735652
use crate::compute::{is_not_null, take};
736653
use crate::error::ArrowError;
737654
use std::sync::Arc;
738655

739-
#[test]
740-
fn merge_sort() {
741-
let array_1: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2, 2, 3, 5, 10, 20]));
742-
let array_2: ArrayRef = Arc::new(UInt64Array::from(vec![4, 8, 9, 15]));
743-
let array_3: ArrayRef = Arc::new(UInt64Array::from(vec![4, 7, 9, 15]));
744-
let vec1 = vec![array_1];
745-
let vec2 = vec![array_2];
746-
let vec3 = vec![array_3];
747-
let arrays = vec![vec1.as_slice(), vec2.as_slice(), vec3.as_slice()];
748-
let res = test_merge(arrays);
749-
750-
assert_eq!(
751-
res.as_any().downcast_ref::<UInt64Array>().unwrap(),
752-
&UInt64Array::from(vec![1, 2, 2, 3, 4, 4, 5, 7, 8, 9, 9, 10, 15, 15, 20])
753-
)
754-
}
755-
756-
#[test]
757-
fn merge_sort_with_nulls() {
758-
let array_1: ArrayRef = Arc::new(UInt64Array::from(vec![
759-
None,
760-
None,
761-
Some(1),
762-
Some(2),
763-
Some(2),
764-
Some(3),
765-
Some(5),
766-
Some(10),
767-
Some(20),
768-
]));
769-
let array_2: ArrayRef = Arc::new(UInt64Array::from(vec![
770-
None,
771-
None,
772-
None,
773-
None,
774-
Some(4),
775-
Some(8),
776-
Some(9),
777-
Some(15),
778-
]));
779-
let array_3: ArrayRef = Arc::new(UInt64Array::from(vec![
780-
None,
781-
Some(4),
782-
Some(7),
783-
Some(9),
784-
Some(15),
785-
]));
786-
let vec1 = vec![array_1];
787-
let vec2 = vec![array_2];
788-
let vec3 = vec![array_3];
789-
let arrays = vec![vec1.as_slice(), vec2.as_slice(), vec3.as_slice()];
790-
let res = test_merge(arrays);
791-
792-
assert_eq!(
793-
res.as_any().downcast_ref::<UInt64Array>().unwrap(),
794-
&UInt64Array::from(vec![
795-
None,
796-
None,
797-
None,
798-
None,
799-
None,
800-
None,
801-
None,
802-
Some(1),
803-
Some(2),
804-
Some(2),
805-
Some(3),
806-
Some(4),
807-
Some(4),
808-
Some(5),
809-
Some(7),
810-
Some(8),
811-
Some(9),
812-
Some(9),
813-
Some(10),
814-
Some(15),
815-
Some(15),
816-
Some(20)
817-
])
818-
)
819-
}
820-
821656
#[test]
822657
fn merge_join() {
823658
let array_left: ArrayRef = Arc::new(UInt64Array::from(vec![
@@ -1022,76 +857,4 @@ mod tests {
1022857
)
1023858
)
1024859
}
1025-
1026-
#[test]
1027-
fn single_array() {
1028-
let array_1: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2, 2, 3, 5, 10, 20]));
1029-
let vec1 = vec![array_1];
1030-
let arrays = vec![vec1.as_slice()];
1031-
let res = test_merge(arrays);
1032-
1033-
assert_eq!(
1034-
res.as_any().downcast_ref::<UInt64Array>().unwrap(),
1035-
&UInt64Array::from(vec![1, 2, 2, 3, 5, 10, 20])
1036-
)
1037-
}
1038-
1039-
#[test]
1040-
fn empty_array() {
1041-
let array_1: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2, 2, 3, 5, 10, 20]));
1042-
let array_2: ArrayRef = Arc::new(UInt64Array::from(Vec::<u64>::new()));
1043-
let vec1 = vec![array_1];
1044-
let vec2 = vec![array_2];
1045-
let arrays = vec![vec1.as_slice(), vec2.as_slice()];
1046-
let res = test_merge(arrays);
1047-
1048-
assert_eq!(
1049-
res.as_any().downcast_ref::<UInt64Array>().unwrap(),
1050-
&UInt64Array::from(vec![1, 2, 2, 3, 5, 10, 20])
1051-
)
1052-
}
1053-
1054-
#[test]
1055-
fn two_empty_arrays() {
1056-
let array_1: ArrayRef = Arc::new(UInt64Array::from(Vec::<u64>::new()));
1057-
let array_2: ArrayRef = Arc::new(UInt64Array::from(Vec::<u64>::new()));
1058-
let vec1 = vec![array_1];
1059-
let vec2 = vec![array_2];
1060-
let arrays = vec![vec1.as_slice(), vec2.as_slice()];
1061-
let res = test_merge(arrays);
1062-
1063-
assert_eq!(
1064-
res.as_any().downcast_ref::<UInt64Array>().unwrap(),
1065-
&UInt64Array::from(Vec::<u64>::new())
1066-
)
1067-
}
1068-
1069-
fn test_merge(arrays: Vec<&[ArrayRef]>) -> ArrayRef {
1070-
let result = merge_sort_indices(
1071-
arrays.clone(),
1072-
(0..arrays.len()).map(|_| 0).collect(),
1073-
(0..arrays.len()).map(|_| true).collect(),
1074-
)
1075-
.unwrap();
1076-
1077-
let mut array_values = result
1078-
.iter()
1079-
.enumerate()
1080-
.map(|(i, (_, a))| take(arrays[i][0].as_ref(), a, None))
1081-
.collect::<Result<Vec<_>, _>>()
1082-
.unwrap()
1083-
.into_iter();
1084-
let first = Ok(array_values.next().unwrap());
1085-
array_values
1086-
.fold(first, |res, b| {
1087-
res.and_then(|a| -> Result<ArrayRef, ArrowError> {
1088-
Ok(Arc::new(if_primitive(
1089-
&is_not_null(a.as_any().downcast_ref::<UInt64Array>().unwrap())?,
1090-
a.as_any().downcast_ref::<UInt64Array>().unwrap(),
1091-
b.as_any().downcast_ref::<UInt64Array>().unwrap(),
1092-
)?))
1093-
})
1094-
})
1095-
.unwrap()
1096-
}
1097860
}

rust/datafusion/src/cube_ext/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ pub mod join;
2020
pub mod joinagg;
2121
pub mod sequence;
2222
pub mod stream;
23+
pub mod util;

0 commit comments

Comments
 (0)