Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,7 @@ name = "spm"
harness = false
name = "preserve_file_partitioning"
required-features = ["parquet"]

[[bench]]
harness = false
name = "reset_plan_states"
198 changes: 198 additions & 0 deletions datafusion/core/benches/reset_plan_states.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::{Arc, LazyLock};

use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::displayable;
use datafusion_physical_plan::execution_plan::reset_plan_states;
use tokio::runtime::Runtime;

const NUM_FIELDS: usize = 1000;
const PREDICATE_LEN: usize = 50;

static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(Schema::new(
(0..NUM_FIELDS)
.map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
.collect::<Fields>(),
))
});

fn col_name(i: usize) -> String {
format!("x_{i}")
}

fn aggr_name(i: usize) -> String {
format!("aggr_{i}")
}

fn physical_plan(
ctx: &SessionContext,
rt: &Runtime,
sql: &str,
) -> Arc<dyn ExecutionPlan> {
rt.block_on(async {
ctx.sql(sql)
.await
.unwrap()
.create_physical_plan()
.await
.unwrap()
})
}

fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
let mut predicate = String::new();
for i in 0..len {
if i > 0 {
predicate.push_str(" AND ");
}
predicate.push_str(&col_name(i));
predicate.push_str(" = ");
predicate.push_str(&i.to_string());
}
predicate
}

/// Returns a typical plan for the query like:
///
/// ```sql
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
/// WHERE p1
/// HAVING p2
/// ```
///
/// Where `p1` and `p2` some long predicates.
///
fn query1() -> String {
let mut query = String::new();
query.push_str("SELECT ");
for i in 0..NUM_FIELDS {
if i > 0 {
query.push_str(", ");
}
query.push_str("AVG(");
query.push_str(&col_name(i));
query.push_str(") AS ");
query.push_str(&aggr_name(i));
}
query.push_str(" FROM t WHERE ");
query.push_str(&predicate(col_name, PREDICATE_LEN));
query.push_str(" HAVING ");
query.push_str(&predicate(aggr_name, PREDICATE_LEN));
query
}

/// Returns a typical plan for the query like:
///
/// ```sql
/// SELECT projection FROM t JOIN v ON t.a = v.a
/// WHERE p1
/// ```
///
fn query2() -> String {
let mut query = String::new();
query.push_str("SELECT ");
for i in (0..NUM_FIELDS).step_by(2) {
if i > 0 {
query.push_str(", ");
}
if (i / 2) % 2 == 0 {
query.push_str(&format!("t.{}", col_name(i)));
} else {
query.push_str(&format!("v.{}", col_name(i)));
}
}
query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");

fn qualified_name(i: usize) -> String {
format!("t.{}", col_name(i))
}

query.push_str(&predicate(qualified_name, PREDICATE_LEN));
query
}

/// Returns a typical plan for the query like:
///
/// ```sql
/// SELECT projection FROM t
/// WHERE p
/// ```
///
fn query3() -> String {
let mut query = String::new();
query.push_str("SELECT ");

// Create non-trivial projection.
for i in 0..NUM_FIELDS / 2 {
if i > 0 {
query.push_str(", ");
}
query.push_str(&col_name(i * 2));
query.push_str(" + ");
query.push_str(&col_name(i * 2 + 1));
}

query.push_str(" FROM t WHERE ");
query.push_str(&predicate(col_name, PREDICATE_LEN));
query
}

fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
}

/// Benchmark is intended to measure overhead of actions, required to perform
/// making an independent instance of the execution plan to re-execute it, avoiding
/// re-planning stage.
fn bench_reset_plan_states(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
ctx.register_table(
"t",
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
)
.unwrap();

ctx.register_table(
"v",
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
)
.unwrap();

macro_rules! bench_query {
($query_producer: expr) => {{
let sql = $query_producer();
let plan = physical_plan(&ctx, &rt, &sql);
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
move |b| run_reset_states(b, &plan)
}};
}

c.bench_function("query1", bench_query!(query1));
c.bench_function("query2", bench_query!(query2));
c.bench_function("query3", bench_query!(query3));
}

criterion_group!(benches, bench_reset_plan_states);
criterion_main!(benches);
25 changes: 25 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;

pub use datafusion_common::hash_utils;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
Expand Down Expand Up @@ -1384,6 +1385,30 @@ pub fn check_not_null_constraints(
Ok(batch)
}

/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
///
/// Some plans will change their internal states after execution, making them unable to be executed again.
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
///
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
/// However, if the data of the left table is derived from the work table, it will become outdated
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
///
/// # Limitations
///
/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
///
/// * uses dynamic filters,
/// * represents a recursive query.
///
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
let new_plan = Arc::clone(&plan).reset_state()?;
Ok(Transformed::yes(new_plan))
})
.data()
}

/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
Expand Down
16 changes: 1 addition & 15 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::task::{Context, Poll};
use super::work_table::{ReservedBatches, WorkTable};
use crate::aggregates::group_values::{GroupValues, new_group_values};
use crate::aggregates::order::GroupOrdering;
use crate::execution_plan::{Boundedness, EmissionType};
use crate::execution_plan::{Boundedness, EmissionType, reset_plan_states};
use crate::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
};
Expand Down Expand Up @@ -387,20 +387,6 @@ fn assign_work_table(
.data()
}

/// Some plans will change their internal states after execution, making them unable to be executed again.
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
///
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
/// However, if the data of the left table is derived from the work table, it will become outdated
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|plan| {
let new_plan = Arc::clone(&plan).reset_state()?;
Ok(Transformed::yes(new_plan))
})
.data()
}

impl Stream for RecursiveQueryStream {
type Item = Result<RecordBatch>;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl ExecutionPlan for TestMemoryExec {
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
Ok(self)
}

fn repartitioned(
Expand Down