Skip to content

Commit e2d3e1f

Browse files
committed
fix: HashJoin panic with String dictionary keys (don't flatten keys) (apache#20505)
- Fixes apache#20696. Follow-on to apache#20441: the review of apache#20441 fixed the special-case DictionaryArray handling in joins, but I don't think we need to special-case DictionaryArrays at all. Changes: 1. Remove the special-case dictionary handling (dictionary-encoded join keys are no longer flattened to their value type). Are these changes tested? Yes, by CI. Are there user-facing changes? No (though some queries may get faster).
1 parent 9a67de5 commit e2d3e1f

File tree

2 files changed

+113
-26
lines changed

2 files changed

+113
-26
lines changed

datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use std::sync::Arc;
2121

2222
use arrow::array::{ArrayRef, StructArray};
23-
use arrow::compute::cast;
2423
use arrow::datatypes::{Field, FieldRef, Fields};
2524
use arrow_schema::DataType;
2625
use datafusion_common::Result;
@@ -33,19 +32,6 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
3332
.collect()
3433
}
3534

36-
/// Casts dictionary-encoded arrays to their underlying value type, preserving row count.
37-
/// Non-dictionary arrays are returned as-is.
38-
fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
39-
match array.data_type() {
40-
DataType::Dictionary(_, value_type) => {
41-
let casted = cast(array, value_type)?;
42-
// Recursively flatten in case of nested dictionaries
43-
flatten_dictionary_array(&casted)
44-
}
45-
_ => Ok(Arc::clone(array)),
46-
}
47-
}
48-
4935
/// Builds InList values from join key column arrays.
5036
///
5137
/// If `join_key_arrays` is:
@@ -65,20 +51,14 @@ fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
6551
pub(super) fn build_struct_inlist_values(
6652
join_key_arrays: &[ArrayRef],
6753
) -> Result<Option<ArrayRef>> {
68-
// Flatten any dictionary-encoded arrays
69-
let flattened_arrays: Vec<ArrayRef> = join_key_arrays
70-
.iter()
71-
.map(flatten_dictionary_array)
72-
.collect::<Result<Vec<_>>>()?;
73-
7454
// Build the source array/struct
75-
let source_array: ArrayRef = if flattened_arrays.len() == 1 {
55+
let source_array: ArrayRef = if join_key_arrays.len() == 1 {
7656
// Single column: use directly
77-
Arc::clone(&flattened_arrays[0])
57+
Arc::clone(&join_key_arrays[0])
7858
} else {
7959
// Multi-column: build StructArray once from all columns
8060
let fields = build_struct_fields(
81-
&flattened_arrays
61+
&join_key_arrays
8262
.iter()
8363
.map(|arr| arr.data_type().clone())
8464
.collect::<Vec<_>>(),
@@ -88,7 +68,7 @@ pub(super) fn build_struct_inlist_values(
8868
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
8969
.iter()
9070
.cloned()
91-
.zip(flattened_arrays.iter().cloned())
71+
.zip(join_key_arrays.iter().cloned())
9272
.collect();
9373

9474
Arc::new(StructArray::from(arrays_with_fields))
@@ -152,7 +132,14 @@ mod tests {
152132
assert_eq!(
153133
*result.data_type(),
154134
DataType::Struct(
155-
build_struct_fields(&[DataType::Utf8, DataType::Int32]).unwrap()
135+
build_struct_fields(&[
136+
DataType::Dictionary(
137+
Box::new(DataType::Int8),
138+
Box::new(DataType::Utf8)
139+
),
140+
DataType::Int32
141+
])
142+
.unwrap()
156143
)
157144
);
158145
}
@@ -168,6 +155,6 @@ mod tests {
168155
.unwrap();
169156

170157
assert_eq!(result.len(), 3);
171-
assert_eq!(*result.data_type(), DataType::Utf8);
158+
assert_eq!(result.data_type(), dict_array.data_type());
172159
}
173160
}

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,3 +563,103 @@ ORDER BY start_timestamp, trace_id
563563
LIMIT 1;
564564
----
565565
2024-10-01T00:00:00
566+
567+
568+
statement ok
569+
set datafusion.execution.parquet.pushdown_filters = false;
570+
571+
# Regression test for https://github.com/apache/datafusion/issues/20696
572+
# Multi-column INNER JOIN with dictionary fails
573+
# when parquet pushdown filters are enabled.
574+
575+
statement ok
576+
COPY (
577+
SELECT
578+
to_timestamp_nanos(time_ns) AS time,
579+
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
580+
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
581+
temp
582+
FROM (
583+
VALUES
584+
(200, 'CA', 'LA', 90.0),
585+
(250, 'MA', 'Boston', 72.4),
586+
(100, 'MA', 'Boston', 70.4),
587+
(350, 'CA', 'LA', 90.0)
588+
) AS t(time_ns, state, city, temp)
589+
)
590+
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/data.parquet';
591+
592+
statement ok
593+
COPY (
594+
SELECT
595+
to_timestamp_nanos(time_ns) AS time,
596+
arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
597+
arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
598+
temp,
599+
reading
600+
FROM (
601+
VALUES
602+
(250, 'MA', 'Boston', 53.4, 51.0),
603+
(100, 'MA', 'Boston', 50.4, 50.0)
604+
) AS t(time_ns, state, city, temp, reading)
605+
)
606+
TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/data.parquet';
607+
608+
statement ok
609+
CREATE EXTERNAL TABLE h2o_parquet_20696 STORED AS PARQUET
610+
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/';
611+
612+
statement ok
613+
CREATE EXTERNAL TABLE o2_parquet_20696 STORED AS PARQUET
614+
LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/';
615+
616+
# Query should work both with and without filters
617+
statement ok
618+
set datafusion.execution.parquet.pushdown_filters = false;
619+
620+
query RRR
621+
SELECT
622+
h2o_parquet_20696.temp AS h2o_temp,
623+
o2_parquet_20696.temp AS o2_temp,
624+
o2_parquet_20696.reading
625+
FROM h2o_parquet_20696
626+
INNER JOIN o2_parquet_20696
627+
ON h2o_parquet_20696.time = o2_parquet_20696.time
628+
AND h2o_parquet_20696.state = o2_parquet_20696.state
629+
AND h2o_parquet_20696.city = o2_parquet_20696.city
630+
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
631+
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
632+
----
633+
72.4 53.4 51
634+
70.4 50.4 50
635+
636+
637+
statement ok
638+
set datafusion.execution.parquet.pushdown_filters = true;
639+
640+
query RRR
641+
SELECT
642+
h2o_parquet_20696.temp AS h2o_temp,
643+
o2_parquet_20696.temp AS o2_temp,
644+
o2_parquet_20696.reading
645+
FROM h2o_parquet_20696
646+
INNER JOIN o2_parquet_20696
647+
ON h2o_parquet_20696.time = o2_parquet_20696.time
648+
AND h2o_parquet_20696.state = o2_parquet_20696.state
649+
AND h2o_parquet_20696.city = o2_parquet_20696.city
650+
WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
651+
AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
652+
----
653+
72.4 53.4 51
654+
70.4 50.4 50
655+
656+
# Cleanup
657+
statement ok
658+
DROP TABLE h2o_parquet_20696;
659+
660+
statement ok
661+
DROP TABLE o2_parquet_20696;
662+
663+
# Cleanup settings
664+
statement ok
665+
set datafusion.execution.parquet.pushdown_filters = false;

0 commit comments

Comments
 (0)