diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index f7f90663a5..aa45a12973 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -148,26 +148,53 @@ impl ArrowReader {
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;

-        let stream = tasks
-            .map_ok(move |task| {
-                let file_io = file_io.clone();
-
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                )
-            })
-            .map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
-            })
-            .try_buffer_unordered(concurrency_limit_data_files)
-            .try_flatten_unordered(concurrency_limit_data_files);
+        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
+        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
+            Box::pin(
+                tasks
+                    .and_then(move |task| {
+                        let file_io = file_io.clone();
+
+                        Self::process_file_scan_task(
+                            task,
+                            batch_size,
+                            file_io,
+                            self.delete_file_loader.clone(),
+                            row_group_filtering_enabled,
+                            row_selection_enabled,
+                        )
+                    })
+                    .map_err(|err| {
+                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
+                            .with_source(err)
+                    })
+                    .try_flatten(),
+            )
+        } else {
+            Box::pin(
+                tasks
+                    .map_ok(move |task| {
+                        let file_io = file_io.clone();
+
+                        Self::process_file_scan_task(
+                            task,
+                            batch_size,
+                            file_io,
+                            self.delete_file_loader.clone(),
+                            row_group_filtering_enabled,
+                            row_selection_enabled,
+                        )
+                    })
+                    .map_err(|err| {
+                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
+                            .with_source(err)
+                    })
+                    .try_buffer_unordered(concurrency_limit_data_files)
+                    .try_flatten_unordered(concurrency_limit_data_files),
+            )
+        };

-        Ok(Box::pin(stream) as ArrowRecordBatchStream)
+        Ok(stream)
     }

     #[allow(clippy::too_many_arguments)]
@@ -3894,6 +3921,166 @@ message schema {
         assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
     }

+    /// Test that concurrency=1 reads all files correctly and in deterministic order.
+    /// This verifies the fast-path optimization for single concurrency.
+    #[tokio::test]
+    async fn test_read_with_concurrency_one() {
+        use arrow_array::{cast::AsArray, types::Int32Type, Int32Array};
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        // Create 3 parquet files with different data
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        for file_num in 0..3 {
+            let id_data = Arc::new(Int32Array::from_iter_values(
+                file_num * 10..(file_num + 1) * 10,
+            )) as ArrayRef;
+            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
+
+            let to_write =
+                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
+
+            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
+            let mut writer =
+                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+            writer.write(&to_write).expect("Writing batch");
+            writer.close().unwrap();
+        }
+
+        // Read with concurrency=1 (fast-path)
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_data_file_concurrency_limit(1)
+            .build();
+
+        // Create tasks in a specific order: file_0, file_1, file_2
+        let tasks = vec![
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_0.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_1.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_2.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+        ];
+
+        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks_stream)
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        // Verify we got all 30 rows (10 from each file)
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 30, "Should have 30 total rows");
+
+        // Collect all ids and file_nums to verify data
+        let mut all_ids = Vec::new();
+        let mut all_file_nums = Vec::new();
+
+        for batch in &result {
+            let id_col = batch
+                .column(0)
+                .as_primitive::<Int32Type>();
+            let file_num_col = batch
+                .column(1)
+                .as_primitive::<Int32Type>();
+
+            for i in 0..batch.num_rows() {
+                all_ids.push(id_col.value(i));
+                all_file_nums.push(file_num_col.value(i));
+            }
+        }
+
+        assert_eq!(all_ids.len(), 30);
+        assert_eq!(all_file_nums.len(), 30);
+
+        // With concurrency=1 and sequential processing, files should be processed in order:
+        // file_0: ids 0-9, file_num=0
+        // file_1: ids 10-19, file_num=1
+        // file_2: ids 20-29, file_num=2
+        for i in 0..10 {
+            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
+        }
+        for i in 10..20 {
+            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
+        }
+        for i in 20..30 {
+            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
+        }
+    }
+
     /// Test bucket partitioning reads source column from data file (not partition metadata).
     ///
     /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
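For reference, a minimal standalone sketch (not part of the patch) of the two stream shapes the change switches between, built with plain `futures` streams. The `fetch` function and the `i32` item type are hypothetical stand-ins for `process_file_scan_task` and record batches; only the combinator chain mirrors the code above. It illustrates why the `concurrency == 1` branch can drop `try_buffer_unordered`/`try_flatten_unordered` and still yield a deterministic output order.

```rust
// Sketch only: mirrors the fast-path vs. concurrent stream shapes with toy types.
// `fetch` is a hypothetical stand-in for `process_file_scan_task`.
use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};

type Result<T> = std::result::Result<T, std::io::Error>;

// Each "task" resolves to a nested stream of items, like a data file
// expanding into record batches.
async fn fetch(task: i32) -> Result<BoxStream<'static, Result<i32>>> {
    Ok(stream::iter((0..3).map(move |i| Ok(task * 10 + i))).boxed())
}

fn read(
    tasks: BoxStream<'static, Result<i32>>,
    concurrency: usize,
) -> BoxStream<'static, Result<i32>> {
    if concurrency == 1 {
        // Sequential fast path: each nested stream is drained to completion
        // before the next task starts, so no buffering is needed and the
        // output order is deterministic.
        tasks.and_then(fetch).try_flatten().boxed()
    } else {
        // Concurrent path: up to `concurrency` tasks are driven at once and
        // their nested streams are flattened without preserving order.
        tasks
            .map_ok(fetch)
            .try_buffer_unordered(concurrency)
            .try_flatten_unordered(concurrency)
            .boxed()
    }
}

fn main() -> Result<()> {
    futures::executor::block_on(async {
        let tasks: BoxStream<'static, Result<i32>> = stream::iter((0..3).map(Ok)).boxed();
        let out: Vec<i32> = read(tasks, 1).try_collect().await?;
        // With concurrency == 1, items arrive strictly in task order,
        // matching the ordering assertion in the test above.
        assert_eq!(out, vec![0, 1, 2, 10, 11, 12, 20, 21, 22]);
        Ok(())
    })
}
```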