Commit ee21563

perf(reader): Fast path ArrowReader::read when concurrency is 1 to avoid waker churn and add determinism to FileScanTask processing (#2020)
## Which issue does this PR close?

- N/A.

## What changes are included in this PR?

- Due to the way Comet maps DataFusion `SessionContext`, the tokio runtime, and Spark tasks, we see frequent waker churn when concurrency is set to 1 in the `ArrowReader`. This adds a fast path that does not use `try_flatten_unordered` and its internal `replace_waker` calls.
- This also prevents tasks from being reordered at runtime. Several Iceberg Java tests expect specific query results without an `ORDER BY`, so this enables those tests to keep working when concurrency is set to 1.

See apache/datafusion-comet#3051 and the attached flamegraph: https://github.com/user-attachments/assets/26b93e85-5835-4bf4-b7f1-b136face940d

## Are these changes tested?

New test for determinism; the entire Iceberg Java Spark suite is also run via Comet in apache/datafusion-comet#3051.

---------

Co-authored-by: Renjie Liu <[email protected]>
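To illustrate the ordering guarantee the fast path relies on, here is a minimal, self-contained sketch using the same `futures` 0.3 combinators (this is not the iceberg-rust API; `fake_task_output` is a hypothetical stand-in for `process_file_scan_task`). Chaining `and_then` with `try_flatten` drains each inner stream to completion before opening the next, so output order follows task order, whereas `try_buffer_unordered` plus `try_flatten_unordered` may interleave results as inner streams become ready.

```rust
use futures::executor::block_on;
use futures::stream::{self, Stream, TryStreamExt};

/// Hypothetical stand-in for `process_file_scan_task`: each "task" yields
/// three batches tagged with the task id.
fn fake_task_output(task_id: i32) -> impl Stream<Item = Result<i32, std::io::Error>> {
    stream::iter((0..3).map(move |i| Ok::<i32, std::io::Error>(task_id * 10 + i)))
}

fn main() {
    block_on(async {
        // A stream of three "tasks", analogous to a FileScanTaskStream.
        let tasks = stream::iter((0..3).map(Ok::<i32, std::io::Error>));

        // Sequential fast path: one inner stream at a time, deterministic order,
        // and none of the waker wrapping done by the unordered flatteners.
        let ordered: Vec<i32> = tasks
            .and_then(|task| async move { Ok::<_, std::io::Error>(fake_task_output(task)) })
            .try_flatten()
            .try_collect()
            .await
            .unwrap();

        // Batches appear strictly in task order: task 0, then 1, then 2.
        assert_eq!(ordered, vec![0, 1, 2, 10, 11, 12, 20, 21, 22]);
    });
}
```

This ordered output is exactly the property the new `test_read_with_concurrency_one` test asserts against real Parquet files.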
1 parent fbcf58e commit ee21563

File tree: 1 file changed (+206, -19)

crates/iceberg/src/arrow/reader.rs

@@ -148,26 +148,53 @@ impl ArrowReader {
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;

-        let stream = tasks
-            .map_ok(move |task| {
-                let file_io = file_io.clone();
-
-                Self::process_file_scan_task(
-                    task,
-                    batch_size,
-                    file_io,
-                    self.delete_file_loader.clone(),
-                    row_group_filtering_enabled,
-                    row_selection_enabled,
-                )
-            })
-            .map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "file scan task generate failed").with_source(err)
-            })
-            .try_buffer_unordered(concurrency_limit_data_files)
-            .try_flatten_unordered(concurrency_limit_data_files);
+        // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
+        let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
+            Box::pin(
+                tasks
+                    .and_then(move |task| {
+                        let file_io = file_io.clone();
+
+                        Self::process_file_scan_task(
+                            task,
+                            batch_size,
+                            file_io,
+                            self.delete_file_loader.clone(),
+                            row_group_filtering_enabled,
+                            row_selection_enabled,
+                        )
+                    })
+                    .map_err(|err| {
+                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
+                            .with_source(err)
+                    })
+                    .try_flatten(),
+            )
+        } else {
+            Box::pin(
+                tasks
+                    .map_ok(move |task| {
+                        let file_io = file_io.clone();
+
+                        Self::process_file_scan_task(
+                            task,
+                            batch_size,
+                            file_io,
+                            self.delete_file_loader.clone(),
+                            row_group_filtering_enabled,
+                            row_selection_enabled,
+                        )
+                    })
+                    .map_err(|err| {
+                        Error::new(ErrorKind::Unexpected, "file scan task generate failed")
+                            .with_source(err)
+                    })
+                    .try_buffer_unordered(concurrency_limit_data_files)
+                    .try_flatten_unordered(concurrency_limit_data_files),
+            )
+        };

-        Ok(Box::pin(stream) as ArrowRecordBatchStream)
+        Ok(stream)
     }

     #[allow(clippy::too_many_arguments)]
@@ -3894,6 +3921,166 @@ message schema {
         assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
     }

+    /// Test that concurrency=1 reads all files correctly and in deterministic order.
+    /// This verifies the fast-path optimization for single concurrency.
+    #[tokio::test]
+    async fn test_read_with_concurrency_one() {
+        use arrow_array::Int32Array;
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "file_num", Type::Primitive(PrimitiveType::Int))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("file_num", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        // Create 3 parquet files with different data
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        for file_num in 0..3 {
+            let id_data = Arc::new(Int32Array::from_iter_values(
+                file_num * 10..(file_num + 1) * 10,
+            )) as ArrayRef;
+            let file_num_data = Arc::new(Int32Array::from(vec![file_num; 10])) as ArrayRef;
+
+            let to_write =
+                RecordBatch::try_new(arrow_schema.clone(), vec![id_data, file_num_data]).unwrap();
+
+            let file = File::create(format!("{table_location}/file_{file_num}.parquet")).unwrap();
+            let mut writer =
+                ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+            writer.write(&to_write).expect("Writing batch");
+            writer.close().unwrap();
+        }
+
+        // Read with concurrency=1 (fast-path)
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_data_file_concurrency_limit(1)
+            .build();
+
+        // Create tasks in a specific order: file_0, file_1, file_2
+        let tasks = vec![
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_0.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_1.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+            Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{table_location}/file_2.parquet"),
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1, 2],
+                predicate: None,
+                deletes: vec![],
+                partition: None,
+                partition_spec: None,
+                name_mapping: None,
+                case_sensitive: false,
+            }),
+        ];
+
+        let tasks_stream = Box::pin(futures::stream::iter(tasks)) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks_stream)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Verify we got all 30 rows (10 from each file)
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 30, "Should have 30 total rows");
+
+        // Collect all ids and file_nums to verify data
+        let mut all_ids = Vec::new();
+        let mut all_file_nums = Vec::new();
+
+        for batch in &result {
+            let id_col = batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let file_num_col = batch
+                .column(1)
+                .as_primitive::<arrow_array::types::Int32Type>();
+
+            for i in 0..batch.num_rows() {
+                all_ids.push(id_col.value(i));
+                all_file_nums.push(file_num_col.value(i));
+            }
+        }
+
+        assert_eq!(all_ids.len(), 30);
+        assert_eq!(all_file_nums.len(), 30);
+
+        // With concurrency=1 and sequential processing, files should be processed in order
+        // file_0: ids 0-9, file_num=0
+        // file_1: ids 10-19, file_num=1
+        // file_2: ids 20-29, file_num=2
+        for i in 0..10 {
+            assert_eq!(all_file_nums[i], 0, "First 10 rows should be from file_0");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 0-9");
+        }
+        for i in 10..20 {
+            assert_eq!(all_file_nums[i], 1, "Next 10 rows should be from file_1");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 10-19");
+        }
+        for i in 20..30 {
+            assert_eq!(all_file_nums[i], 2, "Last 10 rows should be from file_2");
+            assert_eq!(all_ids[i], i as i32, "IDs should be 20-29");
+        }
+    }
+
     /// Test bucket partitioning reads source column from data file (not partition metadata).
     ///
     /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.

0 commit comments
