Skip to content

Commit c0f12f9

Browse files
authored
refactor(query): migrate physical plan from enum to trait-based architecture (#18268)
* refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan * refactor(query): use trait to refactor physical plan
1 parent a28a7d3 commit c0f12f9

File tree

241 files changed

+15745
-11045
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

241 files changed

+15745
-11045
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ databend-storages-common-session = { workspace = true }
119119
databend-storages-common-stage = { workspace = true }
120120
databend-storages-common-table-meta = { workspace = true }
121121
derive-visitor = { workspace = true }
122+
dyn-clone = { workspace = true }
122123
enum-as-inner = { workspace = true }
123124
ethnum = { workspace = true }
124125
fastrace = { workspace = true }

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ use databend_common_exception::Result;
1919
use databend_common_expression::infer_table_schema;
2020
use databend_common_meta_app::schema::UpdateStreamMetaReq;
2121
use databend_common_pipeline_core::ExecutionInfo;
22-
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
23-
use databend_common_sql::executor::PhysicalPlan;
2422
use databend_storages_common_stage::CopyIntoLocationInfo;
2523
use log::debug;
2624
use log::info;
@@ -29,6 +27,9 @@ use crate::interpreters::common::check_deduplicate_label;
2927
use crate::interpreters::common::dml_build_update_stream_req;
3028
use crate::interpreters::Interpreter;
3129
use crate::interpreters::SelectInterpreter;
30+
use crate::physical_plans::CopyIntoLocation;
31+
use crate::physical_plans::PhysicalPlan;
32+
use crate::physical_plans::PhysicalPlanMeta;
3233
use crate::pipelines::PipelineBuildResult;
3334
use crate::schedulers::build_query_pipeline_without_render_result_set;
3435
use crate::sessions::QueryContext;
@@ -89,14 +90,14 @@ impl CopyIntoLocationInterpreter {
8990
let query_result_schema = query_interpreter.get_result_schema();
9091
let table_schema = infer_table_schema(&query_result_schema)?;
9192

92-
let mut physical_plan = PhysicalPlan::CopyIntoLocation(Box::new(CopyIntoLocation {
93-
plan_id: 0,
94-
input: Box::new(query_physical_plan),
93+
let mut physical_plan = PhysicalPlan::new(CopyIntoLocation {
94+
input: query_physical_plan,
9595
project_columns: query_interpreter.get_result_columns(),
9696
input_data_schema: query_result_schema,
9797
input_table_schema: table_schema,
9898
info: info.clone(),
99-
}));
99+
meta: PhysicalPlanMeta::new("CopyIntoLocation"),
100+
});
100101

101102
let mut next_plan_id = 0;
102103
physical_plan.adjust_plan_id(&mut next_plan_id);

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,9 @@ use databend_common_expression::SendableDataBlockStream;
2626
use databend_common_meta_app::schema::TableInfo;
2727
use databend_common_meta_app::schema::UpdateStreamMetaReq;
2828
use databend_common_pipeline_core::Pipeline;
29-
use databend_common_sql::executor::physical_plans::CopyIntoTable;
30-
use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
31-
use databend_common_sql::executor::physical_plans::Exchange;
3229
use databend_common_sql::executor::physical_plans::FragmentKind;
3330
use databend_common_sql::executor::physical_plans::MutationKind;
34-
use databend_common_sql::executor::physical_plans::TableScan;
3531
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
36-
use databend_common_sql::executor::PhysicalPlan;
3732
use databend_common_storage::StageFileInfo;
3833
use databend_common_storages_fuse::FuseTable;
3934
use databend_common_storages_stage::StageTable;
@@ -46,6 +41,12 @@ use crate::interpreters::common::dml_build_update_stream_req;
4641
use crate::interpreters::HookOperator;
4742
use crate::interpreters::Interpreter;
4843
use crate::interpreters::SelectInterpreter;
44+
use crate::physical_plans::CopyIntoTable;
45+
use crate::physical_plans::CopyIntoTableSource;
46+
use crate::physical_plans::Exchange;
47+
use crate::physical_plans::PhysicalPlan;
48+
use crate::physical_plans::PhysicalPlanMeta;
49+
use crate::physical_plans::TableScan;
4950
use crate::pipelines::PipelineBuildResult;
5051
use crate::pipelines::PipelineBuilder;
5152
use crate::schedulers::build_query_pipeline_without_render_result_set;
@@ -113,7 +114,7 @@ impl CopyIntoTableInterpreter {
113114

114115
let (query_interpreter, update_stream_meta) = self.build_query(&query).await?;
115116
update_stream_meta_reqs = update_stream_meta;
116-
let query_physical_plan = Box::new(query_interpreter.build_physical_plan().await?);
117+
let query_physical_plan = query_interpreter.build_physical_plan().await?;
117118

118119
let result_columns = query_interpreter.get_result_columns();
119120
(
@@ -133,21 +134,20 @@ impl CopyIntoTableInterpreter {
133134
}
134135

135136
(
136-
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
137-
plan_id: 0,
137+
CopyIntoTableSource::Stage(PhysicalPlan::new(TableScan {
138138
scan_id: 0,
139139
name_mapping,
140140
stat_info: None,
141141
table_index: None,
142142
internal_column: None,
143143
source: Box::new(data_source_plan),
144-
}))),
144+
meta: PhysicalPlanMeta::new("TableScan"),
145+
})),
145146
None,
146147
)
147148
};
148149

