Skip to content

Commit 186b699

Browse files
committed
added tests
Signed-off-by: Ahmed hossam <[email protected]>
1 parent 102c121 commit 186b699

File tree

1 file changed

+40
-0
lines changed

1 file changed

+40
-0
lines changed

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,4 +1435,44 @@ mod tests {
14351435

14361436
Ok(())
14371437
}
1438+
1439+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1440+
async fn test_concurrent_writer_reader_race_condition() -> Result<()> {
1441+
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion
1442+
// going for 100 iterations with a 5 batches per iteration
1443+
const NUM_BATCHES: usize = 5;
1444+
const ITERATIONS: usize = 100;
1445+
1446+
for iteration in 0..ITERATIONS {
1447+
let (writer, mut reader) = create_spill_channel(1024 * 1024);
1448+
1449+
let writer_handle = SpawnedTask::spawn(async move {
1450+
for i in 0..NUM_BATCHES {
1451+
let batch = create_test_batch(i as i32 * 10, 10);
1452+
writer.push_batch(&batch).unwrap();
1453+
tokio::task::yield_now().await;
1454+
}
1455+
});
1456+
1457+
let reader_handle = SpawnedTask::spawn(async move {
1458+
let mut batches_read = 0;
1459+
while let Some(result) = reader.next().await {
1460+
let _batch = result.unwrap();
1461+
batches_read += 1;
1462+
tokio::task::yield_now().await;
1463+
}
1464+
batches_read
1465+
});
1466+
1467+
writer_handle.join().await.unwrap();
1468+
let batches_read = reader_handle.join().await.unwrap();
1469+
1470+
assert_eq!(
1471+
batches_read, NUM_BATCHES,
1472+
"Iteration {iteration}: Expected {NUM_BATCHES} got {batches_read}."
1473+
);
1474+
}
1475+
1476+
Ok(())
1477+
}
14381478
}

0 commit comments

Comments
 (0)