diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index bd88ed3b9ca1..d2dbe377b3c1 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -280,3 +280,7 @@ name = "spm" harness = false name = "preserve_file_partitioning" required-features = ["parquet"] + +[[bench]] +harness = false +name = "reset_plan_states" diff --git a/datafusion/core/benches/reset_plan_states.rs b/datafusion/core/benches/reset_plan_states.rs new file mode 100644 index 000000000000..f2f81f755b96 --- /dev/null +++ b/datafusion/core/benches/reset_plan_states.rs @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::{Arc, LazyLock}; + +use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion::prelude::SessionContext; +use datafusion_catalog::MemTable; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::displayable; +use datafusion_physical_plan::execution_plan::reset_plan_states; +use tokio::runtime::Runtime; + +const NUM_FIELDS: usize = 1000; +const PREDICATE_LEN: usize = 50; + +static SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(Schema::new( + (0..NUM_FIELDS) + .map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false))) + .collect::(), + )) +}); + +fn col_name(i: usize) -> String { + format!("x_{i}") +} + +fn aggr_name(i: usize) -> String { + format!("aggr_{i}") +} + +fn physical_plan( + ctx: &SessionContext, + rt: &Runtime, + sql: &str, +) -> Arc { + rt.block_on(async { + ctx.sql(sql) + .await + .unwrap() + .create_physical_plan() + .await + .unwrap() + }) +} + +fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String { + let mut predicate = String::new(); + for i in 0..len { + if i > 0 { + predicate.push_str(" AND "); + } + predicate.push_str(&col_name(i)); + predicate.push_str(" = "); + predicate.push_str(&i.to_string()); + } + predicate +} + +/// Returns a typical plan for the query like: +/// +/// ```sql +/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t +/// WHERE p1 +/// HAVING p2 +/// ``` +/// +/// Where `p1` and `p2` some long predicates. +/// +fn query1() -> String { + let mut query = String::new(); + query.push_str("SELECT "); + for i in 0..NUM_FIELDS { + if i > 0 { + query.push_str(", "); + } + query.push_str("AVG("); + query.push_str(&col_name(i)); + query.push_str(") AS "); + query.push_str(&aggr_name(i)); + } + query.push_str(" FROM t WHERE "); + query.push_str(&predicate(col_name, PREDICATE_LEN)); + query.push_str(" HAVING "); + query.push_str(&predicate(aggr_name, PREDICATE_LEN)); + query +} + +/// Returns a typical plan for the query like: +/// +/// ```sql +/// SELECT projection FROM t JOIN v ON t.a = v.a +/// WHERE p1 +/// ``` +/// +fn query2() -> String { + let mut query = String::new(); + query.push_str("SELECT "); + for i in (0..NUM_FIELDS).step_by(2) { + if i > 0 { + query.push_str(", "); + } + if (i / 2) % 2 == 0 { + query.push_str(&format!("t.{}", col_name(i))); + } else { + query.push_str(&format!("v.{}", col_name(i))); + } + } + query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE "); + + fn qualified_name(i: usize) -> String { + format!("t.{}", col_name(i)) + } + + query.push_str(&predicate(qualified_name, PREDICATE_LEN)); + query +} + +/// Returns a typical plan for the query like: +/// +/// ```sql +/// SELECT projection FROM t +/// WHERE p +/// ``` +/// +fn query3() -> String { + let mut query = String::new(); + query.push_str("SELECT "); + + // Create non-trivial projection. + for i in 0..NUM_FIELDS / 2 { + if i > 0 { + query.push_str(", "); + } + query.push_str(&col_name(i * 2)); + query.push_str(" + "); + query.push_str(&col_name(i * 2 + 1)); + } + + query.push_str(" FROM t WHERE "); + query.push_str(&predicate(col_name, PREDICATE_LEN)); + query +} + +fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc) { + b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap())); +} + +/// Benchmark is intended to measure overhead of actions, required to perform +/// making an independent instance of the execution plan to re-execute it, avoiding +/// re-planning stage. +fn bench_reset_plan_states(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let ctx = SessionContext::new(); + ctx.register_table( + "t", + Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()), + ) + .unwrap(); + + ctx.register_table( + "v", + Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()), + ) + .unwrap(); + + macro_rules! bench_query { + ($query_producer: expr) => {{ + let sql = $query_producer(); + let plan = physical_plan(&ctx, &rt, &sql); + log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true)); + move |b| run_reset_states(b, &plan) + }}; + } + + c.bench_function("query1", bench_query!(query1)); + c.bench_function("query2", bench_query!(query2)); + c.bench_function("query3", bench_query!(query3)); +} + +criterion_group!(benches, bench_reset_plan_states); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 06da0b8933c1..8d72921d0673 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult; pub use crate::stream::EmptyRecordBatchStream; pub use datafusion_common::hash_utils; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{ColumnStatistics, Statistics, internal_err}; pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -1384,6 +1385,30 @@ pub fn check_not_null_constraints( Ok(batch) } +/// Make plan ready to be re-executed returning its clone with state reset for all nodes. +/// +/// Some plans will change their internal states after execution, making them unable to be executed again. +/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan. +/// +/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. +/// However, if the data of the left table is derived from the work table, it will become outdated +/// as the work table changes. When the next iteration executes this plan again, we must clear the left table. +/// +/// # Limitations +/// +/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR): +/// +/// * uses dynamic filters, +/// * represents a recursive query. +/// +pub fn reset_plan_states(plan: Arc) -> Result> { + plan.transform_up(|plan| { + let new_plan = Arc::clone(&plan).reset_state()?; + Ok(Transformed::yes(new_plan)) + }) + .data() +} + /// Utility function yielding a string representation of the given [`ExecutionPlan`]. pub fn get_plan_string(plan: &Arc) -> Vec { let formatted = displayable(plan.as_ref()).indent(true).to_string(); diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 683dbb4e4976..936a02581e89 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -24,7 +24,7 @@ use std::task::{Context, Poll}; use super::work_table::{ReservedBatches, WorkTable}; use crate::aggregates::group_values::{GroupValues, new_group_values}; use crate::aggregates::order::GroupOrdering; -use crate::execution_plan::{Boundedness, EmissionType}; +use crate::execution_plan::{Boundedness, EmissionType, reset_plan_states}; use crate::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, }; @@ -387,20 +387,6 @@ fn assign_work_table( .data() } -/// Some plans will change their internal states after execution, making them unable to be executed again. -/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan. -/// -/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. -/// However, if the data of the left table is derived from the work table, it will become outdated -/// as the work table changes. When the next iteration executes this plan again, we must clear the left table. -fn reset_plan_states(plan: Arc) -> Result> { - plan.transform_up(|plan| { - let new_plan = Arc::clone(&plan).reset_state()?; - Ok(Transformed::yes(new_plan)) - }) - .data() -} - impl Stream for RecursiveQueryStream { type Item = Result; diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c94b5a413139..c6d0940c3548 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -146,7 +146,7 @@ impl ExecutionPlan for TestMemoryExec { self: Arc, _: Vec>, ) -> Result> { - unimplemented!() + Ok(self) } fn repartitioned(