149-
let mut root = PhysicalPlan::CopyIntoTable(Box::new(CopyIntoTable {
150-
plan_id: 0,
150+
let mut root = PhysicalPlan::new(CopyIntoTable {
151151
required_values_schema: plan.required_values_schema.clone(),
152152
values_consts: plan.values_consts.clone(),
153153
required_source_schema: plan.required_source_schema.clone(),
@@ -159,16 +159,17 @@ impl CopyIntoTableInterpreter {
159159
source,
160160
is_transform: plan.is_transform,
161161
table_meta_timestamps,
162-
}));
162+
meta: PhysicalPlanMeta::new("CopyIntoTable"),
163+
});
163164

164165
if plan.enable_distributed {
165-
root = PhysicalPlan::Exchange(Exchange {
166-
plan_id: 0,
167-
input: Box::new(root),
166+
root = PhysicalPlan::new(Exchange {
167+
input: root,
168168
kind: FragmentKind::Merge,
169169
keys: Vec::new(),
170170
allow_adjust_parallelism: true,
171171
ignore_exchange: false,
172+
meta: PhysicalPlanMeta::new("Exchange"),
172173
});
173174
}
174175

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ use databend_common_pipeline_core::processors::PlanProfile;
3434
use databend_common_pipeline_core::ExecutionInfo;
3535
use databend_common_pipeline_core::Pipeline;
3636
use databend_common_sql::binder::ExplainConfig;
37-
use databend_common_sql::executor::format_partial_tree;
38-
use databend_common_sql::executor::MutationBuildInfo;
3937
use databend_common_sql::plans::Mutation;
4038
use databend_common_sql::BindContext;
4139
use databend_common_sql::ColumnSet;
@@ -55,6 +53,10 @@ use crate::interpreters::interpreter::on_execution_finished;
5553
use crate::interpreters::interpreter_mutation::build_mutation_info;
5654
use crate::interpreters::interpreter_mutation::MutationInterpreter;
5755
use crate::interpreters::Interpreter;
56+
use crate::physical_plans::FormatContext;
57+
use crate::physical_plans::MutationBuildInfo;
58+
use crate::physical_plans::PhysicalPlan;
59+
use crate::physical_plans::PhysicalPlanBuilder;
5860
use crate::pipelines::executor::ExecutorSettings;
5961
use crate::pipelines::executor::PipelineCompleteExecutor;
6062
use crate::pipelines::executor::PipelinePullingExecutor;
@@ -64,8 +66,6 @@ use crate::schedulers::build_query_pipeline;
6466
use crate::schedulers::Fragmenter;
6567
use crate::schedulers::QueryFragmentsActions;
6668
use crate::sessions::QueryContext;
67-
use crate::sql::executor::PhysicalPlan;
68-
use crate::sql::executor::PhysicalPlanBuilder;
6969
use crate::sql::optimizer::ir::SExpr;
7070
use crate::sql::plans::Plan;
7171

@@ -174,7 +174,20 @@ impl Interpreter for ExplainInterpreter {
174174
let ctx = self.ctx.clone();
175175
let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx, true);
176176
let plan = builder.build(s_expr, bind_context.column_set()).await?;
177-
self.explain_join_order(&plan, metadata)?
177+
178+
let metadata = metadata.read();
179+
let mut context = FormatContext {
180+
profs: HashMap::new(),
181+
metadata: &metadata,
182+
scan_id_to_runtime_filters: HashMap::new(),
183+
};
184+
185+
let formatter = plan.formatter()?;
186+
let format_node = formatter.format_join(&mut context)?;
187+
let result = format_node.format_pretty()?;
188+
let line_split_result: Vec<&str> = result.lines().collect();
189+
let formatted_plan = StringType::from_data(line_split_result);
190+
vec![DataBlock::new_from_columns(vec![formatted_plan])]
178191
}
179192
_ => Err(ErrorCode::Unimplemented(
180193
"Unsupported EXPLAIN JOIN statement",
@@ -360,25 +373,15 @@ impl ExplainInterpreter {
360373
}
361374
}
362375

376+
let metadata = metadata.read();
363377
let result = plan
364-
.format(metadata.clone(), Default::default())?
378+
.format(&metadata, Default::default())?
365379
.format_pretty()?;
366380
let line_split_result: Vec<&str> = result.lines().collect();
367381
let formatted_plan = StringType::from_data(line_split_result);
368382
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
369383
}
370384

371-
pub fn explain_join_order(
372-
&self,
373-
plan: &PhysicalPlan,
374-
metadata: &MetadataRef,
375-
) -> Result<Vec<DataBlock>> {
376-
let result = plan.format_join(metadata)?.format_pretty()?;
377-
let line_split_result: Vec<&str> = result.lines().collect();
378-
let formatted_plan = StringType::from_data(line_split_result);
379-
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
380-
}
381-
382385
fn format_pipeline(build_res: &PipelineBuildResult) -> Vec<DataBlock> {
383386
let mut blocks = Vec::with_capacity(1 + build_res.sources_pipelines.len());
384387
// Format root pipeline
@@ -412,10 +415,13 @@ impl ExplainInterpreter {
412415
.build(&s_expr, required)
413416
.await?;
414417

415-
let root_fragment = Fragmenter::try_create(ctx.clone())?.build_fragment(&plan)?;
418+
let fragments = Fragmenter::try_create(ctx.clone())?.build_fragment(&plan)?;
416419

417420
let mut fragments_actions = QueryFragmentsActions::create(ctx.clone());
418-
root_fragment.get_actions(ctx, &mut fragments_actions)?;
421+
422+
for fragment in fragments {
423+
fragment.get_actions(ctx.clone(), &mut fragments_actions)?;
424+
}
419425

420426
let display_string = fragments_actions.display_indent(&metadata).to_string();
421427
let line_split_result = display_string.lines().collect::<Vec<_>>();
@@ -479,12 +485,27 @@ impl ExplainInterpreter {
479485
if !pruned_partitions_stats.is_empty() {
480486
plan.set_pruning_stats(&mut pruned_partitions_stats);
481487
}
482-
let result = if self.partial {
483-
format_partial_tree(&plan, metadata, &query_profiles)?.format_pretty()?
484-
} else {
485-
plan.format(metadata.clone(), query_profiles.clone())?
486-
.format_pretty()?
488+
489+
let result = match self.partial {
490+
true => {
491+
let metadata = metadata.read();
492+
let mut context = FormatContext {
493+
profs: query_profiles.clone(),
494+
metadata: &metadata,
495+
scan_id_to_runtime_filters: HashMap::new(),
496+
};
497+
498+
let formatter = plan.formatter()?;
499+
let format_node = formatter.partial_format(&mut context)?;
500+
format_node.format_pretty()?
501+
}
502+
false => {
503+
let metadata = metadata.read();
504+
plan.format(&metadata, query_profiles.clone())?
505+
.format_pretty()?
506+
}
487507
};
508+
488509
let line_split_result: Vec<&str> = result.lines().collect();
489510
let formatted_plan = StringType::from_data(line_split_result);
490511
if self.graphical {
@@ -570,10 +591,13 @@ impl ExplainInterpreter {
570591
mutation.metadata.clone(),
571592
)?;
572593
let plan = interpreter.build_physical_plan(&mutation, true).await?;
573-
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
594+
let fragments = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;
574595

575596
let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone());
576-
root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
597+
598+
for fragment in fragments {
599+
fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;
600+
}
577601

578602
let display_string = fragments_actions
579603
.display_indent(&mutation.metadata)

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ impl InterpreterFactory {
250250
)?)),
251251
Plan::ExplainAnalyze {
252252
graphical,
253-
partial,
254253
plan,
254+
partial,
255255
} => Ok(Arc::new(ExplainInterpreter::try_create(
256256
ctx,
257257
*plan.clone(),

0 commit comments

Comments
 (0)