31 changes: 31 additions & 0 deletions datafusion/common/src/unnest.rs
@@ -63,13 +63,34 @@ use crate::Column;
/// c1 c2 c1 c2
/// ```
///
/// ## `Unnest(c1)`, preserve_nulls: true, preserve_empty_as_null: true (Spark's explode_outer behavior)
/// ```text
/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐
/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │
/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
/// │ null │ │ B │ │ 2 │ │ A │
/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤
/// │ {} │ │ D │ │ null │ │ B │
/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤
/// │ {3} │ │ E │ │ null │ │ D │ <-- empty array {} produces null
/// └─────────┘ └─────┘ ├─────────┤ ├─────┤
/// c1 c2 │ 3 │ │ E │
/// └─────────┘ └─────┘
/// c1 c2
/// ```
///
/// `recursions` instructs how a column should be unnested (e.g. unnesting a column multiple
/// times, with depth = 1 and depth = 2). Any unnested column not mentioned in this
/// option is inferred to be unnested with depth = 1
#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
pub struct UnnestOptions {
/// Should nulls in the input be preserved? Defaults to true
pub preserve_nulls: bool,
/// Should empty arrays be treated as NULL arrays? Defaults to false.
/// When true, empty arrays `[]` will produce a row with a NULL value,
/// similar to Spark's `explode_outer` behavior.
/// When false (default), empty arrays produce no output rows.
pub preserve_empty_as_null: bool,
/// If specific columns need to be unnested multiple times (e.g. at different depths),
/// declare them here. Any unnested column not mentioned in this option
/// will be unnested with depth = 1
@@ -90,6 +111,8 @@ impl Default for UnnestOptions {
Self {
// default to true to maintain backwards compatible behavior
preserve_nulls: true,
// default to false to maintain backwards compatible behavior
preserve_empty_as_null: false,
recursions: vec![],
}
}
@@ -108,6 +131,14 @@ impl UnnestOptions {
self
}

/// Set the behavior with empty arrays in the input.
/// When true, empty arrays `[]` will produce a row with a NULL value,
/// similar to Spark's `explode_outer` behavior.
pub fn with_preserve_empty_as_null(mut self, preserve_empty_as_null: bool) -> Self {
self.preserve_empty_as_null = preserve_empty_as_null;
self
}

/// Set the recursions for the unnest operation
pub fn with_recursions(mut self, recursion: RecursionUnnestOption) -> Self {
self.recursions.push(recursion);
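For orientation, here is a minimal usage sketch of the new option. The builder and field names come from the diff above; the `datafusion_common::UnnestOptions` import path is an assumption about the crate's public re-export.

```rust
use datafusion_common::UnnestOptions; // assumed re-export path

fn main() {
    // Builder style: `preserve_nulls` stays at its default (true) and empty arrays
    // are additionally mapped to NULL rows, matching Spark's `explode_outer`.
    let options = UnnestOptions::default().with_preserve_empty_as_null(true);
    assert!(options.preserve_nulls);
    assert!(options.preserve_empty_as_null);

    // Equivalent construction through the public fields.
    let _options = UnnestOptions {
        preserve_nulls: true,
        preserve_empty_as_null: true,
        recursions: vec![],
    };
}
```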
78 changes: 77 additions & 1 deletion datafusion/physical-plan/src/unnest.rs
@@ -749,10 +749,16 @@ fn build_batch(
///
/// whereas if `preserve_nulls` is true, the longest length array will be:
///
///
/// ```ignore
/// longest_length: [3, 1, 1, 2]
/// ```
///
/// If `preserve_empty_as_null` is true (Spark's explode_outer behavior),
/// empty arrays are treated like NULL arrays:
///
/// ```ignore
/// longest_length: [3, 1, 1, 2] // empty array [] at index 1 gets length 1
/// ```
fn find_longest_length(
list_arrays: &[ArrayRef],
options: &UnnestOptions,
@@ -763,14 +769,36 @@
} else {
Scalar::new(Int64Array::from_value(0, 1))
};

// The length to use for empty arrays when preserve_empty_as_null is true
let empty_length = Scalar::new(Int64Array::from_value(1, 1));
let zero_length = Scalar::new(Int64Array::from_value(0, 1));

let list_lengths: Vec<ArrayRef> = list_arrays
.iter()
.map(|list_array| {
let mut length_array = length(list_array)?;
// Make sure length arrays have the same type. Int64 is the most general one.
length_array = cast(&length_array, &DataType::Int64)?;

// Handle empty arrays when preserve_empty_as_null is true
// This must be done BEFORE handling NULLs, because we need to distinguish
// between actual empty arrays (length 0, not null) and NULL arrays.
// Empty arrays have length 0 but are not null, so we need to
// replace 0 lengths with 1 to produce a NULL output row (Spark's explode_outer behavior)
if options.preserve_empty_as_null {
// Only replace 0 with 1 for non-null entries (actual empty arrays)
let is_not_null_mask = is_not_null(&length_array)?;
let is_zero = kernels::cmp::eq(&length_array, &zero_length)?;
// is_empty_array = not null AND length == 0
let is_empty_array = kernels::boolean::and(&is_not_null_mask, &is_zero)?;
length_array = zip(&is_empty_array, &empty_length, &length_array)?;
}

// Handle NULL arrays: replace null lengths with null_length
length_array =
zip(&is_not_null(&length_array)?, &length_array, &null_length)?;

Ok(length_array)
})
.collect::<Result<_>>()?;
@@ -1193,6 +1221,7 @@ mod tests {
&HashSet::default(),
&UnnestOptions {
preserve_nulls: true,
preserve_empty_as_null: false,
recursions: vec![],
},
)?
@@ -1277,9 +1306,19 @@
list_arrays: &[ArrayRef],
preserve_nulls: bool,
expected: Vec<i64>,
) -> Result<()> {
verify_longest_length_with_options(list_arrays, preserve_nulls, false, expected)
}

fn verify_longest_length_with_options(
list_arrays: &[ArrayRef],
preserve_nulls: bool,
preserve_empty_as_null: bool,
expected: Vec<i64>,
) -> Result<()> {
let options = UnnestOptions {
preserve_nulls,
preserve_empty_as_null,
recursions: vec![],
};
let longest_length = find_longest_length(list_arrays, &options)?;
@@ -1326,6 +1365,43 @@
Ok(())
}

#[test]
fn test_longest_list_length_preserve_empty_as_null() -> Result<()> {
// Test with single ListArray
// [A, B, C], [], NULL, [D], NULL, [NULL, F]
// With preserve_empty_as_null=true, empty arrays [] get length 1
let list_array = Arc::new(make_generic_array::<i32>()) as ArrayRef;

// preserve_nulls=false, preserve_empty_as_null=true
// NULL arrays still get length 0, but empty arrays get length 1
verify_longest_length_with_options(
&[Arc::clone(&list_array)],
false, // preserve_nulls
true, // preserve_empty_as_null
vec![3, 1, 0, 1, 0, 2], // index 1 (empty []) now gets length 1
)?;

// preserve_nulls=true, preserve_empty_as_null=true (Spark's explode_outer behavior)
// Both NULL arrays and empty arrays get length 1
verify_longest_length_with_options(
&[Arc::clone(&list_array)],
true, // preserve_nulls
true, // preserve_empty_as_null
vec![3, 1, 1, 1, 1, 2], // index 1 (empty []) gets length 1, NULLs also get 1
)?;

// Test with single LargeListArray
let list_array = Arc::new(make_generic_array::<i64>()) as ArrayRef;
verify_longest_length_with_options(
&[Arc::clone(&list_array)],
true,
true,
vec![3, 1, 1, 1, 1, 2],
)?;

Ok(())
}

#[test]
fn test_create_take_indices() -> Result<()> {
let length_array = Int64Array::from(vec![2, 3, 1]);
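To make the length rule concrete without the Arrow kernels, here is a logic-only sketch for a single list column (`find_longest_length` additionally combines several columns, keeping the longest per-row length across them, as its name suggests). The `row_length` helper is hypothetical; the expected values mirror `test_longest_list_length_preserve_empty_as_null` above.

```rust
// Hypothetical helper: rows are Option<Vec<T>>, where None models a NULL list and
// None inside a row models a NULL element (which still counts toward the length).
fn row_length<T>(row: &Option<Vec<T>>, preserve_nulls: bool, preserve_empty_as_null: bool) -> i64 {
    match row {
        // NULL list: one NULL output row if preserved, otherwise no rows.
        None => {
            if preserve_nulls {
                1
            } else {
                0
            }
        }
        // Empty list: one NULL output row under explode_outer semantics.
        Some(v) if v.is_empty() && preserve_empty_as_null => 1,
        // Non-empty list (or empty list with the option off): one row per element.
        Some(v) => v.len() as i64,
    }
}

fn main() {
    // [A, B, C], [], NULL, [D], NULL, [NULL, F] -- the fixture described in the tests above.
    let rows: Vec<Option<Vec<Option<char>>>> = vec![
        Some(vec![Some('A'), Some('B'), Some('C')]),
        Some(vec![]),
        None,
        Some(vec![Some('D')]),
        None,
        Some(vec![None, Some('F')]),
    ];

    let lengths: Vec<i64> = rows.iter().map(|r| row_length(r, true, true)).collect();
    assert_eq!(lengths, vec![3, 1, 1, 1, 1, 2]); // explode_outer behavior

    let lengths: Vec<i64> = rows.iter().map(|r| row_length(r, false, true)).collect();
    assert_eq!(lengths, vec![3, 1, 0, 1, 0, 2]); // NULL lists dropped, empties kept
}
```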
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
@@ -311,6 +311,7 @@ message ColumnUnnestListRecursion {
message UnnestOptions {
bool preserve_nulls = 1;
repeated RecursionUnnestOption recursions = 2;
bool preserve_empty_as_null = 3;
}

message RecursionUnnestOption {
18 changes: 18 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Generated file; diff not rendered.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
@@ -57,6 +57,7 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions {
fn from(opts: &protobuf::UnnestOptions) -> Self {
Self {
preserve_nulls: opts.preserve_nulls,
preserve_empty_as_null: opts.preserve_empty_as_null,
recursions: opts
.recursions
.iter()
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
@@ -54,6 +54,7 @@ impl From<&UnnestOptions> for protobuf::UnnestOptions {
fn from(opts: &UnnestOptions) -> Self {
Self {
preserve_nulls: opts.preserve_nulls,
preserve_empty_as_null: opts.preserve_empty_as_null,
recursions: opts
.recursions
.iter()
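Finally, a rough round-trip sketch of the new proto field, assuming the `datafusion_proto::protobuf` module name for the prost-generated types and relying on the `From` impls shown in the last two files:

```rust
use datafusion_common::UnnestOptions; // assumed re-export path
use datafusion_proto::protobuf; // assumed module for the generated message types

fn main() {
    let original = UnnestOptions::default().with_preserve_empty_as_null(true);

    // Logical options -> protobuf message (via `From<&UnnestOptions>`).
    let encoded: protobuf::UnnestOptions = (&original).into();
    assert!(encoded.preserve_empty_as_null);

    // Protobuf message -> logical options (via `From<&protobuf::UnnestOptions>`).
    let decoded: UnnestOptions = (&encoded).into();
    assert_eq!(decoded, original);
}
```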