Skip to content

Commit 2b6dcac

Browse files
Add preserve_empty_as_null option to unnest for Spark explode_outer compatibility
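This PR adds a new `preserve_empty_as_null` flag to `UnnestOptions`. When it is set, an empty array produces a single output row containing NULL instead of producing no rows. Combined with `preserve_nulls: true`, empty arrays and NULL arrays then behave the same way, which enables Spark-compatible `explode_outer` behavior. Closes #19053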
1 parent d13d891 commit 2b6dcac

File tree

7 files changed

+131
-1
lines changed

7 files changed

+131
-1
lines changed

datafusion/common/src/unnest.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,34 @@ use crate::Column;
6363
/// c1 c2 c1 c2
6464
/// ```
6565
///
66+
/// ## `Unnest(c1)`, preserve_nulls: true, preserve_empty_as_null: true (Spark's explode_outer behavior)
67+
/// ```text
68+
/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐
69+
/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │
70+
/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
71+
/// │ null │ │ B │ │ 2 │ │ A │
72+
/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤
73+
/// │ {} │ │ D │ │ null │ │ B │
74+
/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
75+
/// │ {3} │ │ E │ │ null │ │ D │ <-- empty array {} produces null
76+
/// └─────────┘ └─────┘ ├─────────┤ ├─────┤
77+
/// c1 c2 │ 3 │ │ E │
78+
/// └─────────┘ └─────┘
79+
/// c1 c2
80+
/// ```
81+
///
6682
/// `recursions` instructs how a column should be unnested (e.g. unnesting a column multiple
6783
/// times, with depth = 1 and depth = 2). Any unnested column not mentioned in these
6884
/// options is inferred to be unnested with depth = 1.
6985
#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
7086
pub struct UnnestOptions {
7187
/// Should nulls in the input be preserved? Defaults to true
7288
pub preserve_nulls: bool,
89+
/// Should empty arrays produce an output row containing NULL? Defaults to false.
90+
/// When true, each empty array `[]` produces one row with a NULL value, regardless of
91+
/// `preserve_nulls`; together with `preserve_nulls: true` this matches Spark's `explode_outer`.
92+
/// When false (default), empty arrays produce no output rows.
93+
pub preserve_empty_as_null: bool,
7394
/// If specific columns need to be unnested multiple times (e.g at different depth),
7495
/// declare them here. Any unnested columns not being mentioned inside this option
7596
/// will be unnested with depth = 1
@@ -90,6 +111,8 @@ impl Default for UnnestOptions {
90111
Self {
91112
// default to true to maintain backwards compatible behavior
92113
preserve_nulls: true,
114+
// default to false to maintain backwards compatible behavior
115+
preserve_empty_as_null: false,
93116
recursions: vec![],
94117
}
95118
}
@@ -108,6 +131,14 @@ impl UnnestOptions {
108131
self
109132
}
110133

134+
/// Set how empty arrays in the input are handled.
135+
/// When true, empty arrays `[]` will produce a row with NULL value,
136+
/// similar to Spark's `explode_outer` behavior.
137+
pub fn with_preserve_empty_as_null(mut self, preserve_empty_as_null: bool) -> Self {
138+
self.preserve_empty_as_null = preserve_empty_as_null;
139+
self
140+
}
141+
111142
/// Set the recursions for the unnest operation
112143
pub fn with_recursions(mut self, recursion: RecursionUnnestOption) -> Self {
113144
self.recursions.push(recursion);

datafusion/physical-plan/src/unnest.rs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,10 +749,16 @@ fn build_batch(
749749
///
750750
/// whereas if `preserve_nulls` is true, the longest length array will be:
751751
///
752-
///
753752
/// ```ignore
754753
/// longest_length: [3, 1, 1, 2]
755754
/// ```
755+
///
756+
/// If `preserve_empty_as_null` is true (needed for Spark's explode_outer behavior),
757+
/// empty arrays get length 1 instead of 0, independently of `preserve_nulls`:
758+
///
759+
/// ```ignore
760+
/// longest_length: [3, 1, 1, 2] // empty array [] at index 1 gets length 1
761+
/// ```
756762
fn find_longest_length(
757763
list_arrays: &[ArrayRef],
758764
options: &UnnestOptions,
@@ -763,14 +769,36 @@ fn find_longest_length(
763769
} else {
764770
Scalar::new(Int64Array::from_value(0, 1))
765771
};
772+
773+
// The length to use for empty arrays when preserve_empty_as_null is true
774+
let empty_length = Scalar::new(Int64Array::from_value(1, 1));
775+
let zero_length = Scalar::new(Int64Array::from_value(0, 1));
776+
766777
let list_lengths: Vec<ArrayRef> = list_arrays
767778
.iter()
768779
.map(|list_array| {
769780
let mut length_array = length(list_array)?;
770781
// Make sure length arrays have the same type. Int64 is the most general one.
771782
length_array = cast(&length_array, &DataType::Int64)?;
783+
784+
// Handle empty arrays when preserve_empty_as_null is true
785+
// This must be done BEFORE handling NULLs, because we need to distinguish
786+
// between actual empty arrays (length 0, not null) and NULL arrays.
787+
// Empty arrays have length 0 but are not null, so we need to
788+
// replace 0 lengths with 1 to produce a NULL output row (Spark's explode_outer behavior)
789+
if options.preserve_empty_as_null {
790+
// Only replace 0 with 1 for non-null entries (actual empty arrays)
791+
let is_not_null_mask = is_not_null(&length_array)?;
792+
let is_zero = kernels::cmp::eq(&length_array, &zero_length)?;
793+
// is_empty_array = not null AND length == 0
794+
let is_empty_array = kernels::boolean::and(&is_not_null_mask, &is_zero)?;
795+
length_array = zip(&is_empty_array, &empty_length, &length_array)?;
796+
}
797+
798+
// Handle NULL arrays: replace null lengths with null_length
772799
length_array =
773800
zip(&is_not_null(&length_array)?, &length_array, &null_length)?;
801+
774802
Ok(length_array)
775803
})
776804
.collect::<Result<_>>()?;
@@ -1193,6 +1221,7 @@ mod tests {
11931221
&HashSet::default(),
11941222
&UnnestOptions {
11951223
preserve_nulls: true,
1224+
preserve_empty_as_null: false,
11961225
recursions: vec![],
11971226
},
11981227
)?
@@ -1277,9 +1306,19 @@ mod tests {
12771306
list_arrays: &[ArrayRef],
12781307
preserve_nulls: bool,
12791308
expected: Vec<i64>,
1309+
) -> Result<()> {
1310+
verify_longest_length_with_options(list_arrays, preserve_nulls, false, expected)
1311+
}
1312+
1313+
fn verify_longest_length_with_options(
1314+
list_arrays: &[ArrayRef],
1315+
preserve_nulls: bool,
1316+
preserve_empty_as_null: bool,
1317+
expected: Vec<i64>,
12801318
) -> Result<()> {
12811319
let options = UnnestOptions {
12821320
preserve_nulls,
1321+
preserve_empty_as_null,
12831322
recursions: vec![],
12841323
};
12851324
let longest_length = find_longest_length(list_arrays, &options)?;
@@ -1326,6 +1365,43 @@ mod tests {
13261365
Ok(())
13271366
}
13281367

1368+
#[test]
1369+
fn test_longest_list_length_preserve_empty_as_null() -> Result<()> {
1370+
// Test with single ListArray
1371+
// [A, B, C], [], NULL, [D], NULL, [NULL, F]
1372+
// With preserve_empty_as_null=true, empty arrays [] get length 1
1373+
let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;
1374+
1375+
// preserve_nulls=false, preserve_empty_as_null=true
1376+
// NULL arrays still get length 0, but empty arrays get length 1
1377+
verify_longest_length_with_options(
1378+
&[Arc::clone(&list_array)],
1379+
false, // preserve_nulls
1380+
true, // preserve_empty_as_null
1381+
vec![3, 1, 0, 1, 0, 2], // index 1 (empty []) now gets length 1
1382+
)?;
1383+
1384+
// preserve_nulls=true, preserve_empty_as_null=true (Spark's explode_outer behavior)
1385+
// Both NULL arrays and empty arrays get length 1
1386+
verify_longest_length_with_options(
1387+
&[Arc::clone(&list_array)],
1388+
true, // preserve_nulls
1389+
true, // preserve_empty_as_null
1390+
vec![3, 1, 1, 1, 1, 2], // index 1 (empty []) gets length 1, NULLs also get 1
1391+
)?;
1392+
1393+
// Test with single LargeListArray
1394+
let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
1395+
verify_longest_length_with_options(
1396+
&[Arc::clone(&list_array)],
1397+
true,
1398+
true,
1399+
vec![3, 1, 1, 1, 1, 2],
1400+
)?;
1401+
1402+
Ok(())
1403+
}
1404+
13291405
#[test]
13301406
fn test_create_take_indices() -> Result<()> {
13311407
let length_array = Int64Array::from(vec![2, 3, 1]);

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ message ColumnUnnestListRecursion {
311311
message UnnestOptions {
312312
bool preserve_nulls = 1;
313313
repeated RecursionUnnestOption recursions = 2;
314+
bool preserve_empty_as_null = 3;
314315
}
315316

316317
message RecursionUnnestOption {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions {
5757
fn from(opts: &protobuf::UnnestOptions) -> Self {
5858
Self {
5959
preserve_nulls: opts.preserve_nulls,
60+
preserve_empty_as_null: opts.preserve_empty_as_null,
6061
recursions: opts
6162
.recursions
6263
.iter()

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl From<&UnnestOptions> for protobuf::UnnestOptions {
5454
fn from(opts: &UnnestOptions) -> Self {
5555
Self {
5656
preserve_nulls: opts.preserve_nulls,
57+
preserve_empty_as_null: opts.preserve_empty_as_null,
5758
recursions: opts
5859
.recursions
5960
.iter()

0 commit comments

Comments
 (0)