Skip to content

Commit ed4f097

Browse files
fix: wrap join operators with cooperative() for cancellation support
1 parent ee81b1c commit ed4f097

File tree

5 files changed

+19
-14
lines changed

5 files changed

+19
-14
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use super::utils::{
2525
BatchTransformer, BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut,
2626
StatefulStreamResult,
2727
};
28+
use crate::coop::cooperative;
2829
use crate::execution_plan::{boundedness_from_children, EmissionType};
2930
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3031
use crate::projection::{
@@ -324,7 +325,7 @@ impl ExecutionPlan for CrossJoinExec {
324325
})?;
325326

326327
if enforce_batch_size_in_joins {
327-
Ok(Box::pin(CrossJoinStream {
328+
Ok(Box::pin(cooperative(CrossJoinStream {
328329
schema: Arc::clone(&self.schema),
329330
left_fut,
330331
right: stream,
@@ -333,9 +334,9 @@ impl ExecutionPlan for CrossJoinExec {
333334
state: CrossJoinStreamState::WaitBuildSide,
334335
left_data: RecordBatch::new_empty(self.left().schema()),
335336
batch_transformer: BatchSplitter::new(batch_size),
336-
}))
337+
})))
337338
} else {
338-
Ok(Box::pin(CrossJoinStream {
339+
Ok(Box::pin(cooperative(CrossJoinStream {
339340
schema: Arc::clone(&self.schema),
340341
left_fut,
341342
right: stream,
@@ -344,7 +345,7 @@ impl ExecutionPlan for CrossJoinExec {
344345
state: CrossJoinStreamState::WaitBuildSide,
345346
left_data: RecordBatch::new_empty(self.left().schema()),
346347
batch_transformer: NoopBatchTransformer::new(),
347-
}))
348+
})))
348349
}
349350
}
350351

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use super::{
3333
PartitionMode, SharedBitmapBuilder,
3434
};
3535
use super::{JoinOn, JoinOnRef};
36+
use crate::coop::cooperative;
3637
use crate::execution_plan::{boundedness_from_children, EmissionType};
3738
use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64};
3839
use crate::projection::{
@@ -880,7 +881,7 @@ impl ExecutionPlan for HashJoinExec {
880881
None => self.column_indices.clone(),
881882
};
882883

883-
Ok(Box::pin(HashJoinStream {
884+
Ok(Box::pin(cooperative(HashJoinStream {
884885
schema: self.schema(),
885886
on_right,
886887
filter: self.filter.clone(),
@@ -895,7 +896,7 @@ impl ExecutionPlan for HashJoinExec {
895896
batch_size,
896897
hashes_buffer: vec![],
897898
right_side_ordered: self.right.output_ordering().is_some(),
898-
}))
899+
})))
899900
}
900901

901902
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use super::utils::{
3030
StatefulStreamResult,
3131
};
3232
use crate::common::can_project;
33+
use crate::coop::cooperative;
3334
use crate::execution_plan::{boundedness_from_children, EmissionType};
3435
use crate::joins::utils::{
3536
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
@@ -530,7 +531,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
530531
None => self.column_indices.clone(),
531532
};
532533

533-
Ok(Box::pin(NestedLoopJoinStream {
534+
Ok(Box::pin(cooperative(NestedLoopJoinStream {
534535
schema: self.schema(),
535536
filter: self.filter.clone(),
536537
join_type: self.join_type,
@@ -544,7 +545,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
544545
left_data: None,
545546
join_result_status: None,
546547
intermediate_batch_size: batch_size,
547-
}))
548+
})))
548549
}
549550

550551
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use std::sync::atomic::Ordering::Relaxed;
3333
use std::sync::Arc;
3434
use std::task::{Context, Poll};
3535

36+
use crate::coop::cooperative;
3637
use crate::execution_plan::{boundedness_from_children, EmissionType};
3738
use crate::expressions::PhysicalSortExpr;
3839
use crate::joins::utils::{
@@ -501,7 +502,7 @@ impl ExecutionPlan for SortMergeJoinExec {
501502
.register(context.memory_pool());
502503

503504
// create join stream
504-
Ok(Box::pin(SortMergeJoinStream::try_new(
505+
Ok(Box::pin(cooperative(SortMergeJoinStream::try_new(
505506
context.session_config().spill_compression(),
506507
Arc::clone(&self.schema),
507508
self.sort_options.clone(),
@@ -516,7 +517,7 @@ impl ExecutionPlan for SortMergeJoinExec {
516517
SortMergeJoinMetrics::new(partition, &self.metrics),
517518
reservation,
518519
context.runtime_env(),
519-
)?))
520+
)?)))
520521
}
521522

522523
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use std::task::{Context, Poll};
3333
use std::vec;
3434

3535
use crate::common::SharedMemoryReservation;
36+
use crate::coop::cooperative;
3637
use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
3738
use crate::joins::hash_join::{equal_rows_arr, update_hash};
3839
use crate::joins::stream_join_utils::{
@@ -533,7 +534,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
533534
}
534535

535536
if enforce_batch_size_in_joins {
536-
Ok(Box::pin(SymmetricHashJoinStream {
537+
Ok(Box::pin(cooperative(SymmetricHashJoinStream {
537538
left_stream,
538539
right_stream,
539540
schema: self.schema(),
@@ -551,9 +552,9 @@ impl ExecutionPlan for SymmetricHashJoinExec {
551552
state: SHJStreamState::PullRight,
552553
reservation,
553554
batch_transformer: BatchSplitter::new(batch_size),
554-
}))
555+
})))
555556
} else {
556-
Ok(Box::pin(SymmetricHashJoinStream {
557+
Ok(Box::pin(cooperative(SymmetricHashJoinStream {
557558
left_stream,
558559
right_stream,
559560
schema: self.schema(),
@@ -571,7 +572,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
571572
state: SHJStreamState::PullRight,
572573
reservation,
573574
batch_transformer: NoopBatchTransformer::new(),
574-
}))
575+
})))
575576
}
576577
}
577578

0 commit comments

Comments
 (0)