149 changes: 139 additions & 10 deletions kernel/src/engine/arrow_utils.rs
@@ -18,7 +18,7 @@ use crate::{

use crate::arrow::array::{
cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray,
};
use crate::arrow::buffer::NullBuffer;
use crate::arrow::compute::concat_batches;
@@ -168,15 +168,17 @@ impl RowIndexBuilder {
/// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
/// accurate null masks that row visitors rely on for correctness.
/// `row_indexes` are passed through to `reorder_struct_array`.
/// `file_location` is used to populate file metadata columns if requested.
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
file_location: Option<&str>,
) -> DeltaResult<T>
where
StructArray: Into<T>,
{
let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes)?;
let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes, file_location)?;
let data = fix_nested_null_masks(data);
Ok(data.into())
}
@@ -306,6 +308,8 @@ pub(crate) enum ReorderIndexTransform {
Missing(ArrowFieldRef),
/// Row index column requested, compute it
RowIndex(ArrowFieldRef),
/// File name column requested, populate with file path
FileName(ArrowFieldRef),
}

impl ReorderIndex {
@@ -333,14 +337,19 @@ impl ReorderIndex {
ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
}

fn file_name(index: usize, field: ArrowFieldRef) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::FileName(field))
}

/// Check if this reordering requires a transformation anywhere. See comment below on
/// [`ordering_needs_transform`] to understand why this is needed.
fn needs_transform(&self) -> bool {
match self.transform {
// if we're casting, inserting null, or generating row index, we need to transform
// if we're casting, inserting null, or generating row index/file name, we need to transform
ReorderIndexTransform::Cast(_)
| ReorderIndexTransform::Missing(_)
| ReorderIndexTransform::RowIndex(_) => true,
| ReorderIndexTransform::RowIndex(_)
| ReorderIndexTransform::FileName(_) => true,
// if our nested ordering needs a transform, we need a transform
ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
// no transform needed
@@ -595,6 +604,13 @@ fn get_indices(
Arc::new(field.try_into_arrow()?),
));
}
Some(MetadataColumnSpec::FileName) => {
debug!("Inserting a file name column: {}", field.name());
reorder_indices.push(ReorderIndex::file_name(
requested_position,
Arc::new(field.try_into_arrow()?),
));
}
Some(metadata_spec) => {
return Err(Error::Generic(format!(
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
@@ -765,10 +781,13 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
///
/// If the requested ordering contains a [`ReorderIndexTransform::RowIndex`], `row_indexes`
/// must not be `None` to append a row index column to the output.
/// If the requested ordering contains a [`ReorderIndexTransform::FileName`], `file_location`
/// must not be `None` to append a file name column to the output.
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
mut row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
file_location: Option<&str>,
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
@@ -806,6 +825,7 @@ pub(crate) fn reorder_struct_array(
struct_array,
children,
None, // Nested structures don't need row indexes since metadata columns can't be nested
None, // No file_location passed since metadata columns can't be nested
)?);
// create the new field specifying the correct order for the struct
let new_field = Arc::new(ArrowField::new_struct(
@@ -866,6 +886,27 @@ pub(crate) fn reorder_struct_array(
final_fields_cols[reorder_index.index] =
Some((Arc::clone(field), Arc::new(row_index_array)));
}
ReorderIndexTransform::FileName(field) => {
let Some(file_path) = file_location else {
return Err(Error::generic(
"File name column requested but file location not provided",
));
};
// Use run-end encoding for efficiency since the file name is constant for all rows
// Run-end encoding stores: [run_ends: [num_rows], values: [file_path]]
let run_ends = PrimitiveArray::<Int64Type>::from_iter_values([num_rows as i64]);
let values = StringArray::from_iter_values([file_path]);
let file_name_array = RunArray::try_new(&run_ends, &values)?;

// Create a field with the RunEndEncoded data type to match the array
let ree_field = Arc::new(ArrowField::new(
field.name(),
file_name_array.data_type().clone(),
field.is_nullable(),
));
final_fields_cols[reorder_index.index] =
Some((ree_field, Arc::new(file_name_array)));
}
}
}
let num_cols = final_fields_cols.len();
@@ -895,6 +936,7 @@ fn reorder_list<O: OffsetSizeTrait>(
struct_array,
children,
None, // Nested structures don't need row indexes since metadata columns can't be nested
None, // No file_location passed since metadata columns can't be nested
)?);
let new_list_field = Arc::new(ArrowField::new_struct(
list_field.name(),
@@ -930,6 +972,7 @@ fn reorder_map(
struct_array,
children,
None, // Nested structures don't need row indexes since metadata columns can't be nested
None, // No file_location passed since metadata columns can't be nested
)?;
let result_fields = result_array.fields();
let new_map_field = Arc::new(ArrowField::new_struct(
@@ -1793,7 +1836,7 @@ mod tests {
),
];

let result = reorder_struct_array(arry, &reorder, None);
let result = reorder_struct_array(arry, &reorder, None, None);
assert_result_error_with_message(
result,
"Row index column requested but row index iterator not provided",
@@ -1816,7 +1859,7 @@
#[allow(clippy::single_range_in_vec_init)]
let mut row_indexes = vec![(0..4)].into_iter().flatten();

let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes)).unwrap();
let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes), None).unwrap();
assert_eq!(ordered.column_names(), vec!["b", "row_idx"]);

// Verify the row index column contains the expected values
@@ -1853,6 +1896,92 @@ mod tests {
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn simple_file_name_field() {
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("i", DataType::INTEGER),
StructField::create_metadata_column("_file", MetadataColumnSpec::FileName),
StructField::nullable("i2", DataType::INTEGER),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let mut arrow_file_name_field = ArrowField::new("_file", ArrowDataType::Utf8, false);
arrow_file_name_field.set_metadata(HashMap::from([(
"delta.metadataSpec".to_string(),
"_file".to_string(),
)]));
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::file_name(1, Arc::new(arrow_file_name_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn test_reorder_struct_array_with_file_name() {
// Test that file names work when properly provided
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::file_name(
1,
Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
),
];

let file_location = "s3://bucket/path/to/file.parquet";
let ordered = reorder_struct_array(arry, &reorder, None, Some(file_location)).unwrap();
assert_eq!(ordered.column_names(), vec!["b", "_file"]);

// Verify the file name column is run-end encoded and contains the expected value
let file_name_col = ordered.column(1);

// Check it's a RunArray<Int64Type, StringArray>
let run_array = file_name_col
.as_any()
.downcast_ref::<RunArray<Int64Type>>()
.expect("Expected RunArray");

// Verify it has 4 logical rows (same as input)
assert_eq!(run_array.len(), 4);

// Verify the physical representation is efficient: 1 run with value at end position 4
let run_ends = run_array.run_ends().values();
assert_eq!(run_ends.len(), 1, "Should have only 1 run");
assert_eq!(run_ends[0], 4, "Run should end at position 4");

// Verify the value
let values = run_array.values().as_string::<i32>();
assert_eq!(values.len(), 1, "Should have only 1 unique value");
assert_eq!(values.value(0), file_location);
}

#[test]
fn test_reorder_struct_array_missing_file_name() {
// Test that error occurs when file name is requested but not provided
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::file_name(
1,
Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
),
];

let result = reorder_struct_array(arry, &reorder, None, None);
assert_result_error_with_message(
result,
"File name column requested but file location not provided",
);
}

#[test]
fn test_row_index_builder_no_skipping() {
let row_groups = vec![
@@ -2530,7 +2659,7 @@ mod tests {
fn simple_reorder_struct() {
let arry = make_struct_array();
let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
let ordered = reorder_struct_array(arry, &reorder, None).unwrap();
let ordered = reorder_struct_array(arry, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["c", "b"]);
}

@@ -2578,7 +2707,7 @@ mod tests {
],
),
];
let ordered = reorder_struct_array(nested, &reorder, None).unwrap();
let ordered = reorder_struct_array(nested, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
let ordered_s2 = ordered.column(0).as_struct();
assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
@@ -2625,7 +2754,7 @@ mod tests {
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
)];
let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap();
let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
let ordered_list_col = ordered.column(0).as_list::<i32>();
for i in 0..ordered_list_col.len() {
let array_item = ordered_list_col.value(i);
@@ -2691,7 +2820,7 @@ mod tests {
],
),
];
let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap();
let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["map", "i"]);
if let ArrowDataType::Map(field, _) = ordered.column(0).data_type() {
if let ArrowDataType::Struct(fields) = field.data_type() {
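The `FileName` transform above builds a run-end encoded column instead of materializing the path once per row. A minimal, self-contained sketch of that encoding with arrow-rs (the path and row count are illustrative values, not kernel APIs):

```rust
use arrow::array::{cast::AsArray, Array, PrimitiveArray, RunArray, StringArray};
use arrow::datatypes::Int64Type;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    let num_rows = 4_i64;
    let file_path = "s3://bucket/path/to/file.parquet";

    // One run covering every row: run_ends = [num_rows], values = [file_path].
    let run_ends = PrimitiveArray::<Int64Type>::from_iter_values([num_rows]);
    let values = StringArray::from_iter_values([file_path]);
    let file_name_array = RunArray::try_new(&run_ends, &values)?;

    // Logically `num_rows` rows, physically a single stored string.
    assert_eq!(file_name_array.len(), 4);
    assert_eq!(
        file_name_array.values().as_string::<i32>().value(0),
        file_path
    );
    Ok(())
}
```

Because the encoded column's type is `RunEndEncoded(Int64, Utf8)` rather than the requested `Utf8`, the transform also rebuilds the field from `file_name_array.data_type()`; memory stays constant in the batch size instead of growing by one string copy per row.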
18 changes: 15 additions & 3 deletions kernel/src/engine/default/parquet.rs
@@ -278,6 +278,7 @@ impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let path = Path::from_url_path(file_meta.location.path())?;
let store = self.store.clone();
let file_location = file_meta.location.to_string();

let batch_size = self.batch_size;
// let projection = self.projection.clone();
@@ -341,7 +342,12 @@ impl FileOpener for ParquetOpener {
let stream = builder.with_batch_size(batch_size).build()?;

let stream = stream.map(move |rbr| {
fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
)
});
Ok(stream.boxed())
}))
@@ -380,10 +386,11 @@ impl FileOpener for PresignedUrlOpener {
let predicate = self.predicate.clone();
let limit = self.limit;
let client = self.client.clone(); // uses Arc internally according to reqwest docs
let file_location = file_meta.location.to_string();

Ok(Box::pin(async move {
// fetch the file from the interweb
let reader = client.get(file_meta.location).send().await?.bytes().await?;
let reader = client.get(&file_location).send().await?.bytes().await?;
let metadata = ArrowReaderMetadata::load(&reader, Default::default())?;
let parquet_schema = metadata.schema();
let (indices, requested_ordering) =
@@ -418,7 +425,12 @@ impl FileOpener for PresignedUrlOpener {
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = futures::stream::iter(reader);
let stream = stream.map(move |rbr| {
fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
)
});
Ok(stream.boxed())
}))
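Both openers now turn `file_meta.location` into an owned `String` before entering the async block, so the per-batch `move` closure can own the location for the stream's lifetime. A self-contained sketch of this capture pattern (the integers stand in for record batch results; assumes `futures` and `tokio` as dependencies):

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Owned up front, mirroring `let file_location = file_meta.location.to_string();`
    let file_location = String::from("s3://bucket/path/to/file.parquet");

    let batches = stream::iter([1_u32, 2, 3]);
    // `move` transfers ownership of `file_location` into the closure, which can
    // then lend it out on every batch without lifetime trouble.
    let tagged = batches.map(move |batch| (batch, file_location.clone()));

    let out: Vec<_> = tagged.collect().await;
    assert_eq!(out.len(), 3);
}
```

In `PresignedUrlOpener` the same owned string also replaces the direct use of `file_meta.location` in the GET request, so one value serves both the fetch and `fixup_parquet_read`.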
4 changes: 4 additions & 0 deletions kernel/src/engine/sync/json.rs
@@ -17,11 +17,15 @@ use crate::{

pub(crate) struct SyncJsonHandler;

/// Note: This function must match the signature expected by the `read_files` helper,
/// which is also used by `try_create_from_parquet`. The `_file_location` parameter is unused
/// here but required to satisfy the shared function signature.
fn try_create_from_json(
file: File,
_schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
_predicate: Option<PredicateRef>,
_file_location: String,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
let json = ReaderBuilder::new(arrow_schema)
.build(BufReader::new(file))?
5 changes: 3 additions & 2 deletions kernel/src/engine/sync/mod.rs
@@ -65,7 +65,7 @@ fn read_files<F, I>(
) -> DeltaResult<FileDataReadResultIterator>
where
I: Iterator<Item = DeltaResult<ArrowEngineData>> + Send + 'static,
F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>) -> DeltaResult<I>
F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>, String) -> DeltaResult<I>
+ Send
+ 'static,
{
@@ -79,7 +79,7 @@ where
.into_iter()
// Produces Iterator<DeltaResult<Iterator<DeltaResult<ArrowEngineData>>>>
.map(move |file| {
let location = file.location;
let location = file.location.clone();
debug!("Reading {location:#?} with schema {schema:#?} and predicate {predicate:#?}");
let path = location
.to_file_path()
@@ -89,6 +89,7 @@ where
schema.clone(),
arrow_schema.clone(),
predicate.clone(),
location.to_string(),
)
})
// Flatten to Iterator<DeltaResult<DeltaResult<ArrowEngineData>>>
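The widened `F` bound threads an owned location into every reader callback: the parquet path forwards it to populate `_file`, while `try_create_from_json` accepts and ignores it. A stripped-down stand-in (all names and types here are hypothetical, not kernel APIs) showing the same shape, including the clone that lets the location serve both logging and the callback:

```rust
fn read_files<F, I>(locations: Vec<String>, mut try_create: F) -> Vec<I>
where
    I: Iterator<Item = u64>,
    F: FnMut(String) -> I, // the extra `String` parameter is the file location
{
    locations
        .into_iter()
        .map(|location| {
            // Clone because the location is used here (logging) and then
            // handed to the callback by value, as in the real `read_files`.
            let owned = location.clone();
            println!("reading {location}");
            try_create(owned)
        })
        .collect()
}

fn main() {
    let iters = read_files(vec!["a.parquet".into(), "b.parquet".into()], |_loc| 0..2u64);
    assert_eq!(iters.len(), 2);
}
```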