Skip to content

Commit 78e6bce

Browse files
xanderbaileyalamb
authored and committed
Fix repartition from dropping data when spilling (apache#20672)
## Which issue does this PR close?

- Closes apache#20683

## Rationale for this change

In non-preserve-order repartitioning mode, all input partition tasks share clones of the same `SpillPoolWriter` for each output partition. `SpillPoolWriter` used `#[derive(Clone)]`, but its `Drop` implementation unconditionally set `writer_dropped = true` and finalized the current spill file. This meant that when the **first** input task finished and its clone was dropped, the `SpillPoolReader` would see `writer_dropped = true` on an empty queue and return EOF — silently discarding every batch subsequently written by the still-running input tasks.

This bug requires three conditions to trigger:

1. Non-preserve-order repartitioning (so spill writers are cloned across input tasks)
2. Memory pressure causing batches to spill to disk
3. Input tasks finishing at different times (the common case with varying partition sizes)

## What changes are included in this PR?

Adds an `active_writer_count` field to track the number of writer clones currently live, and only finalizes the current spill file (and signals EOF) once all writers have been dropped.

## Are these changes tested?

Yes. A new unit test (`test_clone_drop_does_not_signal_eof_prematurely`) directly reproduces the bug. It was verified to **fail without the fix** and **pass with the fix**.

## Are there any user-facing changes?

No.
1 parent 9797095 commit 78e6bce

File tree

1 file changed

+101
-1
lines changed

1 file changed

+101
-1
lines changed

datafusion/physical-plan/src/spill/spill_pool.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ struct SpillPoolShared {
6161
/// Writer's reference to the current file (shared by all cloned writers).
6262
/// Has its own lock to allow I/O without blocking queue access.
6363
current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
64+
/// Number of active writer clones. Only when this reaches zero should
65+
/// `writer_dropped` be set to true. This prevents premature EOF signaling
66+
/// when one writer clone is dropped while others are still active.
67+
active_writer_count: usize,
6468
}
6569

6670
impl SpillPoolShared {
@@ -72,6 +76,7 @@ impl SpillPoolShared {
7276
waker: None,
7377
writer_dropped: false,
7478
current_write_file: None,
79+
active_writer_count: 1,
7580
}
7681
}
7782

@@ -97,7 +102,6 @@ impl SpillPoolShared {
97102
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
98103
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
99104
/// current file so readers can access all written data.
100-
#[derive(Clone)]
101105
pub struct SpillPoolWriter {
102106
/// Maximum size in bytes before rotating to a new file.
103107
/// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
@@ -106,6 +110,18 @@ pub struct SpillPoolWriter {
106110
shared: Arc<Mutex<SpillPoolShared>>,
107111
}
108112

113+
impl Clone for SpillPoolWriter {
114+
fn clone(&self) -> Self {
115+
// Increment the active writer count so that `writer_dropped` is only
116+
// set to true when the *last* clone is dropped.
117+
self.shared.lock().active_writer_count += 1;
118+
Self {
119+
max_file_size_bytes: self.max_file_size_bytes,
120+
shared: Arc::clone(&self.shared),
121+
}
122+
}
123+
}
124+
109125
impl SpillPoolWriter {
110126
/// Spills a batch to the pool, rotating files when necessary.
111127
///
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
233249
fn drop(&mut self) {
234250
let mut shared = self.shared.lock();
235251

252+
shared.active_writer_count -= 1;
253+
let is_last_writer = shared.active_writer_count == 0;
254+
255+
if !is_last_writer {
256+
// Other writer clones are still active; do not finalize or
257+
// signal EOF to readers.
258+
return;
259+
}
260+
236261
// Finalize the current file when the last writer is dropped
237262
if let Some(current_file) = shared.current_write_file.take() {
238263
// Release shared lock before locking file
@@ -1343,6 +1368,81 @@ mod tests {
13431368
Ok(())
13441369
}
13451370

1371+
/// Verifies that the reader stays alive as long as any writer clone exists.
1372+
///
1373+
/// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
1374+
/// mode multiple input partition tasks share clones of the same writer.
1375+
/// The reader must not see EOF until **all** clones have been dropped,
1376+
/// even if the queue is temporarily empty between writes from different
1377+
/// clones.
1378+
///
1379+
/// The test sequence is:
1380+
///
1381+
/// 1. writer1 writes a batch, then is dropped.
1382+
/// 2. The reader consumes that batch (queue is now empty).
1383+
/// 3. writer2 (still alive) writes a batch.
1384+
/// 4. The reader must see that batch.
1385+
/// 5. EOF is only signalled after writer2 is also dropped.
1386+
#[tokio::test]
1387+
async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
1388+
let (writer1, mut reader) = create_spill_channel(1024 * 1024);
1389+
let writer2 = writer1.clone();
1390+
1391+
// Synchronization: tell writer2 when it may proceed.
1392+
let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
1393+
1394+
// Spawn writer2 — it waits for the signal before writing.
1395+
let writer2_handle = SpawnedTask::spawn(async move {
1396+
proceed_rx.await.unwrap();
1397+
writer2.push_batch(&create_test_batch(10, 10)).unwrap();
1398+
// writer2 is dropped here (last clone → true EOF)
1399+
});
1400+
1401+
// Writer1 writes one batch, then drops.
1402+
writer1.push_batch(&create_test_batch(0, 10))?;
1403+
drop(writer1);
1404+
1405+
// Read writer1's batch.
1406+
let batch1 = reader.next().await.unwrap()?;
1407+
assert_eq!(batch1.num_rows(), 10);
1408+
let col = batch1
1409+
.column(0)
1410+
.as_any()
1411+
.downcast_ref::<Int32Array>()
1412+
.unwrap();
1413+
assert_eq!(col.value(0), 0);
1414+
1415+
// Signal writer2 to write its batch. It will execute when the
1416+
// current task yields (i.e. when reader.next() returns Pending).
1417+
proceed_tx.send(()).unwrap();
1418+
1419+
// The reader should wait (Pending) for writer2's data, not EOF.
1420+
let batch2 =
1421+
tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
1422+
.await
1423+
.expect("Reader timed out — should not hang");
1424+
1425+
assert!(
1426+
batch2.is_some(),
1427+
"Reader must not return EOF while a writer clone is still alive"
1428+
);
1429+
let batch2 = batch2.unwrap()?;
1430+
assert_eq!(batch2.num_rows(), 10);
1431+
let col = batch2
1432+
.column(0)
1433+
.as_any()
1434+
.downcast_ref::<Int32Array>()
1435+
.unwrap();
1436+
assert_eq!(col.value(0), 10);
1437+
1438+
writer2_handle.await.unwrap();
1439+
1440+
// All writers dropped — reader should see real EOF now.
1441+
assert!(reader.next().await.is_none());
1442+
1443+
Ok(())
1444+
}
1445+
13461446
#[tokio::test]
13471447
async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
13481448
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

0 commit comments

Comments
 (0)