225 changes: 206 additions & 19 deletions crates/iceberg/src/arrow/reader.rs
@@ -148,26 +148,53 @@ impl ArrowReader {
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;

let stream = tasks
.map_ok(move |task| {
let file_io = file_io.clone();

Self::process_file_scan_task(
task,
batch_size,
file_io,
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
)
})
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
})
.try_buffer_unordered(concurrency_limit_data_files)
.try_flatten_unordered(concurrency_limit_data_files);
// Fast path for single concurrency: avoids the buffering overhead of try_buffer_unordered / try_flatten_unordered
let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
Box::pin(
tasks
.and_then(move |task| {
let file_io = file_io.clone();

Self::process_file_scan_task(
task,
batch_size,
file_io,
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
)
})
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "file scan task generate failed")
.with_source(err)
})
.try_flatten(),
)
} else {
Box::pin(
tasks
.map_ok(move |task| {
let file_io = file_io.clone();

Self::process_file_scan_task(
task,
batch_size,
file_io,
self.delete_file_loader.clone(),
row_group_filtering_enabled,
row_selection_enabled,
)
})
.map_err(|err| {
Error::new(ErrorKind::Unexpected, "file scan task generate failed")
.with_source(err)
})
.try_buffer_unordered(concurrency_limit_data_files)
.try_flatten_unordered(concurrency_limit_data_files),
)
};

Ok(Box::pin(stream) as ArrowRecordBatchStream)
Ok(stream)
}

#[allow(clippy::too_many_arguments)]
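A standalone sketch (not part of this diff, built only on the `futures` crate) of the ordering behaviour the fast path relies on: `and_then` + `try_flatten` drains one inner stream to completion before starting the next, whereas `map_ok` + `try_buffer_unordered(n)` + `try_flatten_unordered(n)` keeps several task streams in flight and may interleave their items. The task numbers and per-task "batches" below are made up for illustration.

use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

type Res<T> = Result<T, std::io::Error>;

fn main() {
    let in_order: Vec<i32> = block_on(async {
        // Three stand-in "file scan tasks", identified by number.
        let tasks = stream::iter(vec![Res::Ok(0i32), Ok(1), Ok(2)]);

        tasks
            // Fast-path shape: each task is mapped to its own result stream...
            .and_then(|task| async move {
                // Stand-in for process_file_scan_task: task N yields "batches" N*10 .. N*10+2.
                Res::Ok(stream::iter((0..3).map(move |i| Res::Ok(task * 10 + i))))
            })
            // ...and try_flatten drains one inner stream at a time, in task order.
            .try_flatten()
            .try_collect::<Vec<i32>>()
            .await
            .unwrap()
    });
    // Sequential processing preserves both task order and within-task order.
    assert_eq!(in_order, vec![0, 1, 2, 10, 11, 12, 20, 21, 22]);
}

The concurrent branch swaps `and_then`/`try_flatten` for `map_ok`/`try_buffer_unordered`/`try_flatten_unordered`, accepting extra bookkeeping and giving up this ordering guarantee in exchange for overlapping I/O across data files.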
@@ -3894,6 +3921,166 @@ message schema {
assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
}

/// Test that concurrency=1 reads all files correctly and in deterministic order.
/// This verifies the fast-path optimization for single concurrency.
#[tokio::test]
async fn test_read_with_concurrency_one() {
use arrow_array::Int32Array;

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
.into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();

// Create 3 parquet files with different data
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

for file_num in 0..3 {
let id_data = Arc::new(Int32Array::from_iter_values(
file_num * 10..(file_num + 1) * 10,
)) as ArrayRef;
let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();

let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
let mut writer =
ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();
}

// Read with concurrency=1 (fast-path)
let reader = ArrowReaderBuilder::new(file_io)
.with_data_file_concurrency_limit(1)
.build();

// Create tasks in a specific order: file_0, file_1, file_2
let tasks = vec![
Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_0.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}),
Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_1.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}),
Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{table_location}/file_2.parquet"),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
}),
];

let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;

let result = reader
.read(tasks_stream)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

// Verify we got all 30 rows (10 from each file)
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 30, "Should have 30 total rows");

// Collect all ids and file_nums to verify data
let mut all_ids = Vec::new();
let mut all_file_nums = Vec::new();

for batch in &result {
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
let file_num_col = batch
.column(1)
.as_primitive::<arrow_array::types::Int32Type>();

for i in 0..batch.num_rows() {
all_ids.push(id_col.value(i));
all_file_nums.push(file_num_col.value(i));
}
}

assert_eq!(all_ids.len(), 30);
assert_eq!(all_file_nums.len(), 30);

// With concurrency=1 and sequential processing, files should be processed in order
// file_0: ids 0-9, file_num=0
// file_1: ids 10-19, file_num=1
// file_2: ids 20-29, file_num=2
for i in 0..10 {
assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
}
for i in 10..20 {
assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
}
for i in 20..30 {
assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
}
}
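For contrast (hypothetical, not in this PR): a test against the concurrent branch could not assert on batch order, because try_flatten_unordered may interleave files. An order-insensitive check over the same three files might look like the sketch below, assuming the task stream is rebuilt as above and an arbitrary concurrency limit of 4.

// Hypothetical order-insensitive variant for concurrency > 1.
let reader = ArrowReaderBuilder::new(file_io)
    .with_data_file_concurrency_limit(4)
    .build();

let result = reader
    .read(tasks_stream)
    .unwrap()
    .try_collect::<Vec<RecordBatch>>()
    .await
    .unwrap();

// Batches may arrive in any order, so collect and sort the ids before comparing.
let mut all_ids: Vec<i32> = result
    .iter()
    .flat_map(|batch| {
        batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>()
            .values()
            .iter()
            .copied()
            .collect::<Vec<_>>()
    })
    .collect();
all_ids.sort_unstable();
assert_eq!(all_ids, (0..30).collect::<Vec<i32>>());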

/// Test bucket partitioning reads source column from data file (not partition metadata).
///
/// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.