diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 54fc4a58a..f2c63cc59 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -63,6 +63,7 @@ impl BasicDeleteFileLoader { data_file_path, self.file_io.clone(), false, + None, ) .await? .build()? diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index e0894ad6b..fed8f19c0 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -183,22 +183,54 @@ impl ArrowReader { let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone()); - let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder( + // Migrated tables lack field IDs, requiring us to inspect the schema to choose + // between field-ID-based or position-based projection + let initial_stream_builder = Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), should_load_page_index, + None, ) .await?; - // Create a projection mask for the batch stream to select which columns in the - // Parquet file that we want in the response + // Parquet files from Hive/Spark migrations lack field IDs in their metadata + let missing_field_ids = initial_stream_builder + .schema() + .fields() + .iter() + .next() + .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); + + // Adding position-based fallback IDs at schema level (not per-batch) enables projection + // on files that lack embedded field IDs. We recreate the builder to apply the modified schema. + let mut record_batch_stream_builder = if missing_field_ids { + let arrow_schema = + add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema()); + let options = ArrowReaderOptions::new().with_schema(arrow_schema); + + Self::create_parquet_record_batch_stream_builder( + &task.data_file_path, + file_io.clone(), + should_load_page_index, + Some(options), + ) + .await? 
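// [Reviewer note: illustrative sketch, not part of this patch]
// The hunk above detects files from migrated (Hive/Spark) tables by checking whether the
// first Arrow field carries the parquet field-ID metadata key, and only then rebuilds the
// stream builder with a fallback-ID schema. A minimal, self-contained version of that check,
// assuming only the arrow_schema crate; `lacks_field_ids` and the locally defined key
// constant are illustrative (the real code uses the parquet crate's PARQUET_FIELD_ID_META_KEY).
use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};

const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

fn lacks_field_ids(schema: &Schema) -> bool {
    schema
        .fields()
        .iter()
        .next()
        .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none())
}

fn main() {
    // A file written by a migrated table: no field-ID metadata on its columns.
    let migrated = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);
    assert!(lacks_field_ids(&migrated));

    // A native Iceberg-written file: field IDs embedded in the column metadata.
    let native = Schema::new(vec![Field::new("name", DataType::Utf8, false).with_metadata(
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]),
    )]);
    assert!(!lacks_field_ids(&native));
}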
+ } else { + initial_stream_builder + }; + + // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist), + // so we must use position-based projection instead of field-ID matching let projection_mask = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), + missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection )?; - record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); + + record_batch_stream_builder = + record_batch_stream_builder.with_projection(projection_mask.clone()); // RecordBatchTransformer performs any transformations required on the RecordBatches // that come back from the file, such as type promotion, default column insertion @@ -353,6 +385,7 @@ impl ArrowReader { data_file_path: &str, file_io: FileIO, should_load_page_index: bool, + arrow_reader_options: Option, ) -> Result>> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within @@ -365,11 +398,9 @@ impl ArrowReader { .with_preload_page_index(should_load_page_index); // Create the record batch stream builder, which wraps the parquet file reader - let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options( - parquet_file_reader, - ArrowReaderOptions::new(), - ) - .await?; + let options = arrow_reader_options.unwrap_or_default(); + let record_batch_stream_builder = + ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?; Ok(record_batch_stream_builder) } @@ -503,13 +534,18 @@ impl ArrowReader { visit(&mut collector, predicate)?; let iceberg_field_ids = collector.field_ids(); - let field_id_map = build_field_id_map(parquet_schema)?; + + // Without embedded field IDs, we fall back to position-based mapping for compatibility + let field_id_map = match build_field_id_map(parquet_schema)? { + Some(map) => map, + None => build_fallback_field_id_map(parquet_schema), + }; Ok((iceberg_field_ids, field_id_map)) } - /// Insert the leaf field id into the field_ids using for projection. - /// For nested type, it will recursively insert the leaf field id. + /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level. + /// Nested types (struct/list/map) are flattened in Parquet's columnar format. fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec) { match field.field_type.as_ref() { Type::Primitive(_) => { @@ -535,6 +571,7 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, + use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark) ) -> Result { fn type_promotion_is_valid( file_type: Option<&PrimitiveType>, @@ -560,83 +597,132 @@ impl ArrowReader { } } - let mut leaf_field_ids = vec![]; - for field_id in field_ids { - let field = iceberg_schema_of_task.field_by_id(*field_id); - if let Some(field) = field { - Self::include_leaf_field_id(field, &mut leaf_field_ids); - } + if field_ids.is_empty() { + return Ok(ProjectionMask::all()); } - if leaf_field_ids.is_empty() { - Ok(ProjectionMask::all()) + if use_fallback { + // Position-based projection necessary because file lacks embedded field IDs + Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema) } else { - // Build the map between field id and column index in Parquet schema. 
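// [Reviewer note: illustrative sketch, not part of this patch]
// The `use_fallback` flag threaded through get_arrow_projection_mask above selects between
// two projection styles: leaf-level masks driven by embedded field IDs, and whole-column
// (root) masks by position for files without IDs, mirroring iceberg-java's
// pruneColumnsFallback(). A small standalone comparison using the parquet crate; the message
// text and the chosen indices are arbitrary example values.
use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two top-level columns; `person` expands to two leaf columns in Parquet's columnar layout.
    let message = "
        message schema {
            required int32 id;
            required group person {
                required binary name (UTF8);
                required int32 age;
            }
        }";
    let descriptor = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));

    // Field-ID path: project individual leaf columns (here: id and person.age).
    let by_leaves = ProjectionMask::leaves(&descriptor, [0, 2]);

    // Fallback path: project the whole top-level column at position 1 (person, including
    // all of its nested leaves).
    let by_roots = ProjectionMask::roots(&descriptor, [1]);

    println!("leaves: {by_leaves:?}\nroots: {by_roots:?}");
    Ok(())
}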
- let mut column_map = HashMap::new(); - - let fields = arrow_schema.fields(); - // Pre-project only the fields that have been selected, possibly avoiding converting - // some Arrow types that are not yet supported. - let mut projected_fields: HashMap = HashMap::new(); - let projected_arrow_schema = ArrowSchema::new_with_metadata( - fields.filter_leaves(|_, f| { - f.metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .and_then(|field_id| i32::from_str(field_id).ok()) - .is_some_and(|field_id| { - projected_fields.insert((*f).clone(), field_id); - leaf_field_ids.contains(&field_id) - }) - }), - arrow_schema.metadata().clone(), - ); - let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; - - fields.filter_leaves(|idx, field| { - let Some(field_id) = projected_fields.get(field).cloned() else { - return false; - }; + // Field-ID-based projection using embedded field IDs from Parquet metadata + + // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection + let mut leaf_field_ids = vec![]; + for field_id in field_ids { + let field = iceberg_schema_of_task.field_by_id(*field_id); + if let Some(field) = field { + Self::include_leaf_field_id(field, &mut leaf_field_ids); + } + } + + Self::get_arrow_projection_mask_with_field_ids( + &leaf_field_ids, + iceberg_schema_of_task, + parquet_schema, + arrow_schema, + type_promotion_is_valid, + ) + } + } + + /// Standard projection using embedded field IDs from Parquet metadata. + /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns(). + fn get_arrow_projection_mask_with_field_ids( + leaf_field_ids: &[i32], + iceberg_schema_of_task: &Schema, + parquet_schema: &SchemaDescriptor, + arrow_schema: &ArrowSchemaRef, + type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool, + ) -> Result { + let mut column_map = HashMap::new(); + let fields = arrow_schema.fields(); + + // Pre-project only the fields that have been selected, possibly avoiding converting + // some Arrow types that are not yet supported. + let mut projected_fields: HashMap = HashMap::new(); + let projected_arrow_schema = ArrowSchema::new_with_metadata( + fields.filter_leaves(|_, f| { + f.metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|field_id| i32::from_str(field_id).ok()) + .is_some_and(|field_id| { + projected_fields.insert((*f).clone(), field_id); + leaf_field_ids.contains(&field_id) + }) + }), + arrow_schema.metadata().clone(), + ); + let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; - let iceberg_field = iceberg_schema_of_task.field_by_id(field_id); - let parquet_iceberg_field = iceberg_schema.field_by_id(field_id); + fields.filter_leaves(|idx, field| { + let Some(field_id) = projected_fields.get(field).cloned() else { + return false; + }; - if iceberg_field.is_none() || parquet_iceberg_field.is_none() { - return false; - } + let iceberg_field = iceberg_schema_of_task.field_by_id(field_id); + let parquet_iceberg_field = iceberg_schema.field_by_id(field_id); - if !type_promotion_is_valid( - parquet_iceberg_field - .unwrap() - .field_type - .as_primitive_type(), - iceberg_field.unwrap().field_type.as_primitive_type(), - ) { - return false; - } + if iceberg_field.is_none() || parquet_iceberg_field.is_none() { + return false; + } - column_map.insert(field_id, idx); - true - }); - - // Only project columns that exist in the Parquet file. - // Missing columns will be added by RecordBatchTransformer with default/NULL values. 
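// [Reviewer note: illustrative sketch, not part of this patch]
// The surrounding comments describe the schema-evolution rule: only field IDs that actually
// exist in the file are projected, and anything missing is materialised later (as defaults
// or NULLs) by RecordBatchTransformer. A minimal standalone version of that filtering step;
// `existing_column_indices` is an illustrative name, not a helper from this patch.
use std::collections::HashMap;

fn existing_column_indices(leaf_field_ids: &[i32], column_map: &HashMap<i32, usize>) -> Vec<usize> {
    leaf_field_ids
        .iter()
        .filter_map(|field_id| column_map.get(field_id).copied())
        .collect()
}

fn main() {
    // The file contains field IDs 1 and 2; the table schema has since gained field 3.
    let column_map = HashMap::from([(1, 0), (2, 1)]);
    assert_eq!(existing_column_indices(&[1, 2, 3], &column_map), vec![0, 1]);

    // If every requested column is new, the result is empty and the caller falls back to
    // ProjectionMask::all() so the transformer still receives batches to fill in.
    assert!(existing_column_indices(&[3], &column_map).is_empty());
}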
- // This supports schema evolution where new columns are added to the table schema - // but old Parquet files don't have them yet. - let mut indices = vec![]; - for field_id in leaf_field_ids { - if let Some(col_idx) = column_map.get(&field_id) { - indices.push(*col_idx); - } - // Skip fields that don't exist in the Parquet file - they will be added later + if !type_promotion_is_valid( + parquet_iceberg_field + .unwrap() + .field_type + .as_primitive_type(), + iceberg_field.unwrap().field_type.as_primitive_type(), + ) { + return false; + } + + column_map.insert(field_id, idx); + true + }); + + // Schema evolution: New columns may not exist in old Parquet files. + // We only project existing columns; RecordBatchTransformer adds default/NULL values. + let mut indices = vec![]; + for field_id in leaf_field_ids { + if let Some(col_idx) = column_map.get(field_id) { + indices.push(*col_idx); + } + } + + if indices.is_empty() { + // Edge case: All requested columns are new (don't exist in file). + // Project all columns so RecordBatchTransformer has a batch to transform. + Ok(ProjectionMask::all()) + } else { + Ok(ProjectionMask::leaves(parquet_schema, indices)) + } + } + + /// Fallback projection for Parquet files without field IDs. + /// Uses position-based matching: field ID N → column position N-1. + /// Projects entire top-level columns (including nested content) for iceberg-java compatibility. + fn get_arrow_projection_mask_fallback( + field_ids: &[i32], + parquet_schema: &SchemaDescriptor, + ) -> Result<ProjectionMask> { + // Position-based: field_id N → column N-1 (field IDs are 1-indexed) + let parquet_root_fields = parquet_schema.root_schema().get_fields(); + let mut root_indices = vec![]; + + for field_id in field_ids.iter() { + let parquet_pos = (*field_id - 1) as usize; - if indices.is_empty() { - // If no columns from the projection exist in the file, project all columns - // This can happen if all requested columns are new and need to be added by the transformer - Ok(ProjectionMask::all()) - } else { - Ok(ProjectionMask::leaves(parquet_schema, indices)) + if parquet_pos < parquet_root_fields.len() { + root_indices.push(parquet_pos); } + // RecordBatchTransformer adds missing columns with NULL values + } + + if root_indices.is_empty() { + Ok(ProjectionMask::all()) + } else { + Ok(ProjectionMask::roots(parquet_schema, root_indices)) } } @@ -713,6 +799,13 @@ impl ArrowReader { )); }; + // If all row groups were filtered out, return an empty RowSelection (select no rows) + if let Some(selected_row_groups) = selected_row_groups { + if selected_row_groups.is_empty() { + return Ok(RowSelection::from(Vec::new())); + } + } + let mut selected_row_groups_idx = 0; let page_index = column_index @@ -785,22 +878,16 @@ impl ArrowReader { } /// Build the map of parquet field id to Parquet column index in the schema. -fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { +/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables). +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> { let mut column_map = HashMap::new(); + for (idx, field) in parquet_schema.columns().iter().enumerate() { let field_type = field.self_type(); match field_type { ParquetType::PrimitiveType { basic_info, ..
} => { if !basic_info.has_id() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Leave column idx: {}, name: {}, type {:?} in schema doesn't have field id", - idx, - basic_info.name(), - field_type - ), - )); + return Ok(None); } column_map.insert(basic_info.id(), idx); } @@ -815,7 +902,59 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> { - Ok(column_map) + Ok(Some(column_map)) +} + +fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32, usize> { + let mut column_map = HashMap::new(); + + // 1-indexed to match iceberg-java's convention + for (idx, _field) in parquet_schema.columns().iter().enumerate() { + let field_id = (idx + 1) as i32; + column_map.insert(field_id, idx); + } + + column_map +} + +/// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them. +/// Enables projection on migrated files (e.g., from Hive/Spark). +/// +/// Why at schema level (not per-batch): Efficiency - avoids repeated schema modification. +/// Why only top-level: Nested projection uses leaf column indices, not parent struct IDs. +/// Why 1-indexed: Compatibility with iceberg-java's ParquetSchemaUtil.addFallbackIds(). +fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc<ArrowSchema> { + debug_assert!( + arrow_schema + .fields() + .iter() + .next() + .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()), + "Schema already has field IDs" + ); + + use arrow_schema::Field; + + let fields_with_fallback_ids: Vec<_> = arrow_schema + .fields() + .iter() + .enumerate() + .map(|(pos, field)| { + let mut metadata = field.metadata().clone(); + let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); + + Field::new(field.name(), field.data_type().clone(), field.is_nullable()) + .with_metadata(metadata) + }) + .collect(); + + Arc::new(ArrowSchema::new_with_metadata( + fields_with_fallback_ids, + arrow_schema.metadata().clone(), + )) } /// A visitor to collect field ids from bound predicates. @@ -1637,6 +1776,7 @@ message schema { &schema, &parquet_schema, &arrow_schema, + false, ) .unwrap_err(); @@ -1652,6 +1792,7 @@ message schema { &schema, &parquet_schema, &arrow_schema, + false, ) .unwrap_err(); @@ -1662,9 +1803,14 @@ message schema { ); // Finally avoid selecting fields with unsupported data types - let mask = - ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema) - .expect("Some ProjectionMask"); + let mask = ArrowReader::get_arrow_projection_mask( + &[1], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("Some ProjectionMask"); assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0])); } @@ -2887,4 +3033,679 @@ message schema { "Should have ids 101-200 (all of row group 1)" ); } + + /// Test reading Parquet files without field ID metadata (e.g., migrated tables). + /// This exercises the position-based fallback path.
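// [Reviewer note: illustrative sketch, not part of this patch]
// Both new helpers above rely on the same convention as iceberg-java's
// ParquetSchemaUtil.addFallbackIds(): the top-level column at position p is assigned field
// ID p + 1. The sketch below shows that convention in isolation on the Arrow side, assuming
// only the arrow_schema crate; `with_fallback_ids` and the local key constant are
// illustrative names (the real code uses the parquet crate's PARQUET_FIELD_ID_META_KEY).
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

fn with_fallback_ids(schema: &Schema) -> Arc<Schema> {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, f)| {
            let mut metadata = f.metadata().clone();
            // 1-indexed: position 0 becomes field ID 1, matching build_fallback_field_id_map.
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (pos + 1).to_string());
            Field::new(f.name(), f.data_type().clone(), f.is_nullable()).with_metadata(metadata)
        })
        .collect();
    Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int32, false),
    ]);
    let patched = with_fallback_ids(&schema);
    assert_eq!(
        patched.field(1).metadata().get(PARQUET_FIELD_ID_META_KEY),
        Some(&"2".to_string())
    );
}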
+ /// + /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback() + /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java + #[tokio::test] + async fn test_read_parquet_file_without_field_ids() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + // Parquet file from a migrated table - no field ID metadata + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let name_data = vec!["Alice", "Bob", "Charlie"]; + let age_data = vec![30, 25, 35]; + + use arrow_array::Int32Array; + let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef; + let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + + // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1 + let name_array = batch.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + assert_eq!(name_array.value(2), "Charlie"); + + let age_array = batch + .column(1) + .as_primitive::(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + assert_eq!(age_array.value(2), 35); + } + + /// Test reading Parquet files without field IDs with partial projection. + /// Only a subset of columns are requested, verifying position-based fallback + /// handles column selection correctly. 
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_partial_projection() { + use arrow_array::Int32Array; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("col1", DataType::Utf8, false), + Field::new("col2", DataType::Int32, false), + Field::new("col3", DataType::Utf8, false), + Field::new("col4", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef; + let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; + let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef; + let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ + col1_data, col2_data, col3_data, col4_data, + ]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 3], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let col1_array = batch.column(0).as_string::(); + assert_eq!(col1_array.value(0), "a"); + assert_eq!(col1_array.value(1), "b"); + + let col3_array = batch.column(1).as_string::(); + assert_eq!(col3_array.value(0), "c"); + assert_eq!(col3_array.value(1), "d"); + } + + /// Test reading Parquet files without field IDs with schema evolution. + /// The Iceberg schema has more fields than the Parquet file, testing that + /// missing columns are filled with NULLs. 
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_schema_evolution() { + use arrow_array::{Array, Int32Array}; + + // Schema with field 3 added after the file was written + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; + let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); + + let name_array = batch.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + + let age_array = batch + .column(1) + .as_primitive::(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + + // Verify missing column filled with NULLs + let city_array = batch.column(2).as_string::(); + assert_eq!(city_array.null_count(), 2); + assert!(city_array.is_null(0)); + assert!(city_array.is_null(1)); + } + + /// Test reading Parquet files without field IDs that have multiple row groups. + /// This ensures the position-based fallback works correctly across row group boundaries. 
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_multiple_row_groups() { + use arrow_array::Int32Array; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Small row group size to create multiple row groups + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_write_batch_size(2) + .set_max_row_group_size(2) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + + // Write 6 rows in 3 batches (will create 3 row groups) + for batch_num in 0..3 { + let name_data = Arc::new(StringArray::from(vec![ + format!("name_{}", batch_num * 2), + format!("name_{}", batch_num * 2 + 1), + ])) as ArrayRef; + let value_data = + Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef; + + let batch = + RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert!(!result.is_empty()); + + let mut all_names = Vec::new(); + let mut all_values = Vec::new(); + + for batch in &result { + let name_array = batch.column(0).as_string::(); + let value_array = batch + .column(1) + .as_primitive::(); + + for i in 0..batch.num_rows() { + all_names.push(name_array.value(i).to_string()); + all_values.push(value_array.value(i)); + } + } + + assert_eq!(all_names.len(), 6); + assert_eq!(all_values.len(), 6); + + for i in 0..6 { + assert_eq!(all_names[i], format!("name_{}", i)); + assert_eq!(all_values[i], i as i32); + } + } + + /// Test reading Parquet files without field IDs with nested types (struct). + /// Java's pruneColumnsFallback() projects entire top-level columns including nested content. + /// This test verifies that a top-level struct field is projected correctly with all its nested fields. 
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_with_struct() { + use arrow_array::{Int32Array, StructArray}; + use arrow_schema::Fields; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required( + 2, + "person", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required( + 3, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "person", + DataType::Struct(Fields::from(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])), + false, + ), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; + let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; + let person_data = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("name", DataType::Utf8, false)), + name_data, + ), + ( + Arc::new(Field::new("age", DataType::Int32, false)), + age_data, + ), + ])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let id_array = batch + .column(0) + .as_primitive::(); + assert_eq!(id_array.value(0), 1); + assert_eq!(id_array.value(1), 2); + + let person_array = batch.column(1).as_struct(); + assert_eq!(person_array.num_columns(), 2); + + let name_array = person_array.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + + let age_array = person_array + .column(1) + .as_primitive::(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + } + + /// Test reading Parquet files without field IDs with schema evolution - column added in the middle. + /// When a new column is inserted between existing columns in the schema order, + /// the fallback projection must correctly map field IDs to output positions. 
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() { + use arrow_array::{Array, Int32Array}; + + let arrow_schema_old = Arc::new(ArrowSchema::new(vec![ + Field::new("col0", DataType::Int32, true), + Field::new("col1", DataType::Int32, true), + ])); + + // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 5, 2], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); + + let result_col0 = batch + .column(0) + .as_primitive::(); + assert_eq!(result_col0.value(0), 1); + assert_eq!(result_col0.value(1), 2); + + // New column should be NULL (doesn't exist in old file) + let result_newcol = batch + .column(1) + .as_primitive::(); + assert_eq!(result_newcol.null_count(), 2); + assert!(result_newcol.is_null(0)); + assert!(result_newcol.is_null(1)); + + let result_col1 = batch + .column(2) + .as_primitive::(); + assert_eq!(result_col1.value(0), 10); + assert_eq!(result_col1.value(1), 20); + } + + /// Test reading Parquet files without field IDs with a filter that eliminates all row groups. + /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and + /// all row groups are filtered out. 
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() { + use arrow_array::{Float64Array, Int32Array}; + + // Schema with fields that will use fallback IDs 1, 2, 3 + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double)) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Write data where all ids are >= 10 + let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef; + let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Filter that eliminates all row groups: id < 5 + let predicate = Reference::new("id").less_than(Datum::int(5)); + + // Enable both row_group_filtering and row_selection - triggered the panic + let reader = ArrowReaderBuilder::new(file_io) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true) + .build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: Some(predicate.bind(schema, true).unwrap()), + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + // Should no longer panic + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Should return empty results + assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0)); + } }
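// [Reviewer note: illustrative sketch, not part of this patch]
// The test above covers the early return added in the @@ -713 hunk: when row-group filtering
// prunes every row group, the reader now returns an empty RowSelection instead of panicking
// further down the selection-building path. A minimal demonstration of what an empty
// selection means, assuming the parquet crate's arrow_reader API; the selector values below
// are arbitrary examples.
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // What the reader produces when every row group is filtered out: select zero rows.
    let empty = RowSelection::from(Vec::<RowSelector>::new());
    assert_eq!(empty.row_count(), 0);

    // A non-trivial selection for comparison: read 3 rows, then skip 2.
    let some = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(2)]);
    assert_eq!(some.row_count(), 3);
}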