Skip to content

Commit 3fabee7

Browse files
authored
Revert adding PhysicalOptimizerRule::optimize_plan (#19186)
This reverts #18739 by reverting b990987 and 7b4593f. Due to feedback in #18739, we've decided to revert it, at least for the time being. Since this change has not been released, this is not a breaking API change.
1 parent f2b476f commit 3fabee7

34 files changed

+180
-548
lines changed

datafusion/core/benches/push_down_filter.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ use arrow::datatypes::{DataType, Field, Schema};
2020
use bytes::{BufMut, BytesMut};
2121
use criterion::{criterion_group, criterion_main, Criterion};
2222
use datafusion::config::ConfigOptions;
23-
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
23+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
2424
use datafusion_execution::object_store::ObjectStoreUrl;
2525
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
26-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
26+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
2727
use datafusion_physical_plan::ExecutionPlan;
2828
use object_store::memory::InMemory;
2929
use object_store::path::Path;
@@ -106,13 +106,11 @@ fn bench_push_down_filter(c: &mut Criterion) {
106106
config.execution.parquet.pushdown_filters = true;
107107
let plan = BenchmarkPlan { plan, config };
108108
let optimizer = FilterPushdown::new();
109-
let session_config = SessionConfig::from(plan.config.clone());
110-
let optimizer_context = OptimizerContext::new(session_config);
111109

112110
c.bench_function("push_down_filter", |b| {
113111
b.iter(|| {
114112
optimizer
115-
.optimize_plan(Arc::clone(&plan.plan), &optimizer_context)
113+
.optimize(Arc::clone(&plan.plan), &plan.config)
116114
.unwrap();
117115
});
118116
});

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ use datafusion_physical_expr::expressions::Literal;
9292
use datafusion_physical_expr::{
9393
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
9494
};
95-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
95+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9696
use datafusion_physical_plan::empty::EmptyExec;
9797
use datafusion_physical_plan::execution_plan::InvariantLevel;
9898
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
@@ -2291,14 +2291,11 @@ impl DefaultPhysicalPlanner {
22912291
// to verify that the plan fulfills the base requirements.
22922292
InvariantChecker(InvariantLevel::Always).check(&plan)?;
22932293

2294-
// Create optimizer context from session state
2295-
let optimizer_context = OptimizerContext::new(session_state.config().clone());
2296-
22972294
let mut new_plan = Arc::clone(&plan);
22982295
for optimizer in optimizers {
22992296
let before_schema = new_plan.schema();
23002297
new_plan = optimizer
2301-
.optimize_plan(new_plan, &optimizer_context)
2298+
.optimize(new_plan, session_state.config_options())
23022299
.map_err(|e| {
23032300
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
23042301
})?;

datafusion/core/tests/execution/coop.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_physical_expr::Partitioning;
4040
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4141
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
4242
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
43-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
43+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4444
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
4545
use datafusion_physical_plan::coop::make_cooperative;
4646
use datafusion_physical_plan::filter::FilterExec;
@@ -810,8 +810,8 @@ async fn query_yields(
810810
task_ctx: Arc<TaskContext>,
811811
) -> Result<(), Box<dyn Error>> {
812812
// Run plan through EnsureCooperative
813-
let optimizer_context = OptimizerContext::new(task_ctx.session_config().clone());
814-
let optimized = EnsureCooperative::new().optimize_plan(plan, &optimizer_context)?;
813+
let optimized =
814+
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
815815

816816
// Get the stream
817817
let stream = physical_plan::execute_stream(optimized, task_ctx)?;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_expr::{col, lit, Expr};
3939
use datafusion::datasource::physical_plan::FileScanConfig;
4040
use datafusion_common::config::ConfigOptions;
4141
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
42-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
42+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4343
use datafusion_physical_plan::filter::FilterExec;
4444
use datafusion_physical_plan::ExecutionPlan;
4545
use tempfile::tempdir;
@@ -83,10 +83,8 @@ async fn check_stats_precision_with_filter_pushdown() {
8383
Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
8484
as Arc<dyn ExecutionPlan>;
8585

86-
let session_config = SessionConfig::from(options.clone());
87-
let optimizer_context = OptimizerContext::new(session_config);
8886
let optimized_exec = FilterPushdown::new()
89-
.optimize_plan(filtered_exec, &optimizer_context)
87+
.optimize(filtered_exec, &options)
9088
.unwrap();
9189

9290
assert!(

datafusion/core/tests/physical_optimizer/aggregate_statistics.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@ use arrow::record_batch::RecordBatch;
2525
use datafusion::datasource::memory::MemorySourceConfig;
2626
use datafusion::datasource::source::DataSourceExec;
2727
use datafusion_common::cast::as_int64_array;
28+
use datafusion_common::config::ConfigOptions;
2829
use datafusion_common::Result;
29-
use datafusion_execution::config::SessionConfig;
3030
use datafusion_execution::TaskContext;
3131
use datafusion_expr::Operator;
3232
use datafusion_physical_expr::expressions::{self, cast};
3333
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
34-
use datafusion_physical_optimizer::OptimizerContext;
3534
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3635
use datafusion_physical_plan::aggregates::AggregateExec;
3736
use datafusion_physical_plan::aggregates::AggregateMode;
@@ -68,10 +67,8 @@ async fn assert_count_optim_success(
6867
let task_ctx = Arc::new(TaskContext::default());
6968
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);
7069

71-
let session_config = SessionConfig::new();
72-
let optimizer_context = OptimizerContext::new(session_config.clone());
73-
let optimized = AggregateStatistics::new()
74-
.optimize_plan(Arc::clone(&plan), &optimizer_context)?;
70+
let config = ConfigOptions::new();
71+
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;
7572

7673
// A ProjectionExec is a sign that the count optimization was applied
7774
assert!(optimized.as_any().is::<ProjectionExec>());
@@ -267,10 +264,8 @@ async fn test_count_inexact_stat() -> Result<()> {
267264
Arc::clone(&schema),
268265
)?;
269266

270-
let session_config = SessionConfig::new();
271-
let optimizer_context = OptimizerContext::new(session_config.clone());
272-
let optimized = AggregateStatistics::new()
273-
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;
267+
let conf = ConfigOptions::new();
268+
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
274269

275270
// check that the original ExecutionPlan was not replaced
276271
assert!(optimized.as_any().is::<AggregateExec>());
@@ -313,10 +308,8 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> {
313308
Arc::clone(&schema),
314309
)?;
315310

316-
let session_config = SessionConfig::new();
317-
let optimizer_context = OptimizerContext::new(session_config.clone());
318-
let optimized = AggregateStatistics::new()
319-
.optimize_plan(Arc::new(final_agg), &optimizer_context)?;
311+
let conf = ConfigOptions::new();
312+
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
320313

321314
// check that the original ExecutionPlan was not replaced
322315
assert!(optimized.as_any().is::<AggregateExec>());

datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ use std::sync::Arc;
2626
use crate::physical_optimizer::test_utils::parquet_exec;
2727

2828
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29-
use datafusion_execution::config::SessionConfig;
29+
use datafusion_common::config::ConfigOptions;
3030
use datafusion_functions_aggregate::count::count_udaf;
3131
use datafusion_functions_aggregate::sum::sum_udaf;
3232
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
3333
use datafusion_physical_expr::expressions::{col, lit};
3434
use datafusion_physical_expr::Partitioning;
3535
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3636
use datafusion_physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
37-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
37+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
3838
use datafusion_physical_plan::aggregates::{
3939
AggregateExec, AggregateMode, PhysicalGroupBy,
4040
};
@@ -47,9 +47,8 @@ macro_rules! assert_optimized {
4747
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
4848
// run optimizer
4949
let optimizer = CombinePartialFinalAggregate {};
50-
let session_config = SessionConfig::new();
51-
let optimizer_context = OptimizerContext::new(session_config.clone());
52-
let optimized = optimizer.optimize_plan($PLAN, &optimizer_context)?;
50+
let config = ConfigOptions::new();
51+
let optimized = optimizer.optimize($PLAN, &config)?;
5352
// Now format correctly
5453
let plan = displayable(optimized.as_ref()).indent(true).to_string();
5554
let actual_lines = plan.trim();

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use datafusion_physical_expr_common::sort_expr::{
5252
use datafusion_physical_optimizer::enforce_distribution::*;
5353
use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
5454
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
55-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
55+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
5656
use datafusion_physical_plan::aggregates::{
5757
AggregateExec, AggregateMode, PhysicalGroupBy,
5858
};
@@ -489,9 +489,7 @@ impl TestConfig {
489489
) -> Result<Arc<dyn ExecutionPlan>> {
490490
// Add the ancillary output requirements operator at the start:
491491
let optimizer = OutputRequirements::new_add_mode();
492-
let session_config = SessionConfig::from(self.config.clone());
493-
let optimizer_context = OptimizerContext::new(session_config.clone());
494-
let mut optimized = optimizer.optimize_plan(plan.clone(), &optimizer_context)?;
492+
let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
495493

496494
// This file has 2 rules that use tree node, apply these rules to original plan consecutively
497495
// After these operations tree nodes should be in a consistent state.
@@ -527,25 +525,21 @@ impl TestConfig {
527525
}
528526

529527
for run in optimizers_to_run {
530-
let session_config = SessionConfig::from(self.config.clone());
531-
let optimizer_context = OptimizerContext::new(session_config.clone());
532528
optimized = match run {
533529
Run::Distribution => {
534530
let optimizer = EnforceDistribution::new();
535-
optimizer.optimize_plan(optimized, &optimizer_context)?
531+
optimizer.optimize(optimized, &self.config)?
536532
}
537533
Run::Sorting => {
538534
let optimizer = EnforceSorting::new();
539-
optimizer.optimize_plan(optimized, &optimizer_context)?
535+
optimizer.optimize(optimized, &self.config)?
540536
}
541537
};
542538
}
543539

544540
// Remove the ancillary output requirements operator when done:
545541
let optimizer = OutputRequirements::new_remove_mode();
546-
let session_config = SessionConfig::from(self.config.clone());
547-
let optimizer_context = OptimizerContext::new(session_config.clone());
548-
let optimized = optimizer.optimize_plan(optimized, &optimizer_context)?;
542+
let optimized = optimizer.optimize(optimized, &self.config)?;
549543

550544
Ok(optimized)
551545
}
@@ -3378,10 +3372,7 @@ SortRequiredExec: [a@0 ASC]
33783372
config.execution.target_partitions = 10;
33793373
config.optimizer.enable_round_robin_repartition = true;
33803374
config.optimizer.prefer_existing_sort = false;
3381-
let session_config = SessionConfig::from(config);
3382-
let optimizer_context = OptimizerContext::new(session_config.clone());
3383-
let dist_plan =
3384-
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
3375+
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
33853376
// Since at the start of the rule ordering requirement is not satisfied
33863377
// EnforceDistribution rule doesn't satisfy this requirement either.
33873378
assert_plan!(dist_plan, @r"
@@ -3417,10 +3408,7 @@ SortRequiredExec: [a@0 ASC]
34173408
config.execution.target_partitions = 10;
34183409
config.optimizer.enable_round_robin_repartition = true;
34193410
config.optimizer.prefer_existing_sort = false;
3420-
let session_config = SessionConfig::from(config);
3421-
let optimizer_context = OptimizerContext::new(session_config.clone());
3422-
let dist_plan =
3423-
EnforceDistribution::new().optimize_plan(physical_plan, &optimizer_context)?;
3411+
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
34243412
// Since at the start of the rule ordering requirement is satisfied
34253413
// EnforceDistribution rule satisfy this requirement also.
34263414
assert_plan!(dist_plan, @r"

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preservin
5656
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
5757
use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
5858
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
59-
use datafusion_physical_optimizer::{OptimizerContext, PhysicalOptimizerRule};
59+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
6060
use datafusion::prelude::*;
6161
use arrow::array::{Int32Array, RecordBatch};
6262
use arrow::datatypes::{Field};
@@ -175,10 +175,8 @@ impl EnforceSortingTest {
175175
let input_plan_string = displayable(self.plan.as_ref()).indent(true).to_string();
176176

177177
// Run the actual optimizer
178-
let session_config = SessionConfig::from(config);
179-
let optimizer_context = OptimizerContext::new(session_config.clone());
180178
let optimized_physical_plan = EnforceSorting::new()
181-
.optimize_plan(Arc::clone(&self.plan), &optimizer_context)
179+
.optimize(Arc::clone(&self.plan), &config)
182180
.expect("enforce_sorting failed");
183181

184182
// Get string representation of the plan
@@ -2365,26 +2363,23 @@ async fn test_commutativity() -> Result<()> {
23652363
"#);
23662364

23672365
let config = ConfigOptions::new();
2368-
let session_config = SessionConfig::from(config);
2369-
let optimizer_context = OptimizerContext::new(session_config.clone());
23702366
let rules = vec![
23712367
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
23722368
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
23732369
];
23742370
let mut first_plan = orig_plan.clone();
23752371
for rule in rules {
2376-
first_plan = rule.optimize_plan(first_plan, &optimizer_context)?;
2372+
first_plan = rule.optimize(first_plan, &config)?;
23772373
}
23782374

2379-
let optimizer_context2 = OptimizerContext::new(session_config.clone());
23802375
let rules = vec![
23812376
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
23822377
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
23832378
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,
23842379
];
23852380
let mut second_plan = orig_plan.clone();
23862381
for rule in rules {
2387-
second_plan = rule.optimize_plan(second_plan, &optimizer_context2)?;
2382+
second_plan = rule.optimize(second_plan, &config)?;
23882383
}
23892384

23902385
assert_eq!(get_plan_string(&first_plan), get_plan_string(&second_plan));

0 commit comments

Comments
 (0)