diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index f9985084cc4..13a7d4b30bc 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -15,24 +15,28 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, - compute::CastOptions, + array::{ + self, Array, ArrayRef, BinaryViewArray, GenericListArray, GenericListViewArray, + StructArray, UInt64Array, + }, + compute::{CastOptions, take}, datatypes::Field, error::Result, }; use arrow_schema::{ArrowError, DataType, FieldRef}; -use parquet_variant::{VariantPath, VariantPathElement}; +use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES, VariantPath, VariantPathElement}; use crate::VariantArray; -use crate::variant_array::BorrowedShreddingState; +use crate::arrow_to_variant::ListLikeArray; use crate::variant_to_arrow::make_variant_to_arrow_row_builder; +use crate::{ShreddingState, variant_array::StructArrayBuilder}; use arrow::array::AsArray; use std::sync::Arc; -pub(crate) enum ShreddedPathStep<'a> { +pub(crate) enum ShreddedPathStep { /// Path step succeeded, return the new shredding state - Success(BorrowedShreddingState<'a>), + Success(ShreddingState), /// The path element is not present in the `typed_value` column and there is no `value` column, /// so we know it does not exist. It, and all paths under it, are all-NULL. Missing, @@ -42,16 +46,108 @@ pub(crate) enum ShreddedPathStep<'a> { NotShredded, } +/// Build the next shredding state by taking one list-like element (at `index`) per input row. +/// +/// With `cast_options.safe = true`, out-of-bounds indices become nulls for those rows. +/// With `cast_options.safe = false`, out-of-bounds indices return [`ArrowError::CastError`]. +fn take_list_like_index_as_shredding_state( + typed_value: &dyn Array, + index: usize, + cast_options: &CastOptions, +) -> Result> { + let list_array = typed_value.as_any().downcast_ref::().ok_or_else(|| { + ArrowError::ComputeError(format!( + "Expected array type '{}' while handling list-like path step, got '{}'", + std::any::type_name::(), + typed_value.data_type() + )) + })?; + + let values = list_array.values(); + + let Some(struct_array) = values.as_any().downcast_ref::() else { + return Ok(None); + }; + let shredding_state = ShreddingState::try_from(struct_array)?; + + let value_array = shredding_state.value_field(); + let typed_array = shredding_state.typed_value_field(); + + // If list elements have neither typed nor fallback value, this path step is missing. + if value_array.is_none() && typed_array.is_none() { + return Ok(None); + } + + let mut take_indices = Vec::with_capacity(list_array.len()); + for row in 0..list_array.len() { + let row_range = list_array.element_range(row); + let len = row_range.len(); + + if index < len { + let absolute_index = row_range.start.checked_add(index).ok_or_else(|| { + ArrowError::ComputeError( + "List-like index overflow while building take indices".into(), + ) + })?; + let absolute_index = u64::try_from(absolute_index).map_err(|_| { + ArrowError::ComputeError("List-like index does not fit into u64".into()) + })?; + take_indices.push(Some(absolute_index)); + } else if cast_options.safe { + take_indices.push(None); + } else { + return Err(ArrowError::CastError(format!( + "Cannot access index '{}' for row {} with list length {}", + index, row, len + ))); + } + } + + let index_array = UInt64Array::from(take_indices); + + // Gather both typed and fallback values at the requested element index. + let taken_value = value_array + .map(|value| take(value, &index_array, None)) + .transpose()?; + let taken_typed = typed_array + .map(|typed| take(typed, &index_array, None)) + .transpose()?; + + let metadata_array = BinaryViewArray::from_iter_values(std::iter::repeat_n( + EMPTY_VARIANT_METADATA_BYTES, + index_array.len(), + )); + + let mut builder = + StructArrayBuilder::new().with_field("metadata", Arc::new(metadata_array), false); + if let Some(taken_value) = taken_value { + builder = builder.with_field("value", taken_value, true); + } + if let Some(taken_typed) = taken_typed { + builder = builder.with_field("typed_value", taken_typed, true); + } + + Ok(Some(ShreddingState::try_from(&builder.build())?)) +} + /// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step /// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this /// level, or if `typed_value` is not a struct, or if the requested field name does not exist. /// -/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. -pub(crate) fn follow_shredded_path_element<'a>( - shredding_state: &BorrowedShreddingState<'a>, +/// Safe-cast behavior (`cast_options.safe = true`): +/// - Type mismatch during path traversal (for example field access on non-struct, index access on +/// non-list) returns [`ShreddedPathStep::Missing`] or [`ShreddedPathStep::NotShredded`], allowing +/// the caller to continue with null/fallback semantics. +/// - List index out-of-bounds produces nulls for the corresponding rows. +/// +/// Unsafe-cast behavior (`cast_options.safe = false`): +/// - Type mismatch during path traversal returns [`ArrowError::CastError`]. +/// - List index out-of-bounds returns [`ArrowError::CastError`]. +pub(crate) fn follow_shredded_path_element( + shredding_state: &ShreddingState, path_element: &VariantPathElement<'_>, cast_options: &CastOptions, -) -> Result> { +) -> Result { // If the requested path element is not present in `typed_value`, and `value` is missing, then // we know it does not exist; it, and all paths under it, are all-NULL. let missing_path_step = || match shredding_state.value_field() { @@ -96,15 +192,47 @@ pub(crate) fn follow_shredded_path_element<'a>( )) })?; - let state = BorrowedShreddingState::try_from(struct_array)?; + let state = ShreddingState::try_from(struct_array)?; Ok(ShreddedPathStep::Success(state)) } - VariantPathElement::Index { .. } => { - // TODO: Support array indexing. Among other things, it will require slicing not - // only the array we have here, but also the corresponding metadata and null masks. - Err(ArrowError::NotYetImplemented( - "Pathing into shredded variant array index".into(), - )) + VariantPathElement::Index { index } => { + let state = match typed_value.data_type() { + DataType::List(_) => take_list_like_index_as_shredding_state::< + GenericListArray, + >(typed_value.as_ref(), *index, cast_options)?, + DataType::LargeList(_) => take_list_like_index_as_shredding_state::< + GenericListArray, + >( + typed_value.as_ref(), *index, cast_options + )?, + DataType::ListView(_) => take_list_like_index_as_shredding_state::< + GenericListViewArray, + >( + typed_value.as_ref(), *index, cast_options + )?, + DataType::LargeListView(_) => take_list_like_index_as_shredding_state::< + GenericListViewArray, + >( + typed_value.as_ref(), *index, cast_options + )?, + _ if cast_options.safe => { + // With safe cast options, return NULL (missing_path_step) + return Ok(missing_path_step()); + } + _ => { + // Downcast failure - if strict cast options are enabled, this should be an error + return Err(ArrowError::CastError(format!( + "Cannot access index '{}' on non-list type: {}", + index, + typed_value.data_type() + ))); + } + }; + + match state { + Some(state) => Ok(ShreddedPathStep::Success(state)), + None => Ok(missing_path_step()), + } } } } @@ -160,7 +288,7 @@ fn shredded_get_path( // Peel away the prefix of path elements that traverses the shredded parts of this variant // column. Shredding will traverse the rest of the path on a per-row basis. - let mut shredding_state = input.shredding_state().borrow(); + let mut shredding_state = input.shredding_state().clone(); let mut accumulated_nulls = input.inner().nulls().cloned(); let mut path_index = 0; for path_element in path { @@ -334,7 +462,9 @@ mod test { use super::{GetOptions, variant_get}; use crate::variant_array::{ShreddedVariantFieldArray, StructArrayBuilder}; - use crate::{VariantArray, VariantArrayBuilder, json_to_variant}; + use crate::{ + ShreddedSchemaBuilder, VariantArray, VariantArrayBuilder, json_to_variant, shred_variant, + }; use arrow::array::{ Array, ArrayRef, AsArray, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array, @@ -424,7 +554,7 @@ mod test { fn get_primitive_variant_inside_object_of_list() { single_variant_get_test( r#"{"some_field": [1234]}"#, - VariantPath::try_from("some_field").unwrap().join(0), + VariantPath::try_from("some_field[0]").unwrap(), "1234", ); } @@ -1812,6 +1942,7 @@ mod test { Arc::new(struct_array) } + /// This test manually constructs a shredded variant array representing objects /// like {"x": 1, "y": "foo"} and {"x": 42} and tests extracting the "x" field /// as VariantArray using variant_get. @@ -1848,6 +1979,282 @@ mod test { assert_eq!(&result, &expected); } + /// This test uses a pre-shredded list array and validates index-path access. + #[test] + fn test_shredded_list_index_access() { + let array = shredded_list_variant_array(); + // Test: Extract the 0 index field as VariantArray first + let options = GetOptions::new_with_path(VariantPath::from(0)); + let result = variant_get(&array, options).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + assert_eq!(result_variant.len(), 2); + + // Row 0: expect 0 index = "comedy" + assert_eq!(result_variant.value(0), Variant::from("comedy")); + // Row 1: expect 0 index = "horror" + assert_eq!(result_variant.value(1), Variant::from("horror")); + } + + /// Test extracting shredded list field with type conversion. + #[test] + fn test_shredded_list_as_string() { + let array = shredded_list_variant_array(); + // Test: Extract the 0 index values as StringArray (type conversion) + let field = Field::new("typed_value", DataType::Utf8, false); + let options = GetOptions::new_with_path(VariantPath::from(0)) + .with_as_type(Some(FieldRef::from(field))); + let result = variant_get(&array, options).unwrap(); + // Should get StringArray + let expected: ArrayRef = Arc::new(StringArray::from(vec![Some("comedy"), Some("horror")])); + assert_eq!(&result, &expected); + } + + #[test] + fn test_shredded_list_index_access_from_value_field() { + let array = shredded_list_variant_array(); + // Index 1 maps to "drama" for row 0, and to fallback value 123 for row 1. + let options = GetOptions::new_with_path(VariantPath::from(1)); + let result = variant_get(&array, options).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + + assert_eq!(result_variant.value(0), Variant::from("drama")); + assert_eq!(result_variant.value(1).as_int64(), Some(123)); + } + + #[test] + fn test_shredded_list_index_access_from_value_field_as_int64() { + let array = shredded_list_variant_array(); + let field = Field::new("typed_value", DataType::Int64, true); + let options = GetOptions::new_with_path(VariantPath::from(1)) + .with_as_type(Some(FieldRef::from(field))); + let result = variant_get(&array, options).unwrap(); + + // "drama" -> NULL, 123 -> 123. + let expected: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(123)])); + assert_eq!(&result, &expected); + } + + #[test] + fn test_shredded_list_index_out_of_bounds_unsafe_cast_errors() { + let options = + GetOptions::new_with_path(VariantPath::from(10)).with_cast_options(CastOptions { + safe: false, + ..Default::default() + }); + + let err = variant_get(&shredded_list_variant_array(), options.clone()).unwrap_err(); + assert!(err.to_string().contains("Cannot access index '10'")); + } + + #[test] + fn test_shredded_large_list_index_access_from_value_field() { + let array = shredded_large_list_variant_array(); + // Index 1 maps to "drama" for row 0, and to fallback value 123 for row 1. + let options = GetOptions::new_with_path(VariantPath::from(1)); + let result = variant_get(&array, options).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + + assert_eq!(result_variant.value(0), Variant::from("drama")); + assert_eq!(result_variant.value(1).as_int64(), Some(123)); + } + + #[test] + fn test_shredded_large_list_index_out_of_bounds_unsafe_cast_errors() { + let options = + GetOptions::new_with_path(VariantPath::from(10)).with_cast_options(CastOptions { + safe: false, + ..Default::default() + }); + + let err = variant_get(&shredded_large_list_variant_array(), options).unwrap_err(); + assert!(err.to_string().contains("Cannot access index '10'")); + } + + #[test] + fn test_shredded_list_view_index_access_from_value_field() { + let array = shredded_list_view_variant_array(); + let options = GetOptions::new_with_path(VariantPath::from(1)); + let result = variant_get(&array, options).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + + assert_eq!(result_variant.value(0), Variant::from("drama")); + assert_eq!(result_variant.value(1).as_int64(), Some(123)); + } + + #[test] + fn test_shredded_list_view_index_out_of_bounds_unsafe_cast_errors() { + let options = + GetOptions::new_with_path(VariantPath::from(10)).with_cast_options(CastOptions { + safe: false, + ..Default::default() + }); + + let err = variant_get(&shredded_list_view_variant_array(), options).unwrap_err(); + assert!(err.to_string().contains("Cannot access index '10'")); + } + + #[test] + fn test_shredded_large_list_view_index_access_from_value_field() { + let array = shredded_large_list_view_variant_array(); + let options = GetOptions::new_with_path(VariantPath::from(1)); + let result = variant_get(&array, options).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + + assert_eq!(result_variant.value(0), Variant::from("drama")); + assert_eq!(result_variant.value(1).as_int64(), Some(123)); + } + + #[test] + fn test_shredded_large_list_view_index_out_of_bounds_unsafe_cast_errors() { + let options = + GetOptions::new_with_path(VariantPath::from(10)).with_cast_options(CastOptions { + safe: false, + ..Default::default() + }); + + let err = variant_get(&shredded_large_list_view_variant_array(), options).unwrap_err(); + assert!(err.to_string().contains("Cannot access index '10'")); + } + + #[test] + fn test_shredded_list_in_struct_index_access() { + let array = shredded_struct_with_list_variant_array(); + let options = GetOptions::new_with_path(VariantPath::try_from("a[1]").unwrap()); + let result = variant_get(&array, options).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + + assert_eq!(result_variant.value(0), Variant::from("drama")); + assert_eq!(result_variant.value(1).as_int64(), Some(123)); + } + + #[test] + fn test_shredded_struct_in_list_field_access() { + let array = shredded_list_of_struct_variant_array(); + let field = Field::new("x", DataType::Int32, true); + let path = VariantPath::from(0).join("x"); + let options = GetOptions::new_with_path(path).with_as_type(Some(FieldRef::from(field))); + let result = variant_get(&array, options).unwrap(); + + let expected: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), Some(3)])); + assert_eq!(&result, &expected); + } + + #[test] + fn test_shredded_list_of_lists_index_access() { + let array = shredded_list_of_lists_variant_array(); + let path = VariantPath::from(0).join(1); + + let result = variant_get(&array, GetOptions::new_with_path(path.clone())).unwrap(); + let result_variant = VariantArray::try_new(&result).unwrap(); + assert_eq!(result_variant.value(0), Variant::from("b")); + assert_eq!(result_variant.value(1).as_int64(), Some(123)); + + let field = Field::new("typed_value", DataType::Int64, true); + let casted = variant_get( + &array, + GetOptions::new_with_path(path).with_as_type(Some(FieldRef::from(field))), + ) + .unwrap(); + let expected: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(123)])); + assert_eq!(&casted, &expected); + } + + /// Helper to create a shredded list variant array used by list index tests. + /// + /// Rows: + /// 1. `["comedy", "drama"]` (fully shred-able as `Utf8`) + /// 2. `["horror", 123]` (partially shredded, with fallback for the numeric element) + fn shredded_list_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"["comedy", "drama"]"#), + Some(r#"["horror", 123]"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))); + let shredded = shred_variant(&input, &list_schema).unwrap(); + ArrayRef::from(shredded) + } + + fn shredded_struct_with_list_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"{"a": ["comedy", "drama"]}"#), + Some(r#"{"a": ["horror", 123]}"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let list_schema = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))); + let shredding_schema = ShreddedSchemaBuilder::default() + .with_path("a", &list_schema) + .unwrap() + .build(); + let shredded = shred_variant(&input, &shredding_schema).unwrap(); + ArrayRef::from(shredded) + } + + fn shredded_list_of_struct_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"[{"x": 1}, {"x": 2}]"#), + Some(r#"[{"x": 3}, {"y": 4}]"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let struct_type = + DataType::Struct(Fields::from(vec![Field::new("x", DataType::Int32, true)])); + let list_schema = DataType::List(Arc::new(Field::new("item", struct_type, true))); + let shredded = shred_variant(&input, &list_schema).unwrap(); + ArrayRef::from(shredded) + } + + fn shredded_list_of_lists_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"[["a", "b"], ["c", "d"]]"#), + Some(r#"[["x", 123], ["y", "z"]]"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let inner_list = DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))); + let outer_list = DataType::List(Arc::new(Field::new("item", inner_list, true))); + let shredded = shred_variant(&input, &outer_list).unwrap(); + ArrayRef::from(shredded) + } + + fn shredded_large_list_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"["comedy", "drama"]"#), + Some(r#"["horror", 123]"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let list_schema = DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, true))); + let shredded = shred_variant(&input, &list_schema).unwrap(); + ArrayRef::from(shredded) + } + + fn shredded_list_view_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"["comedy", "drama"]"#), + Some(r#"["horror", 123]"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let list_schema = DataType::ListView(Arc::new(Field::new("item", DataType::Utf8, true))); + let shredded = shred_variant(&input, &list_schema).unwrap(); + ArrayRef::from(shredded) + } + + fn shredded_large_list_view_variant_array() -> ArrayRef { + let json_rows: ArrayRef = Arc::new(StringArray::from(vec![ + Some(r#"["comedy", "drama"]"#), + Some(r#"["horror", 123]"#), + ])); + let input = json_to_variant(&json_rows).unwrap(); + + let list_schema = + DataType::LargeListView(Arc::new(Field::new("item", DataType::Utf8, true))); + let shredded = shred_variant(&input, &list_schema).unwrap(); + ArrayRef::from(shredded) + } /// Helper function to create a shredded variant array representing objects /// /// This creates an array that represents: