Skip to content

Commit 7b65c5b

Browse files
stuartcarnie and alamb authored
feat: Support binary data types for SortMergeJoin on clause (apache#17431)
* feat: Support binary data types for `SortMergeJoin` `on` clause
* Add sql level tests for merge join on binary keys
---------
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 528ed60 commit 7b65c5b

File tree

3 files changed

+209
-4
lines changed

3 files changed

+209
-4
lines changed

datafusion/physical-plan/src/joins/sort_merge_join/exec.rs

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,8 @@ mod tests {
586586

587587
use arrow::array::{
588588
builder::{BooleanBuilder, UInt64Builder},
589-
BooleanArray, Date32Array, Date64Array, Int32Array, RecordBatch, UInt64Array,
589+
BinaryArray, BooleanArray, Date32Array, Date64Array, FixedSizeBinaryArray,
590+
Int32Array, RecordBatch, UInt64Array,
590591
};
591592
use arrow::compute::{concat_batches, filter_record_batch, SortOptions};
592593
use arrow::datatypes::{DataType, Field, Schema};
@@ -685,6 +686,56 @@ mod tests {
685686
TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
686687
}
687688

689+
/// returns a table with 1 non-nullable Binary column followed by 2 non-nullable
/// i32 columns in memory
///
/// Each argument is a `(column name, values)` pair. All three columns go into a
/// single `RecordBatch`. Panics (via `unwrap`) if the batch or the
/// `TestMemoryExec` cannot be built from the given values.
fn build_binary_table(
690+
a: (&str, &Vec<&[u8]>),
691+
b: (&str, &Vec<i32>),
692+
c: (&str, &Vec<i32>),
693+
) -> Arc<dyn ExecutionPlan> {
694+
let schema = Schema::new(vec![
695+
Field::new(a.0, DataType::Binary, false),
696+
Field::new(b.0, DataType::Int32, false),
697+
Field::new(c.0, DataType::Int32, false),
698+
]);
699+
700+
let batch = RecordBatch::try_new(
701+
Arc::new(schema),
702+
vec![
703+
Arc::new(BinaryArray::from(a.1.clone())),
704+
Arc::new(Int32Array::from(b.1.clone())),
705+
Arc::new(Int32Array::from(c.1.clone())),
706+
],
707+
)
708+
.unwrap();
709+
710+
// `schema` was moved into the batch above, so read it back from the batch
let schema = batch.schema();
711+
TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
712+
}
713+
714+
/// returns a table with 1 non-nullable FixedSizeBinary(3) column followed by 2
/// non-nullable i32 columns in memory
///
/// Each argument is a `(column name, values)` pair. The binary width is
/// hard-coded to 3 in the schema, so every value in `a` is expected to be
/// exactly 3 bytes long.
/// NOTE(review): a value of any other length presumably makes the array
/// conversion or `RecordBatch::try_new` fail, which panics here — confirm.
fn build_fixed_size_binary_table(
715+
a: (&str, &Vec<&[u8]>),
716+
b: (&str, &Vec<i32>),
717+
c: (&str, &Vec<i32>),
718+
) -> Arc<dyn ExecutionPlan> {
719+
let schema = Schema::new(vec![
720+
Field::new(a.0, DataType::FixedSizeBinary(3), false),
721+
Field::new(b.0, DataType::Int32, false),
722+
Field::new(c.0, DataType::Int32, false),
723+
]);
724+
725+
let batch = RecordBatch::try_new(
726+
Arc::new(schema),
727+
vec![
728+
Arc::new(FixedSizeBinaryArray::from(a.1.clone())),
729+
Arc::new(Int32Array::from(b.1.clone())),
730+
Arc::new(Int32Array::from(c.1.clone())),
731+
],
732+
)
733+
.unwrap();
734+
735+
// `schema` was moved into the batch above, so read it back from the batch
let schema = batch.schema();
736+
TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
737+
}
738+
688739
/// returns a table with 3 columns of i32 in memory
689740
pub fn build_table_i32_nullable(
690741
a: (&str, &Vec<Option<i32>>),
@@ -1923,6 +1974,100 @@ mod tests {
19231974
Ok(())
19241975
}
19251976

1977+
/// Inner sort-merge join on a single `Binary` key column (`a1` on both sides).
/// Both inputs hold the same three keys, listed in ascending byte order, so
/// each left row is expected to pair with exactly one right row.
#[tokio::test]
1978+
async fn join_binary() -> Result<()> {
1979+
let left = build_binary_table(
1980+
(
1981+
"a1",
1982+
&vec![
1983+
&[0xc0, 0xff, 0xee],
1984+
&[0xde, 0xca, 0xde],
1985+
&[0xfa, 0xca, 0xde],
1986+
],
1987+
),
1988+
("b1", &vec![5, 10, 15]), // payload column; all values are distinct
1989+
("c1", &vec![7, 8, 9]),
1990+
);
1991+
let right = build_binary_table(
1992+
(
1993+
"a1",
1994+
&vec![
1995+
&[0xc0, 0xff, 0xee],
1996+
&[0xde, 0xca, 0xde],
1997+
&[0xfa, 0xca, 0xde],
1998+
],
1999+
),
2000+
("b2", &vec![105, 110, 115]),
2001+
("c2", &vec![70, 80, 90]),
2002+
);
2003+
2004+
let on = vec![(
2005+
Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
2006+
Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
2007+
)];
2008+
2009+
let (_, batches) = join_collect(left, right, on, Inner).await?;
2010+
2011+
// The output order is important as SMJ preserves sortedness
2012+
assert_snapshot!(batches_to_string(&batches), @r#"
2013+
+--------+----+----+--------+-----+----+
2014+
| a1 | b1 | c1 | a1 | b2 | c2 |
2015+
+--------+----+----+--------+-----+----+
2016+
| c0ffee | 5 | 7 | c0ffee | 105 | 70 |
2017+
| decade | 10 | 8 | decade | 110 | 80 |
2018+
| facade | 15 | 9 | facade | 115 | 90 |
2019+
+--------+----+----+--------+-----+----+
2020+
"#);
2021+
Ok(())
2022+
}
2023+
2024+
/// Inner sort-merge join on a single `FixedSizeBinary(3)` key column (`a1` on
/// both sides). Both inputs hold the same three 3-byte keys, listed in
/// ascending byte order, so each left row is expected to pair with exactly one
/// right row.
#[tokio::test]
2025+
async fn join_fixed_size_binary() -> Result<()> {
2026+
let left = build_fixed_size_binary_table(
2027+
(
2028+
"a1",
2029+
&vec![
2030+
&[0xc0, 0xff, 0xee],
2031+
&[0xde, 0xca, 0xde],
2032+
&[0xfa, 0xca, 0xde],
2033+
],
2034+
),
2035+
("b1", &vec![5, 10, 15]), // payload column; all values are distinct
2036+
("c1", &vec![7, 8, 9]),
2037+
);
2038+
let right = build_fixed_size_binary_table(
2039+
(
2040+
"a1",
2041+
&vec![
2042+
&[0xc0, 0xff, 0xee],
2043+
&[0xde, 0xca, 0xde],
2044+
&[0xfa, 0xca, 0xde],
2045+
],
2046+
),
2047+
("b2", &vec![105, 110, 115]),
2048+
("c2", &vec![70, 80, 90]),
2049+
);
2050+
2051+
let on = vec![(
2052+
Arc::new(Column::new_with_schema("a1", &left.schema())?) as _,
2053+
Arc::new(Column::new_with_schema("a1", &right.schema())?) as _,
2054+
)];
2055+
2056+
let (_, batches) = join_collect(left, right, on, Inner).await?;
2057+
2058+
// The output order is important as SMJ preserves sortedness
2059+
assert_snapshot!(batches_to_string(&batches), @r#"
2060+
+--------+----+----+--------+-----+----+
2061+
| a1 | b1 | c1 | a1 | b2 | c2 |
2062+
+--------+----+----+--------+-----+----+
2063+
| c0ffee | 5 | 7 | c0ffee | 105 | 70 |
2064+
| decade | 10 | 8 | decade | 110 | 80 |
2065+
| facade | 15 | 9 | facade | 115 | 90 |
2066+
+--------+----+----+--------+-----+----+
2067+
"#);
2068+
Ok(())
2069+
}
2070+
19262071
#[tokio::test]
19272072
async fn join_left_sort_order() -> Result<()> {
19282073
let left = build_table(

datafusion/physical-plan/src/joins/sort_merge_join/stream.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,6 +1918,10 @@ fn compare_join_arrays(
19181918
DataType::Utf8 => compare_value!(StringArray),
19191919
DataType::Utf8View => compare_value!(StringViewArray),
19201920
DataType::LargeUtf8 => compare_value!(LargeStringArray),
1921+
DataType::Binary => compare_value!(BinaryArray),
1922+
DataType::BinaryView => compare_value!(BinaryViewArray),
1923+
DataType::FixedSizeBinary(_) => compare_value!(FixedSizeBinaryArray),
1924+
DataType::LargeBinary => compare_value!(LargeBinaryArray),
19211925
DataType::Decimal128(..) => compare_value!(Decimal128Array),
19221926
DataType::Timestamp(time_unit, None) => match time_unit {
19231927
TimeUnit::Second => compare_value!(TimestampSecondArray),
@@ -1986,6 +1990,10 @@ fn is_join_arrays_equal(
19861990
DataType::Utf8 => compare_value!(StringArray),
19871991
DataType::Utf8View => compare_value!(StringViewArray),
19881992
DataType::LargeUtf8 => compare_value!(LargeStringArray),
1993+
DataType::Binary => compare_value!(BinaryArray),
1994+
DataType::BinaryView => compare_value!(BinaryViewArray),
1995+
DataType::FixedSizeBinary(_) => compare_value!(FixedSizeBinaryArray),
1996+
DataType::LargeBinary => compare_value!(LargeBinaryArray),
19891997
DataType::Decimal128(..) => compare_value!(Decimal128Array),
19901998
DataType::Timestamp(time_unit, None) => match time_unit {
19911999
TimeUnit::Second => compare_value!(TimestampSecondArray),

datafusion/sqllogictest/test_files/sort_merge_join.slt

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -833,9 +833,61 @@ t2 as (
833833
11 14
834834
12 15
835835

836-
# return sql params back to default values
837836
statement ok
838-
set datafusion.optimizer.prefer_hash_join = true;
837+
set datafusion.execution.batch_size = 8192;
839838

839+
840+
######
841+
## Tests for Binary, LargeBinary, BinaryView, FixedSizeBinary join keys
842+
######
840843
statement ok
841-
set datafusion.execution.batch_size = 8192;
844+
create table t1(x varchar, id1 int) as values ('aa', 1), ('bb', 2), ('aa', 3), (null, 4), ('ee', 5);
845+
846+
statement ok
847+
create table t2(y varchar, id2 int) as values ('ee', 10), ('bb', 20), ('cc', 30), ('cc', 40), (null, 50);
848+
849+
# Binary join keys
850+
query ?I?I
851+
with t1 as (select arrow_cast(x, 'Binary') as x, id1 from t1),
852+
t2 as (select arrow_cast(y, 'Binary') as y, id2 from t2)
853+
select * from t1 join t2 on t1.x = t2.y order by id1, id2
854+
----
855+
6262 2 6262 20
856+
6565 5 6565 10
857+
858+
# LargeBinary join keys
859+
query ?I?I
860+
with t1 as (select arrow_cast(x, 'LargeBinary') as x, id1 from t1),
861+
t2 as (select arrow_cast(y, 'LargeBinary') as y, id2 from t2)
862+
select * from t1 join t2 on t1.x = t2.y order by id1, id2
863+
----
864+
6262 2 6262 20
865+
6565 5 6565 10
866+
867+
# BinaryView join keys
868+
query ?I?I
869+
with t1 as (select arrow_cast(x, 'BinaryView') as x, id1 from t1),
870+
t2 as (select arrow_cast(y, 'BinaryView') as y, id2 from t2)
871+
select * from t1 join t2 on t1.x = t2.y order by id1, id2
872+
----
873+
6262 2 6262 20
874+
6565 5 6565 10
875+
876+
# FixedSizeBinary join keys
877+
query ?I?I
878+
with t1 as (select arrow_cast(arrow_cast(x, 'Binary'), 'FixedSizeBinary(2)') as x, id1 from t1),
879+
t2 as (select arrow_cast(arrow_cast(y, 'Binary'), 'FixedSizeBinary(2)') as y, id2 from t2)
880+
select * from t1 join t2 on t1.x = t2.y order by id1, id2
881+
----
882+
6262 2 6262 20
883+
6565 5 6565 10
884+
885+
statement ok
886+
drop table t1;
887+
888+
statement ok
889+
drop table t2;
890+
891+
# return sql params back to default values
892+
statement ok
893+
set datafusion.optimizer.prefer_hash_join = true;

0 commit comments

Comments
 (0)