149 changes: 139 additions & 10 deletions kernel/src/engine/arrow_utils.rs
@@ -18,7 +18,7 @@ use crate::{

use crate::arrow::array::{
cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RunArray, StringArray, StructArray,
};
use crate::arrow::buffer::NullBuffer;
use crate::arrow::compute::concat_batches;
@@ -168,15 +168,17 @@ impl RowIndexBuilder {
/// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
/// accurate null masks that row visitors rely on for correctness.
/// `row_indexes` are passed through to `reorder_struct_array`.
/// `file_location` is used to populate file metadata columns if requested.
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
file_location: Option<&str>,
) -> DeltaResult<T>
where
StructArray: Into<T>,
{
let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes)?;
let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes, file_location)?;
let data = fix_nested_null_masks(data);
Ok(data.into())
}
@@ -306,6 +308,8 @@ pub(crate) enum ReorderIndexTransform {
Missing(ArrowFieldRef),
/// Row index column requested, compute it
RowIndex(ArrowFieldRef),
/// File path column requested, populate with file path
FilePath(ArrowFieldRef),
}

impl ReorderIndex {
@@ -333,14 +337,19 @@ impl ReorderIndex {
ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
}

fn file_path(index: usize, field: ArrowFieldRef) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::FilePath(field))
}

/// Check if this reordering requires a transformation anywhere. See comment below on
/// [`ordering_needs_transform`] to understand why this is needed.
fn needs_transform(&self) -> bool {
match self.transform {
// if we're casting, inserting null, or generating row index, we need to transform
// if we're casting, inserting null, or generating row index/file path, we need to transform
ReorderIndexTransform::Cast(_)
| ReorderIndexTransform::Missing(_)
| ReorderIndexTransform::RowIndex(_) => true,
| ReorderIndexTransform::RowIndex(_)
| ReorderIndexTransform::FilePath(_) => true,
// if our nested ordering needs a transform, we need a transform
ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
// no transform needed
@@ -595,6 +604,13 @@ fn get_indices(
Arc::new(field.try_into_arrow()?),
));
}
Some(MetadataColumnSpec::FilePath) => {
debug!("Inserting a file path column: {}", field.name());
reorder_indices.push(ReorderIndex::file_path(
requested_position,
Arc::new(field.try_into_arrow()?),
));
}
Some(metadata_spec) => {
return Err(Error::Generic(format!(
"Metadata column {metadata_spec:?} is not supported by the default parquet reader"
@@ -765,10 +781,13 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
///
/// If the requested ordering contains a [`ReorderIndexTransform::RowIndex`], `row_indexes`
/// must not be `None` to append a row index column to the output.
/// If the requested ordering contains a [`ReorderIndexTransform::FilePath`], `file_location`
/// must not be `None` to append a file path column to the output.
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
mut row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
file_location: Option<&str>,
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
@@ -806,6 +825,7 @@ pub(crate) fn reorder_struct_array(
struct_array,
children,
None, // Nested structures don't need row indexes since metadata columns can't be nested
None, // No file_location passed since metadata columns can't be nested
)?);
// create the new field specifying the correct order for the struct
let new_field = Arc::new(ArrowField::new_struct(
@@ -866,6 +886,27 @@ pub(crate) fn reorder_struct_array(
final_fields_cols[reorder_index.index] =
Some((Arc::clone(field), Arc::new(row_index_array)));
}
ReorderIndexTransform::FilePath(field) => {
let Some(file_path) = file_location else {
return Err(Error::generic(
"File path column requested but file location not provided",
));
};
// Use run-end encoding for efficiency since the file path is constant for all rows
// Run-end encoding stores: [run_ends: [num_rows], values: [file_path]]
let run_ends = PrimitiveArray::<Int64Type>::from_iter_values([num_rows as i64]);
let values = StringArray::from_iter_values([file_path]);
let file_path_array = RunArray::try_new(&run_ends, &values)?;

// Create a field with the RunEndEncoded data type to match the array
let ree_field = Arc::new(ArrowField::new(
field.name(),
file_path_array.data_type().clone(),
field.is_nullable(),
));
final_fields_cols[reorder_index.index] =
Some((ree_field, Arc::new(file_path_array)));
}
}
}
let num_cols = final_fields_cols.len();
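[Editor's note, not part of the diff] The comment in the new `FilePath` arm relies on Arrow's run-end encoding so the path string is stored once per batch instead of once per row. As a rough illustration of what a consumer might do with that column, the sketch below (the helper name and the assumption that an engine wants a flat `StringArray` are mine) expands the run-end encoded `_file` column back into one value per logical row:

```rust
use arrow::array::{cast::AsArray, Array, RunArray, StringArray};
use arrow::datatypes::Int64Type;

/// Hypothetical helper: expand a run-end encoded string column into a plain
/// `StringArray`, for downstream engines that don't handle REE arrays natively.
/// For the `_file` column there is a single run, so this simply repeats the one
/// file path for every logical row.
fn expand_run_ends(ree: &RunArray<Int64Type>) -> StringArray {
    let values = ree.values().as_string::<i32>();
    let run_ends = ree.run_ends().values(); // physical run ends, e.g. [num_rows]
    let mut out = Vec::with_capacity(ree.len());
    let mut start = 0i64;
    for (run, end) in run_ends.iter().enumerate() {
        // every logical row in [start, end) takes the value of run `run`
        for _ in start..*end {
            out.push(values.value(run));
        }
        start = *end;
    }
    StringArray::from(out)
}
```

Keeping the column run-end encoded until an engine actually needs flat strings is the design choice the comment describes: the per-batch cost stays at one `i64` run end plus a single copy of the path, regardless of `num_rows`.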
@@ -895,6 +936,7 @@ fn reorder_list<O: OffsetSizeTrait>(
struct_array,
children,
None, // Nested structures don't need row indexes since metadata columns can't be nested
None, // No file_location passed since metadata columns can't be nested
)?);
let new_list_field = Arc::new(ArrowField::new_struct(
list_field.name(),
@@ -930,6 +972,7 @@ fn reorder_map(
struct_array,
children,
None, // Nested structures don't need row indexes since metadata columns can't be nested
None, // No file_location passed since metadata columns can't be nested
)?;
let result_fields = result_array.fields();
let new_map_field = Arc::new(ArrowField::new_struct(
@@ -1793,7 +1836,7 @@ mod tests {
),
];

let result = reorder_struct_array(arry, &reorder, None);
let result = reorder_struct_array(arry, &reorder, None, None);
assert_result_error_with_message(
result,
"Row index column requested but row index iterator not provided",
@@ -1816,7 +1859,7 @@
#[allow(clippy::single_range_in_vec_init)]
let mut row_indexes = vec![(0..4)].into_iter().flatten();

let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes)).unwrap();
let ordered = reorder_struct_array(arry, &reorder, Some(&mut row_indexes), None).unwrap();
assert_eq!(ordered.column_names(), vec!["b", "row_idx"]);

// Verify the row index column contains the expected values
@@ -1853,6 +1896,92 @@
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn simple_file_path_field() {
let requested_schema = Arc::new(StructType::new_unchecked([
StructField::not_null("i", DataType::INTEGER),
StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
StructField::nullable("i2", DataType::INTEGER),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let mut arrow_file_path_field = ArrowField::new("_file", ArrowDataType::Utf8, false);
arrow_file_path_field.set_metadata(HashMap::from([(
"delta.metadataSpec".to_string(),
"_file".to_string(),
)]));
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::file_path(1, Arc::new(arrow_file_path_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn test_reorder_struct_array_with_file_path() {
// Test that file paths work when properly provided
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::file_path(
1,
Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
),
];

let file_location = "s3://bucket/path/to/file.parquet";
let ordered = reorder_struct_array(arry, &reorder, None, Some(file_location)).unwrap();
assert_eq!(ordered.column_names(), vec!["b", "_file"]);

// Verify the file path column is run-end encoded and contains the expected value
let file_path_col = ordered.column(1);

// Check it's a RunArray<Int64Type, StringArray>
let run_array = file_path_col
.as_any()
.downcast_ref::<RunArray<Int64Type>>()
.expect("Expected RunArray");

// Verify it has 4 logical rows (same as input)
assert_eq!(run_array.len(), 4);

// Verify the physical representation is efficient: 1 run with value at end position 4
let run_ends = run_array.run_ends().values();
assert_eq!(run_ends.len(), 1, "Should have only 1 run");
assert_eq!(run_ends[0], 4, "Run should end at position 4");

// Verify the value
let values = run_array.values().as_string::<i32>();
assert_eq!(values.len(), 1, "Should have only 1 unique value");
assert_eq!(values.value(0), file_location);
}

#[test]
fn test_reorder_struct_array_missing_file_path() {
// Test that error occurs when file path is requested but not provided
let arry = make_struct_array();
let reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::file_path(
1,
Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
),
];

let result = reorder_struct_array(arry, &reorder, None, None);
assert_result_error_with_message(
result,
"File path column requested but file location not provided",
);
}
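
[Editor's note, not part of the diff] The two new tests exercise the row-index and file-path transforms separately. For readers wondering how they compose, a hypothetical combined call might look like the sketch below; it reuses the module's `make_struct_array` fixture and assumes the existing `ReorderIndex::row_index` constructor alongside the new `file_path` one:

```rust
// Hypothetical sketch: request both metadata columns in one reorder.
let arry = make_struct_array(); // 4-row fixture used by the tests above
let reorder = vec![
    ReorderIndex::identity(0),
    ReorderIndex::row_index(
        1,
        Arc::new(ArrowField::new("row_idx", ArrowDataType::Int64, false)),
    ),
    ReorderIndex::file_path(
        2,
        Arc::new(ArrowField::new("_file", ArrowDataType::Utf8, false)),
    ),
];
#[allow(clippy::single_range_in_vec_init)]
let mut row_indexes = vec![(0..4)].into_iter().flatten();
let ordered = reorder_struct_array(
    arry,
    &reorder,
    Some(&mut row_indexes),
    Some("s3://bucket/path/to/file.parquet"),
)
.unwrap();
assert_eq!(ordered.column_names(), vec!["b", "row_idx", "_file"]);
```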

#[test]
fn test_row_index_builder_no_skipping() {
let row_groups = vec![
@@ -2530,7 +2659,7 @@
fn simple_reorder_struct() {
let arry = make_struct_array();
let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
let ordered = reorder_struct_array(arry, &reorder, None).unwrap();
let ordered = reorder_struct_array(arry, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["c", "b"]);
}

@@ -2578,7 +2707,7 @@
],
),
];
let ordered = reorder_struct_array(nested, &reorder, None).unwrap();
let ordered = reorder_struct_array(nested, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
let ordered_s2 = ordered.column(0).as_struct();
assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
@@ -2625,7 +2754,7 @@
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
)];
let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap();
let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
let ordered_list_col = ordered.column(0).as_list::<i32>();
for i in 0..ordered_list_col.len() {
let array_item = ordered_list_col.value(i);
@@ -2691,7 +2820,7 @@
],
),
];
let ordered = reorder_struct_array(struct_array, &reorder, None).unwrap();
let ordered = reorder_struct_array(struct_array, &reorder, None, None).unwrap();
assert_eq!(ordered.column_names(), vec!["map", "i"]);
if let ArrowDataType::Map(field, _) = ordered.column(0).data_type() {
if let ArrowDataType::Struct(fields) = field.data_type() {
18 changes: 15 additions & 3 deletions kernel/src/engine/default/parquet.rs
@@ -278,6 +278,7 @@ impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let path = Path::from_url_path(file_meta.location.path())?;
let store = self.store.clone();
let file_location = file_meta.location.to_string();

let batch_size = self.batch_size;
// let projection = self.projection.clone();
@@ -341,7 +342,12 @@
let stream = builder.with_batch_size(batch_size).build()?;

let stream = stream.map(move |rbr| {
fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
)
});
Ok(stream.boxed())
}))
@@ -380,10 +386,11 @@ impl FileOpener for PresignedUrlOpener {
let predicate = self.predicate.clone();
let limit = self.limit;
let client = self.client.clone(); // uses Arc internally according to reqwest docs
let file_location = file_meta.location.to_string();

Ok(Box::pin(async move {
// fetch the file from the interweb
let reader = client.get(file_meta.location).send().await?.bytes().await?;
let reader = client.get(&file_location).send().await?.bytes().await?;
let metadata = ArrowReaderMetadata::load(&reader, Default::default())?;
let parquet_schema = metadata.schema();
let (indices, requested_ordering) =
@@ -418,7 +425,12 @@
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = futures::stream::iter(reader);
let stream = stream.map(move |rbr| {
fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
)
});
Ok(stream.boxed())
}))
4 changes: 4 additions & 0 deletions kernel/src/engine/sync/json.rs
@@ -17,11 +17,15 @@ use crate::{

pub(crate) struct SyncJsonHandler;

/// Note: This function must match the signature expected by `read_files` helper function,
/// which is also used by `try_create_from_parquet`. The `_file_location` parameter is unused
/// here but required to satisfy the shared function signature.
fn try_create_from_json(
file: File,
_schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
_predicate: Option<PredicateRef>,
_file_location: String,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
let json = ReaderBuilder::new(arrow_schema)
.build(BufReader::new(file))?
4 changes: 3 additions & 1 deletion kernel/src/engine/sync/mod.rs
@@ -65,7 +65,7 @@ fn read_files<F, I>(
) -> DeltaResult<FileDataReadResultIterator>
where
I: Iterator<Item = DeltaResult<ArrowEngineData>> + Send + 'static,
F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>) -> DeltaResult<I>
F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>, String) -> DeltaResult<I>
+ Send
+ 'static,
{
@@ -79,6 +79,7 @@ where
.into_iter()
// Produces Iterator<DeltaResult<Iterator<DeltaResult<ArrowEngineData>>>>
.map(move |file| {
let location_string = file.location.to_string();
let location = file.location;
debug!("Reading {location:#?} with schema {schema:#?} and predicate {predicate:#?}");
let path = location
@@ -89,6 +90,7 @@ where
schema.clone(),
arrow_schema.clone(),
predicate.clone(),
location_string,
)
})
// Flatten to Iterator<DeltaResult<DeltaResult<ArrowEngineData>>>
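[Editor's note, not part of the diff] The note added to `try_create_from_json` exists because `read_files` now threads the file location through to every opener, as the updated `F:` bound above shows. A minimal sketch of an opener that satisfies the new signature (hypothetical, assuming the kernel's internal types `SchemaRef`, `ArrowSchemaRef`, `PredicateRef`, `DeltaResult`, and `ArrowEngineData` are in scope) would be:

```rust
use std::fs::File;

// Hypothetical opener matching the bound
// `FnMut(File, SchemaRef, ArrowSchemaRef, Option<PredicateRef>, String) -> DeltaResult<I>`.
// An opener that has no use for the location (like `try_create_from_json`)
// simply ignores the final argument.
fn try_create_noop(
    _file: File,
    _schema: SchemaRef,
    _arrow_schema: ArrowSchemaRef,
    _predicate: Option<PredicateRef>,
    file_location: String,
) -> DeltaResult<std::iter::Empty<DeltaResult<ArrowEngineData>>> {
    debug!("opening {file_location}, which this opener does not use");
    Ok(std::iter::empty())
}
```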
10 changes: 9 additions & 1 deletion kernel/src/engine/sync/parquet.rs
@@ -20,6 +20,7 @@ fn try_create_from_parquet(
schema: SchemaRef,
_arrow_schema: ArrowSchemaRef,
predicate: Option<PredicateRef>,
file_location: String,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
let parquet_schema = metadata.schema();
@@ -40,7 +41,14 @@

let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = builder.build()?;
Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())))
Ok(stream.map(move |rbr| {
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
)
}))
}

impl ParquetHandler for SyncParquetHandler {