Skip to content

Commit cb9adfa

Browse files
erratic-patternalamb
authored andcommitted
fix: wrap join operators with cooperative() for cancellation support
1 parent 35d366e commit cb9adfa

File tree

5 files changed

+19
-14
lines changed

5 files changed

+19
-14
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use super::utils::{
2525
OnceAsync, OnceFut, StatefulStreamResult, adjust_right_output_partitioning,
2626
reorder_output_after_swap,
2727
};
28+
use crate::coop::cooperative;
2829
use crate::execution_plan::{EmissionType, boundedness_from_children};
2930
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3031
use crate::projection::{
@@ -332,7 +333,7 @@ impl ExecutionPlan for CrossJoinExec {
332333
})?;
333334

334335
if enforce_batch_size_in_joins {
335-
Ok(Box::pin(CrossJoinStream {
336+
Ok(Box::pin(cooperative(CrossJoinStream {
336337
schema: Arc::clone(&self.schema),
337338
left_fut,
338339
right: stream,
@@ -341,9 +342,9 @@ impl ExecutionPlan for CrossJoinExec {
341342
state: CrossJoinStreamState::WaitBuildSide,
342343
left_data: RecordBatch::new_empty(self.left().schema()),
343344
batch_transformer: BatchSplitter::new(batch_size),
344-
}))
345+
})))
345346
} else {
346-
Ok(Box::pin(CrossJoinStream {
347+
Ok(Box::pin(cooperative(CrossJoinStream {
347348
schema: Arc::clone(&self.schema),
348349
left_fut,
349350
right: stream,
@@ -352,7 +353,7 @@ impl ExecutionPlan for CrossJoinExec {
352353
state: CrossJoinStreamState::WaitBuildSide,
353354
left_data: RecordBatch::new_empty(self.left().schema()),
354355
batch_transformer: NoopBatchTransformer::new(),
355-
}))
356+
})))
356357
}
357358
}
358359

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::sync::{Arc, OnceLock};
2222
use std::{any::Any, vec};
2323

2424
use crate::ExecutionPlanProperties;
25+
use crate::coop::cooperative;
2526
use crate::execution_plan::{EmissionType, boundedness_from_children};
2627
use crate::filter_pushdown::{
2728
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
@@ -1061,7 +1062,7 @@ impl ExecutionPlan for HashJoinExec {
10611062
.map(|(_, right_expr)| Arc::clone(right_expr))
10621063
.collect::<Vec<_>>();
10631064

1064-
Ok(Box::pin(HashJoinStream::new(
1065+
Ok(Box::pin(cooperative(HashJoinStream::new(
10651066
partition,
10661067
self.schema(),
10671068
on_right,
@@ -1079,7 +1080,7 @@ impl ExecutionPlan for HashJoinExec {
10791080
self.right.output_ordering().is_some(),
10801081
build_accumulator,
10811082
self.mode,
1082-
)))
1083+
))))
10831084
}
10841085

10851086
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use super::utils::{
2929
reorder_output_after_swap, swap_join_projection,
3030
};
3131
use crate::common::can_project;
32+
use crate::coop::cooperative;
3233
use crate::execution_plan::{EmissionType, boundedness_from_children};
3334
use crate::joins::SharedBitmapBuilder;
3435
use crate::joins::utils::{
@@ -529,7 +530,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
529530
None => self.column_indices.clone(),
530531
};
531532

532-
Ok(Box::pin(NestedLoopJoinStream::new(
533+
Ok(Box::pin(cooperative(NestedLoopJoinStream::new(
533534
self.schema(),
534535
self.filter.clone(),
535536
self.join_type,
@@ -538,7 +539,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
538539
column_indices_after_projection,
539540
metrics,
540541
batch_size,
541-
)))
542+
))))
542543
}
543544

544545
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/joins/sort_merge_join/exec.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::any::Any;
2323
use std::fmt::Formatter;
2424
use std::sync::Arc;
2525

26+
use crate::coop::cooperative;
2627
use crate::execution_plan::{EmissionType, boundedness_from_children};
2728
use crate::expressions::PhysicalSortExpr;
2829
use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
@@ -497,7 +498,7 @@ impl ExecutionPlan for SortMergeJoinExec {
497498
.register(context.memory_pool());
498499

499500
// create join stream
500-
Ok(Box::pin(SortMergeJoinStream::try_new(
501+
Ok(Box::pin(cooperative(SortMergeJoinStream::try_new(
501502
context.session_config().spill_compression(),
502503
Arc::clone(&self.schema),
503504
self.sort_options.clone(),
@@ -512,7 +513,7 @@ impl ExecutionPlan for SortMergeJoinExec {
512513
SortMergeJoinMetrics::new(partition, &self.metrics),
513514
reservation,
514515
context.runtime_env(),
515-
)?))
516+
)?)))
516517
}
517518

518519
fn metrics(&self) -> Option<MetricsSet> {

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use std::task::{Context, Poll};
3333
use std::vec;
3434

3535
use crate::common::SharedMemoryReservation;
36+
use crate::coop::cooperative;
3637
use crate::execution_plan::{boundedness_from_children, emission_type_from_children};
3738
use crate::joins::stream_join_utils::{
3839
PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics,
@@ -534,7 +535,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
534535
}
535536

536537
if enforce_batch_size_in_joins {
537-
Ok(Box::pin(SymmetricHashJoinStream {
538+
Ok(Box::pin(cooperative(SymmetricHashJoinStream {
538539
left_stream,
539540
right_stream,
540541
schema: self.schema(),
@@ -552,9 +553,9 @@ impl ExecutionPlan for SymmetricHashJoinExec {
552553
state: SHJStreamState::PullRight,
553554
reservation,
554555
batch_transformer: BatchSplitter::new(batch_size),
555-
}))
556+
})))
556557
} else {
557-
Ok(Box::pin(SymmetricHashJoinStream {
558+
Ok(Box::pin(cooperative(SymmetricHashJoinStream {
558559
left_stream,
559560
right_stream,
560561
schema: self.schema(),
@@ -572,7 +573,7 @@ impl ExecutionPlan for SymmetricHashJoinExec {
572573
state: SHJStreamState::PullRight,
573574
reservation,
574575
batch_transformer: NoopBatchTransformer::new(),
575-
}))
576+
})))
576577
}
577578
}
578579

0 commit comments

Comments
 (0)