Skip to content

Commit ddeb54a

Browse files
pepijnvehknlof
authored andcommitted
apache#16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size (apache#16995)
* apache#16994 Ensure CooperativeExec#maintains_input_order returns a Vec of the correct size * apache#16994 Extend default ExecutionPlan invariant checks Add checks that verify the length of the vectors returned by methods that need to return a value per child.
1 parent f1d58c5 commit ddeb54a

File tree

4 files changed

+42
-15
lines changed

4 files changed

+42
-15
lines changed

datafusion/physical-plan/src/coop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl ExecutionPlan for CooperativeExec {
252252
}
253253

254254
fn maintains_input_order(&self) -> Vec<bool> {
255-
self.input.maintains_input_order()
255+
vec![true; self.children().len()]
256256
}
257257

258258
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use crate::stream::RecordBatchStreamAdapter;
4747
use arrow::array::{Array, RecordBatch};
4848
use arrow::datatypes::SchemaRef;
4949
use datafusion_common::config::ConfigOptions;
50-
use datafusion_common::{exec_err, Constraints, Result};
50+
use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
5151
use datafusion_common_runtime::JoinSet;
5252
use datafusion_execution::TaskContext;
5353
use datafusion_physical_expr::EquivalenceProperties;
@@ -117,10 +117,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
117117
/// Returns an error if this individual node does not conform to its invariants.
118118
/// These invariants are typically only checked in debug mode.
119119
///
120-
/// A default set of invariants is provided in the default implementation.
120+
/// A default set of invariants is provided in the [check_default_invariants] function.
121+
/// The default implementation of `check_invariants` calls this function.
121122
/// Extension nodes can provide their own invariants.
122-
fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
123-
Ok(())
123+
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
124+
check_default_invariants(self, check)
124125
}
125126

126127
/// Specifies the data distribution requirements for all the
@@ -1035,6 +1036,37 @@ impl PlanProperties {
10351036
}
10361037
}
10371038

1039+
macro_rules! check_len {
1040+
($target:expr, $func_name:ident, $expected_len:expr) => {
1041+
let actual_len = $target.$func_name().len();
1042+
if actual_len != $expected_len {
1043+
return internal_err!(
1044+
"{}::{} returned Vec with incorrect size: {} != {}",
1045+
$target.name(),
1046+
stringify!($func_name),
1047+
actual_len,
1048+
$expected_len
1049+
);
1050+
}
1051+
};
1052+
}
1053+
1054+
/// Checks a set of invariants that apply to all ExecutionPlan implementations.
1055+
/// Returns an error if the given node does not conform.
1056+
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
1057+
plan: &P,
1058+
_check: InvariantLevel,
1059+
) -> Result<(), DataFusionError> {
1060+
let children_len = plan.children().len();
1061+
1062+
check_len!(plan, maintains_input_order, children_len);
1063+
check_len!(plan, required_input_ordering, children_len);
1064+
check_len!(plan, required_input_distribution, children_len);
1065+
check_len!(plan, benefits_from_input_partitioning, children_len);
1066+
1067+
Ok(())
1068+
}
1069+
10381070
/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
10391071
/// especially for the distributed engine to judge whether need to deal with shuffling.
10401072
/// Currently, there are 3 kinds of execution plan which needs data exchange

datafusion/physical-plan/src/union.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use super::{
3333
SendableRecordBatchStream, Statistics,
3434
};
3535
use crate::execution_plan::{
36-
boundedness_from_children, emission_type_from_children, InvariantLevel,
36+
boundedness_from_children, check_default_invariants, emission_type_from_children,
37+
InvariantLevel,
3738
};
3839
use crate::metrics::BaselineMetrics;
3940
use crate::projection::{make_with_child, ProjectionExec};
@@ -176,7 +177,9 @@ impl ExecutionPlan for UnionExec {
176177
&self.cache
177178
}
178179

179-
fn check_invariants(&self, _check: InvariantLevel) -> Result<()> {
180+
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
181+
check_default_invariants(self, check)?;
182+
180183
(self.inputs().len() >= 2)
181184
.then_some(())
182185
.ok_or(DataFusionError::Internal(

datafusion/physical-plan/src/work_table.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,6 @@ impl ExecutionPlan for WorkTableExec {
174174
&self.cache
175175
}
176176

177-
fn maintains_input_order(&self) -> Vec<bool> {
178-
vec![false]
179-
}
180-
181-
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
182-
vec![false]
183-
}
184-
185177
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
186178
vec![]
187179
}

0 commit comments

Comments
 (0)