diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index db48edd061605..c39df3de2f84b 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -63,6 +63,22 @@ use crate::Column; /// c1 c2 c1 c2 /// ``` /// +/// ## `Unnest(c1)`, preserve_nulls: true, preserve_empty_as_null: true (Spark's explode_outer behavior) +/// ```text +/// ┌─────────┐ ┌─────┐ ┌─────────┐ ┌─────┐ +/// │ {1, 2} │ │ A │ Unnest │ 1 │ │ A │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ null │ │ B │ │ 2 │ │ A │ +/// ├─────────┤ ├─────┤ ────────────▶ ├─────────┤ ├─────┤ +/// │ {} │ │ D │ │ null │ │ B │ +/// ├─────────┤ ├─────┤ ├─────────┤ ├─────┤ +/// │ {3} │ │ E │ │ null │ │ D │ <-- empty array {} produces null +/// └─────────┘ └─────┘ ├─────────┤ ├─────┤ +/// c1 c2 │ 3 │ │ E │ +/// └─────────┘ └─────┘ +/// c1 c2 +/// ``` +/// /// `recursions` instruct how a column should be unnested (e.g unnesting a column multiple /// time, with depth = 1 and depth = 2). Any unnested column not being mentioned inside this /// options is inferred to be unnested with depth = 1 @@ -70,6 +86,11 @@ use crate::Column; pub struct UnnestOptions { /// Should nulls in the input be preserved? Defaults to true pub preserve_nulls: bool, + /// Should empty arrays be treated as NULL arrays? Defaults to false. + /// When true, empty arrays `[]` will produce a row with NULL value, + /// similar to Spark's `explode_outer` behavior. + /// When false (default), empty arrays produce no output rows. + pub preserve_empty_as_null: bool, /// If specific columns need to be unnested multiple times (e.g at different depth), /// declare them here. Any unnested columns not being mentioned inside this option /// will be unnested with depth = 1 @@ -90,6 +111,8 @@ impl Default for UnnestOptions { Self { // default to true to maintain backwards compatible behavior preserve_nulls: true, + // default to false to maintain backwards compatible behavior + preserve_empty_as_null: false, recursions: vec![], } } @@ -108,6 +131,14 @@ impl UnnestOptions { self } + /// Set the behavior with empty arrays in the input. + /// When true, empty arrays `[]` will produce a row with NULL value, + /// similar to Spark's `explode_outer` behavior. + pub fn with_preserve_empty_as_null(mut self, preserve_empty_as_null: bool) -> Self { + self.preserve_empty_as_null = preserve_empty_as_null; + self + } + /// Set the recursions for the unnest operation pub fn with_recursions(mut self, recursion: RecursionUnnestOption) -> Self { self.recursions.push(recursion); diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 5fef754e80780..cce121ae457cf 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -749,10 +749,16 @@ fn build_batch( /// /// whereas if `preserve_nulls` is true, the longest length array will be: /// -/// /// ```ignore /// longest_length: [3, 1, 1, 2] /// ``` +/// +/// If `preserve_empty_as_null` is true (Spark's explode_outer behavior), +/// empty arrays are treated like NULL arrays: +/// +/// ```ignore +/// longest_length: [3, 1, 1, 2] // empty array [] at index 1 gets length 1 +/// ``` fn find_longest_length( list_arrays: &[ArrayRef], options: &UnnestOptions, @@ -763,14 +769,36 @@ fn find_longest_length( } else { Scalar::new(Int64Array::from_value(0, 1)) }; + + // The length to use for empty arrays when preserve_empty_as_null is true + let empty_length = Scalar::new(Int64Array::from_value(1, 1)); + let zero_length = Scalar::new(Int64Array::from_value(0, 1)); + let list_lengths: Vec = list_arrays .iter() .map(|list_array| { let mut length_array = length(list_array)?; // Make sure length arrays have the same type. Int64 is the most general one. length_array = cast(&length_array, &DataType::Int64)?; + + // Handle empty arrays when preserve_empty_as_null is true + // This must be done BEFORE handling NULLs, because we need to distinguish + // between actual empty arrays (length 0, not null) and NULL arrays. + // Empty arrays have length 0 but are not null, so we need to + // replace 0 lengths with 1 to produce a NULL output row (Spark's explode_outer behavior) + if options.preserve_empty_as_null { + // Only replace 0 with 1 for non-null entries (actual empty arrays) + let is_not_null_mask = is_not_null(&length_array)?; + let is_zero = kernels::cmp::eq(&length_array, &zero_length)?; + // is_empty_array = not null AND length == 0 + let is_empty_array = kernels::boolean::and(&is_not_null_mask, &is_zero)?; + length_array = zip(&is_empty_array, &empty_length, &length_array)?; + } + + // Handle NULL arrays: replace null lengths with null_length length_array = zip(&is_not_null(&length_array)?, &length_array, &null_length)?; + Ok(length_array) }) .collect::>()?; @@ -1193,6 +1221,7 @@ mod tests { &HashSet::default(), &UnnestOptions { preserve_nulls: true, + preserve_empty_as_null: false, recursions: vec![], }, )? @@ -1277,9 +1306,19 @@ mod tests { list_arrays: &[ArrayRef], preserve_nulls: bool, expected: Vec, + ) -> Result<()> { + verify_longest_length_with_options(list_arrays, preserve_nulls, false, expected) + } + + fn verify_longest_length_with_options( + list_arrays: &[ArrayRef], + preserve_nulls: bool, + preserve_empty_as_null: bool, + expected: Vec, ) -> Result<()> { let options = UnnestOptions { preserve_nulls, + preserve_empty_as_null, recursions: vec![], }; let longest_length = find_longest_length(list_arrays, &options)?; @@ -1326,6 +1365,43 @@ mod tests { Ok(()) } + #[test] + fn test_longest_list_length_preserve_empty_as_null() -> Result<()> { + // Test with single ListArray + // [A, B, C], [], NULL, [D], NULL, [NULL, F] + // With preserve_empty_as_null=true, empty arrays [] get length 1 + let list_array = Arc::new(make_generic_array::()) as ArrayRef; + + // preserve_nulls=false, preserve_empty_as_null=true + // NULL arrays still get length 0, but empty arrays get length 1 + verify_longest_length_with_options( + &[Arc::clone(&list_array)], + false, // preserve_nulls + true, // preserve_empty_as_null + vec![3, 1, 0, 1, 0, 2], // index 1 (empty []) now gets length 1 + )?; + + // preserve_nulls=true, preserve_empty_as_null=true (Spark's explode_outer behavior) + // Both NULL arrays and empty arrays get length 1 + verify_longest_length_with_options( + &[Arc::clone(&list_array)], + true, // preserve_nulls + true, // preserve_empty_as_null + vec![3, 1, 1, 1, 1, 2], // index 1 (empty []) gets length 1, NULLs also get 1 + )?; + + // Test with single LargeListArray + let list_array = Arc::new(make_generic_array::()) as ArrayRef; + verify_longest_length_with_options( + &[Arc::clone(&list_array)], + true, + true, + vec![3, 1, 1, 1, 1, 2], + )?; + + Ok(()) + } + #[test] fn test_create_take_indices() -> Result<()> { let length_array = Int64Array::from(vec![2, 3, 1]); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bd7dd3a6aff3c..778ce29196cb1 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -311,6 +311,7 @@ message ColumnUnnestListRecursion { message UnnestOptions { bool preserve_nulls = 1; repeated RecursionUnnestOption recursions = 2; + bool preserve_empty_as_null = 3; } message RecursionUnnestOption { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e269606d163a3..e6bbcb674dbe4 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -23472,6 +23472,9 @@ impl serde::Serialize for UnnestOptions { if !self.recursions.is_empty() { len += 1; } + if self.preserve_empty_as_null { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.UnnestOptions", len)?; if self.preserve_nulls { struct_ser.serialize_field("preserveNulls", &self.preserve_nulls)?; @@ -23479,6 +23482,9 @@ impl serde::Serialize for UnnestOptions { if !self.recursions.is_empty() { struct_ser.serialize_field("recursions", &self.recursions)?; } + if self.preserve_empty_as_null { + struct_ser.serialize_field("preserveEmptyAsNull", &self.preserve_empty_as_null)?; + } struct_ser.end() } } @@ -23492,12 +23498,15 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { "preserve_nulls", "preserveNulls", "recursions", + "preserve_empty_as_null", + "preserveEmptyAsNull", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { PreserveNulls, Recursions, + PreserveEmptyAsNull, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -23521,6 +23530,7 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { match value { "preserveNulls" | "preserve_nulls" => Ok(GeneratedField::PreserveNulls), "recursions" => Ok(GeneratedField::Recursions), + "preserveEmptyAsNull" | "preserve_empty_as_null" => Ok(GeneratedField::PreserveEmptyAsNull), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -23542,6 +23552,7 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { { let mut preserve_nulls__ = None; let mut recursions__ = None; + let mut preserve_empty_as_null__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::PreserveNulls => { @@ -23556,11 +23567,18 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { } recursions__ = Some(map_.next_value()?); } + GeneratedField::PreserveEmptyAsNull => { + if preserve_empty_as_null__.is_some() { + return Err(serde::de::Error::duplicate_field("preserveEmptyAsNull")); + } + preserve_empty_as_null__ = Some(map_.next_value()?); + } } } Ok(UnnestOptions { preserve_nulls: preserve_nulls__.unwrap_or_default(), recursions: recursions__.unwrap_or_default(), + preserve_empty_as_null: preserve_empty_as_null__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index cf343e0258d0b..d0eecf8cea169 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -516,6 +516,8 @@ pub struct UnnestOptions { pub preserve_nulls: bool, #[prost(message, repeated, tag = "2")] pub recursions: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "3")] + pub preserve_empty_as_null: bool, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct RecursionUnnestOption { diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 179fe8bb7d7fe..24c0fec4530ac 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -57,6 +57,7 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions { fn from(opts: &protobuf::UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, + preserve_empty_as_null: opts.preserve_empty_as_null, recursions: opts .recursions .iter() diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 6e4e5d0b6eea4..7d4012dbf93d0 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -54,6 +54,7 @@ impl From<&UnnestOptions> for protobuf::UnnestOptions { fn from(opts: &UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, + preserve_empty_as_null: opts.preserve_empty_as_null, recursions: opts .recursions .iter()