Commit 7c94bf5

Refactor and fix comments.
1 parent c2d67ba commit 7c94bf5

1 file changed: +42 -74 lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 42 additions & 74 deletions
@@ -183,7 +183,8 @@ impl ArrowReader {
 
         let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());
 
-        // Create initial stream builder to check if the schema needs any modifications
+        // Migrated tables lack field IDs, requiring us to inspect the schema to choose
+        // between field-ID-based or position-based projection
         let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io.clone(),
@@ -192,24 +193,21 @@ impl ArrowReader {
         )
         .await?;
 
-        // Check if Arrow schema is missing field IDs (for migrated Parquet files)
+        // Parquet files from Hive/Spark migrations lack field IDs in their metadata
         let missing_field_ids = initial_stream_builder
             .schema()
             .fields()
             .iter()
             .next()
             .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
 
-        // If the schema needs modifications, recreate the stream builder with the modified schema.
-        // Currently we only modify schemas that are missing field IDs, but other transformations
-        // may be added in the future.
+        // Adding position-based fallback IDs at schema level (not per-batch) enables projection
+        // on files that lack embedded field IDs. We recreate the builder to apply the modified schema.
         let mut record_batch_stream_builder = if missing_field_ids {
-            // Build modified schema with necessary transformations
             let arrow_schema =
                 add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema());
             let options = ArrowReaderOptions::new().with_schema(arrow_schema);
 
-            // Recreate the builder with the modified schema
             Self::create_parquet_record_batch_stream_builder(
                 &task.data_file_path,
                 file_io.clone(),
@@ -218,19 +216,17 @@ impl ArrowReader {
             )
             .await?
         } else {
-            // Schema doesn't need modifications, use the initial builder
             initial_stream_builder
         };
 
-        // Create a projection mask for the batch stream to select which columns in the
-        // Parquet file that we want in the response.
-        // If we added fallback field IDs, we must use position-based projection (fallback path).
-        let (projection_mask, _) = Self::get_arrow_projection_mask(
+        // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist),
+        // so we must use position-based projection instead of field-ID matching
+        let projection_mask = Self::get_arrow_projection_mask(
             &task.project_field_ids,
             &task.schema,
             record_batch_stream_builder.parquet_schema(),
             record_batch_stream_builder.schema(),
-            missing_field_ids, // Pass whether schema is missing field IDs, to force fallback path
+            missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection
         )?;
 
         record_batch_stream_builder =
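For context on the two hunks above: the reader decides between projection modes by probing the Arrow schema for the parquet crate's field-ID metadata key, then tags position-based fallback IDs onto the schema when the key is absent. Below is a minimal, self-contained sketch of that probe-and-tag step over a hand-built schema; `with_fallback_ids` is a hypothetical stand-in for this file's `add_fallback_field_ids_to_arrow_schema`, while `PARQUET_FIELD_ID_META_KEY` is the real constant from `parquet::arrow`.

```rust
// Sketch only: `with_fallback_ids` is a hypothetical stand-in for
// `add_fallback_field_ids_to_arrow_schema`; the schema is hand-built
// instead of coming from a Parquet stream builder.
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

fn with_fallback_ids(schema: &ArrowSchema) -> ArrowSchema {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            // Position-based, 1-indexed, matching iceberg-java's addFallbackIds().
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (pos + 1).to_string());
            field.as_ref().clone().with_metadata(metadata)
        })
        .collect();
    ArrowSchema::new(fields)
}

fn main() {
    // A migrated file's Arrow schema carries no field-ID metadata at all.
    let migrated = ArrowSchema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);

    // Same check as the reader: inspect the first field for a field-ID entry.
    let missing_field_ids = migrated
        .fields()
        .iter()
        .next()
        .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());
    assert!(missing_field_ids);

    let tagged = with_fallback_ids(&migrated);
    assert_eq!(
        tagged.field(0).metadata().get(PARQUET_FIELD_ID_META_KEY),
        Some(&"1".to_string())
    );
}
```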
@@ -401,6 +397,7 @@ impl ArrowReader {
             .with_preload_offset_index(true)
             .with_preload_page_index(should_load_page_index);
 
+        // Create the record batch stream builder, which wraps the parquet file reader
         let options = arrow_reader_options.unwrap_or_default();
         let record_batch_stream_builder =
             ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
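As a reference for the builder being wrapped here, a minimal sketch of the `ParquetRecordBatchStreamBuilder` API reading a local file with tokio rather than the crate's `FileIO`-backed reader; `data.parquet` is a placeholder path.

```rust
// Sketch only: reads a local file instead of going through FileIO;
// "data.parquet" is a placeholder path.
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // The builder exposes both schemas the reader consults for projection:
    // the Arrow schema (builder.schema()) and the Parquet schema
    // descriptor (builder.parquet_schema()).
    println!("{} Arrow fields", builder.schema().fields().len());

    let mut stream = builder.build()?;
    while let Some(batch) = stream.try_next().await? {
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}
```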
@@ -529,8 +526,7 @@ impl ArrowReader {
 
         let iceberg_field_ids = collector.field_ids();
 
-        // Build field ID map from Parquet metadata.
-        // For files without field IDs (migrated tables), use position-based fallback.
+        // Without embedded field IDs, we fall back to position-based mapping for compatibility
         let field_id_map = match build_field_id_map(parquet_schema)? {
             Some(map) => map,
             None => build_fallback_field_id_map(parquet_schema),
@@ -539,8 +535,8 @@ impl ArrowReader {
         Ok((iceberg_field_ids, field_id_map))
     }
 
-    /// Insert the leaf field id into the field_ids using for projection.
-    /// For nested type, it will recursively insert the leaf field id.
+    /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level.
+    /// Nested types (struct/list/map) are flattened in Parquet's columnar format.
     fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec<i32>) {
         match field.field_type.as_ref() {
             Type::Primitive(_) => {
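To illustrate the leaf-level flattening the new doc comment describes, here is a hedged sketch over a simplified type tree; `SimpleField` and `SimpleType` are hypothetical stand-ins for iceberg-rust's `NestedField` and `Type`, not the real definitions.

```rust
// Sketch only: simplified stand-ins for iceberg's NestedField/Type.
enum SimpleType {
    Primitive,
    Struct(Vec<SimpleField>),
    List(Box<SimpleField>),
}

struct SimpleField {
    id: i32,
    ty: SimpleType,
}

fn include_leaf_ids(field: &SimpleField, out: &mut Vec<i32>) {
    match &field.ty {
        // A primitive column is itself a Parquet leaf.
        SimpleType::Primitive => out.push(field.id),
        // Struct/list nodes have no leaf of their own; recurse into children.
        SimpleType::Struct(children) => {
            for child in children {
                include_leaf_ids(child, out);
            }
        }
        SimpleType::List(element) => include_leaf_ids(element, out),
    }
}

fn main() {
    // struct<point: struct<x: id 2, y: id 3>> flattens to leaves [2, 3].
    let point = SimpleField {
        id: 1,
        ty: SimpleType::Struct(vec![
            SimpleField { id: 2, ty: SimpleType::Primitive },
            SimpleField { id: 3, ty: SimpleType::Primitive },
        ]),
    };
    let mut leaves = vec![];
    include_leaf_ids(&point, &mut leaves);
    assert_eq!(leaves, vec![2, 3]);
}
```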
@@ -566,8 +562,8 @@ impl ArrowReader {
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-        use_fallback: bool, // Force use of position-based projection
-    ) -> Result<(ProjectionMask, Option<HashMap<usize, i32>>)> {
+        use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark)
+    ) -> Result<ProjectionMask> {
         fn type_promotion_is_valid(
             file_type: Option<&PrimitiveType>,
             projected_type: Option<&PrimitiveType>,
@@ -593,21 +589,16 @@ impl ArrowReader {
         }
 
         if field_ids.is_empty() {
-            return Ok((ProjectionMask::all(), None));
+            return Ok(ProjectionMask::all());
         }
 
-        // Use fallback path if explicitly requested (schema with fallback field IDs)
         if use_fallback {
-            // Fallback path for Parquet files without field IDs (schema with fallback field IDs).
-            // Use position-based projection of top-level columns.
-            // This matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback()
+            // Position-based projection necessary because file lacks embedded field IDs
             Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema)
-                .map(|(mask, mapping)| (mask, Some(mapping)))
         } else {
-            // Standard path: use field IDs from Arrow schema metadata
-            // This matches iceberg-java's ReadConf when hasIds() returns true
+            // Field-ID-based projection using embedded field IDs from Parquet metadata
 
-            // Expand top-level field IDs to leaf field IDs for matching against Parquet schema
+            // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection
             let mut leaf_field_ids = vec![];
             for field_id in field_ids {
                 let field = iceberg_schema_of_task.field_by_id(*field_id);
@@ -623,12 +614,11 @@ impl ArrowReader {
                 arrow_schema,
                 type_promotion_is_valid,
             )
-            .map(|mask| (mask, None))
         }
     }
 
-    /// Standard projection mask creation using field IDs from Arrow schema metadata.
-    /// Matches iceberg-java's ParquetSchemaUtil.pruneColumns()
+    /// Standard projection using embedded field IDs from Parquet metadata.
+    /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns().
     fn get_arrow_projection_mask_with_field_ids(
         leaf_field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
@@ -682,65 +672,48 @@ impl ArrowReader {
             true
         });
 
-        // Only project columns that exist in the Parquet file.
-        // Missing columns will be added by RecordBatchTransformer with default/NULL values.
-        // This supports schema evolution where new columns are added to the table schema
-        // but old Parquet files don't have them yet.
+        // Schema evolution: New columns may not exist in old Parquet files.
+        // We only project existing columns; RecordBatchTransformer adds default/NULL values.
         let mut indices = vec![];
         for field_id in leaf_field_ids {
             if let Some(col_idx) = column_map.get(field_id) {
                 indices.push(*col_idx);
             }
-            // Skip fields that don't exist in the Parquet file - they will be added later
         }
 
         if indices.is_empty() {
-            // If no columns from the projection exist in the file, project all columns
-            // This can happen if all requested columns are new and need to be added by the transformer
+            // Edge case: All requested columns are new (don't exist in file).
+            // Project all columns so RecordBatchTransformer has a batch to transform.
             Ok(ProjectionMask::all())
         } else {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
 
-    /// Fallback projection mask creation for Parquet files without field IDs.
-    /// Uses position-based matching where top-level field ID N maps to top-level column N-1.
-    /// Projects entire top-level columns including nested content (structs, lists, maps).
-    /// Matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback()
-    ///
-    /// Returns the ProjectionMask and a mapping of output positions to field IDs.
+    /// Fallback projection for Parquet files without field IDs.
+    /// Uses position-based matching: field ID N → column position N-1.
+    /// Projects entire top-level columns (including nested content) for iceberg-java compatibility.
     fn get_arrow_projection_mask_fallback(
         field_ids: &[i32],
         parquet_schema: &SchemaDescriptor,
-    ) -> Result<(ProjectionMask, HashMap<usize, i32>)> {
-        // Position-based matching for files without field IDs (migrated tables).
-        // Top-level field ID N maps to top-level column position N-1 (field IDs are 1-indexed).
-        // Projects ENTIRE top-level columns including nested content, matching Java's behavior.
-
+    ) -> Result<ProjectionMask> {
+        // Position-based: field_id N → column N-1 (field IDs are 1-indexed)
         let parquet_root_fields = parquet_schema.root_schema().get_fields();
        let mut root_indices = vec![];
-        let mut arrow_pos_to_field_id = HashMap::new();
-        let mut arrow_output_pos = 0;
 
         for field_id in field_ids.iter() {
-            // Map top-level field_id to top-level Parquet column position
             let parquet_pos = (*field_id - 1) as usize;
 
             if parquet_pos < parquet_root_fields.len() {
                 root_indices.push(parquet_pos);
-                arrow_pos_to_field_id.insert(arrow_output_pos, *field_id);
-                arrow_output_pos += 1;
             }
-            // Skip fields that don't exist in the file - RecordBatchTransformer will add them as NULLs
+            // RecordBatchTransformer adds missing columns with NULL values
         }
 
         if root_indices.is_empty() {
-            Ok((ProjectionMask::all(), HashMap::new()))
+            Ok(ProjectionMask::all())
         } else {
-            Ok((
-                ProjectionMask::roots(parquet_schema, root_indices),
-                arrow_pos_to_field_id,
-            ))
+            Ok(ProjectionMask::roots(parquet_schema, root_indices))
         }
     }
 
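The position arithmetic behind this fallback path is small enough to sketch in isolation; `fallback_root_indices` and `num_root_columns` are hypothetical names for illustration, not part of the reader's API.

```rust
// Sketch only: 1-indexed top-level field IDs mapped to 0-indexed root
// column positions; `num_root_columns` stands in for the Parquet root
// field count.
fn fallback_root_indices(field_ids: &[i32], num_root_columns: usize) -> Vec<usize> {
    field_ids
        .iter()
        .filter_map(|field_id| {
            // Field ID N maps to top-level column position N - 1.
            let pos = (*field_id - 1) as usize;
            // IDs past the end of the file are columns added by schema
            // evolution; RecordBatchTransformer back-fills them with NULLs.
            (pos < num_root_columns).then_some(pos)
        })
        .collect()
}

fn main() {
    // A file with 3 top-level columns, projecting field IDs 1 and 3;
    // field ID 5 doesn't exist in the file and is skipped.
    assert_eq!(fallback_root_indices(&[1, 3, 5], 3), vec![0, 2]);
}
```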
@@ -925,13 +898,11 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
 }
 
 /// Build a fallback field ID map for Parquet files without embedded field IDs.
-/// Assigns sequential field IDs (1, 2, 3, ...) based on column position.
-/// This matches iceberg-java's `addFallbackIds()` + `pruneColumnsFallback()` logic.
+/// Position-based (1, 2, 3, ...) for compatibility with iceberg-java migrations.
 fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
     let mut column_map = HashMap::new();
 
-    // Assign field IDs starting at 1 based on column position
-    // This follows iceberg-java's convention where field IDs are 1-indexed
+    // 1-indexed to match iceberg-java's convention
     for (idx, _field) in parquet_schema.columns().iter().enumerate() {
         let field_id = (idx + 1) as i32;
         column_map.insert(field_id, idx);
@@ -940,14 +911,13 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> {
     column_map
 }
 
-/// Add fallback field IDs to Arrow schema when the Parquet file lacks field IDs.
-/// This creates a schema with position-based field IDs (1, 2, 3, ...) added to field metadata.
+/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
+/// Enables projection on migrated files (e.g., from Hive/Spark).
 ///
-/// Matches iceberg-java's ParquetSchemaUtil.addFallbackIds() approach:
-/// - Assigns sequential IDs starting at 1 to top-level fields only
-/// - Field IDs are added once to the schema, not per-batch
+/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification.
+/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs.
+/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds().
 fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
-    // Debug assertion to catch misuse during development
     debug_assert!(
         arrow_schema
             .fields()
@@ -957,7 +927,6 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
         "Schema already has field IDs"
     );
 
-    // Create schema with position-based fallback field IDs
     use arrow_schema::Field;
 
     let fields_with_fallback_ids: Vec<_> = arrow_schema
@@ -966,8 +935,7 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> {
         .enumerate()
         .map(|(pos, field)| {
             let mut metadata = field.metadata().clone();
-            // Assign field ID based on position (1-indexed to match Java)
-            let field_id = (pos + 1) as i32;
+            let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility
             metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
 
             Field::new(field.name(), field.data_type().clone(), field.is_nullable())
@@ -1825,7 +1793,7 @@ message schema {
     );
 
     // Finally avoid selecting fields with unsupported data types
-    let (mask, _) = ArrowReader::get_arrow_projection_mask(
+    let mask = ArrowReader::get_arrow_projection_mask(
        &[1],
        &schema,
        &parquet_schema,
