Skip to content

Commit 1384a4f

Browse files
authored
feat(core): Add support for _file column (#1824)
## Which issue does this PR close?

- Closes #1766.

## What changes are included in this PR?

Integrates virtual field handling for the `_file` metadata column into `RecordBatchTransformer` using a pre-computed constants map, eliminating post-processing and duplicate lookups.

## Key Changes

**New `metadata_columns.rs` module**: Centralized utilities for metadata columns

- Constants: `RESERVED_FIELD_ID_FILE`, `RESERVED_COL_NAME_FILE`
- Helper functions: `get_metadata_column_name()`, `get_metadata_field_id()`, `is_metadata_field()`, `is_metadata_column_name()`

**Enhanced `RecordBatchTransformer`**:

- Added `constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>` - pre-computed during initialization
- New `with_constant()` method - computes Arrow type once during setup
- Updated to use pre-computed types and values (avoids duplicate lookups)
- Handles `DataType::RunEndEncoded` for constant strings (memory efficient)

**Simplified `reader.rs`**:

- Pass full `project_field_ids` (including virtual) to RecordBatchTransformer
- Single `with_constant()` call to register `_file` column
- Removed post-processing loop

**Updated `scan/mod.rs`**:

- Use `is_metadata_column_name()` and `get_metadata_field_id()` instead of hardcoded checks

## Are these changes tested?

Yes, comprehensive tests have been added to verify the functionality:

### New Tests (7 tests added)

#### Table Scan API Tests (7 tests)

1. **`test_select_with_file_column`** - Verifies basic functionality of selecting `_file` with regular columns
2. **`test_select_file_column_position`** - Verifies column ordering is preserved
3. **`test_select_file_column_only`** - Tests selecting only the `_file` column
4. **`test_file_column_with_multiple_files`** - Tests multiple data files scenario
5. **`test_file_column_at_start`** - Tests `_file` at position 0
6. **`test_file_column_at_end`** - Tests `_file` at the last position
7. **`test_select_with_repeated_column_names`** - Tests repeated column selection
1 parent 26b9839 commit 1384a4f

File tree

7 files changed

+1031
-213
lines changed

7 files changed

+1031
-213
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
5454
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
5555
use crate::expr::{BoundPredicate, BoundReference};
5656
use crate::io::{FileIO, FileMetadata, FileRead};
57+
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
5758
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
5859
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
5960
use crate::utils::available_parallelism;
@@ -250,12 +251,20 @@ impl ArrowReader {
250251
initial_stream_builder
251252
};
252253

254+
// Filter out metadata fields for Parquet projection (they don't exist in files)
255+
let project_field_ids_without_metadata: Vec<i32> = task
256+
.project_field_ids
257+
.iter()
258+
.filter(|&&id| !is_metadata_field(id))
259+
.copied()
260+
.collect();
261+
253262
// Create projection mask based on field IDs
254263
// - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
255264
// - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
256265
// - If fallback IDs: position-based projection (missing_field_ids=true)
257266
let projection_mask = Self::get_arrow_projection_mask(
258-
&task.project_field_ids,
267+
&project_field_ids_without_metadata,
259268
&task.schema,
260269
record_batch_stream_builder.parquet_schema(),
261270
record_batch_stream_builder.schema(),
@@ -266,16 +275,23 @@ impl ArrowReader {
266275
record_batch_stream_builder.with_projection(projection_mask.clone());
267276

268277
// RecordBatchTransformer performs any transformations required on the RecordBatches
269-
// that come back from the file, such as type promotion, default column insertion
270-
// and column re-ordering.
278+
// that come back from the file, such as type promotion, default column insertion,
279+
// column re-ordering, partition constants, and virtual field addition (like _file)
271280
let mut record_batch_transformer_builder =
272281
RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
273282

283+
// Add the _file metadata column if it's in the projected fields
284+
if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
285+
let file_datum = Datum::string(task.data_file_path.clone());
286+
record_batch_transformer_builder =
287+
record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum);
288+
}
289+
274290
if let (Some(partition_spec), Some(partition_data)) =
275291
(task.partition_spec.clone(), task.partition.clone())
276292
{
277293
record_batch_transformer_builder =
278-
record_batch_transformer_builder.with_partition(partition_spec, partition_data);
294+
record_batch_transformer_builder.with_partition(partition_spec, partition_data)?;
279295
}
280296

281297
let mut record_batch_transformer = record_batch_transformer_builder.build();
@@ -416,7 +432,10 @@ impl ArrowReader {
416432
record_batch_stream_builder
417433
.build()?
418434
.map(move |batch| match batch {
419-
Ok(batch) => record_batch_transformer.process_record_batch(batch),
435+
Ok(batch) => {
436+
// Process the record batch (type promotion, column reordering, virtual fields, etc.)
437+
record_batch_transformer.process_record_batch(batch)
438+
}
420439
Err(err) => Err(err.into()),
421440
});
422441

0 commit comments

Comments (0)