Skip to content

Commit e338e11

Browse files
committed
refactor(query): support lazy read for update from
1 parent 40136f2 commit e338e11

File tree

15 files changed

+219
-25
lines changed

15 files changed

+219
-25
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ pub struct TransformHashJoinProbe {
112112
partition_id_to_restore: usize,
113113

114114
step: Step,
115-
step_logs: Vec<Step>,
116115
}
117116

118117
impl TransformHashJoinProbe {
@@ -176,7 +175,6 @@ impl TransformHashJoinProbe {
176175
spiller,
177176
partition_id_to_restore: 0,
178177
step: Step::Async(AsyncStep::WaitBuild),
179-
step_logs: vec![Step::Async(AsyncStep::WaitBuild)],
180178
}))
181179
}
182180

@@ -192,7 +190,6 @@ impl TransformHashJoinProbe {
192190
}
193191
};
194192
self.step = step.clone();
195-
self.step_logs.push(step);
196193
Ok(event)
197194
}
198195

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1297,6 +1297,13 @@ impl DefaultSettings {
12971297
scope: SettingScope::Both,
12981298
range: Some(SettingRange::Numeric(0..=1)),
12991299
}),
1300+
("nondeterministic_update_lazy_read_threshold", DefaultSettingValue {
1301+
value: UserSettingValue::UInt64(u64::MAX),
1302+
desc: "Sets the maximum rows in a query to enable lazy read optimization when updating a multi-joined row. Setting it to 0 disables the optimization.",
1303+
mode: SettingMode::Both,
1304+
scope: SettingScope::Both,
1305+
range: Some(SettingRange::Numeric(0..=u64::MAX)),
1306+
}),
13001307
("enable_auto_vacuum", DefaultSettingValue {
13011308
value: UserSettingValue::UInt64(0),
13021309
desc: "Whether to automatically trigger VACUUM operations on tables (using vacuum2)",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,4 +1013,8 @@ impl Settings {
10131013
pub fn get_enable_parallel_union_all(&self) -> Result<bool> {
10141014
Ok(self.try_get_u64("enable_parallel_union_all")? == 1)
10151015
}
1016+
1017+
pub fn get_nondeterministic_update_lazy_read_threshold(&self) -> Result<u64> {
1018+
self.try_get_u64("nondeterministic_update_lazy_read_threshold")
1019+
}
10161020
}

src/query/sql/src/executor/physical_plan_builder.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ impl PhysicalPlanBuilder {
129129
self.build_mutation_source(mutation_source).await
130130
}
131131
RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await,
132+
RelOperator::RowFetch(row_fetch) => {
133+
self.build_row_fetch(s_expr, row_fetch, required, stat_info)
134+
.await
135+
}
132136
}
133137
}
134138

src/query/sql/src/executor/physical_plans/physical_row_fetch.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,19 @@
1414

1515
use databend_common_catalog::plan::DataSourcePlan;
1616
use databend_common_catalog::plan::Projection;
17+
use databend_common_exception::ErrorCode;
1718
use databend_common_exception::Result;
1819
use databend_common_expression::DataField;
1920
use databend_common_expression::DataSchemaRef;
2021
use databend_common_expression::DataSchemaRefExt;
22+
use databend_common_expression::ROW_ID_COL_NAME;
2123

2224
use crate::executor::explain::PlanStatsInfo;
2325
use crate::executor::PhysicalPlan;
26+
use crate::executor::PhysicalPlanBuilder;
27+
use crate::optimizer::ir::SExpr;
28+
use crate::ColumnEntry;
29+
use crate::ColumnSet;
2430

2531
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
2632
pub struct RowFetch {
@@ -46,3 +52,91 @@ impl RowFetch {
4652
Ok(DataSchemaRefExt::create(fields))
4753
}
4854
}
55+
56+
impl PhysicalPlanBuilder {
57+
pub(crate) async fn build_row_fetch(
58+
&mut self,
59+
s_expr: &SExpr,
60+
row_fetch: &crate::plans::RowFetch,
61+
mut required: ColumnSet,
62+
stat_info: PlanStatsInfo,
63+
) -> Result<PhysicalPlan> {
64+
// 1. Prune unused Columns.
65+
// Apply lazy.
66+
required = required
67+
.difference(&row_fetch.lazy_columns)
68+
.cloned()
69+
.collect::<ColumnSet>();
70+
71+
required.insert(row_fetch.row_id_index);
72+
73+
// 2. Build physical plan.
74+
let input_plan = self.build(s_expr.child(0)?, required).await?;
75+
let metadata = self.metadata.read().clone();
76+
77+
// If `lazy_columns` is not empty, build a `RowFetch` plan on top of the `Limit` plan.
78+
let input_schema = input_plan.output_schema()?;
79+
80+
// Lazy materialization is enabled.
81+
let row_id_col_index = metadata
82+
.columns()
83+
.iter()
84+
.position(|col| col.name() == ROW_ID_COL_NAME)
85+
.ok_or_else(|| ErrorCode::Internal("Internal column _row_id is not found"))?;
86+
87+
let Ok(row_id_col_offset) = input_schema.index_of(&row_id_col_index.to_string()) else {
88+
return Err(ErrorCode::Internal("Internal column _row_id is not found"));
89+
};
90+
91+
let lazy_columns = metadata
92+
.lazy_columns()
93+
.iter()
94+
.filter(|index| !input_schema.has_field(&index.to_string())) // If the column is already in the input schema, we don't need to fetch it.
95+
.cloned()
96+
.collect::<Vec<_>>();
97+
98+
if lazy_columns.is_empty() {
99+
// If there is no lazy column, we don't need to build a `RowFetch` plan.
100+
return Ok(input_plan);
101+
}
102+
103+
let mut has_inner_column = false;
104+
let fetched_fields = lazy_columns
105+
.iter()
106+
.map(|index| {
107+
let col = metadata.column(*index);
108+
if let ColumnEntry::BaseTableColumn(c) = col {
109+
if c.path_indices.is_some() {
110+
has_inner_column = true;
111+
}
112+
}
113+
DataField::new(&index.to_string(), col.data_type())
114+
})
115+
.collect();
116+
117+
let source = input_plan.try_find_single_data_source();
118+
debug_assert!(source.is_some());
119+
let source_info = source.cloned().unwrap();
120+
let table_schema = source_info.source_info.schema();
121+
let cols_to_fetch = Self::build_projection(
122+
&metadata,
123+
&table_schema,
124+
lazy_columns.iter(),
125+
has_inner_column,
126+
true,
127+
true,
128+
false,
129+
);
130+
131+
Ok(PhysicalPlan::RowFetch(RowFetch {
132+
plan_id: 0,
133+
input: Box::new(input_plan),
134+
source: Box::new(source_info),
135+
row_id_col_offset,
136+
cols_to_fetch,
137+
fetched_fields,
138+
need_wrap_nullable: row_fetch.need_wrap_nullable,
139+
stat_info: Some(stat_info),
140+
}))
141+
}
142+
}

src/query/sql/src/planner/binder/bind_mutation/update.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ use crate::binder::bind_mutation::mutation_expression::MutationExpression;
2929
use crate::binder::util::TableIdentifier;
3030
use crate::binder::Binder;
3131
use crate::optimizer::ir::Matcher;
32+
use crate::optimizer::ir::RelExpr;
3233
use crate::plans::AggregateFunction;
3334
use crate::plans::BoundColumnRef;
3435
use crate::plans::EvalScalar;
3536
use crate::plans::Plan;
3637
use crate::plans::RelOp;
3738
use crate::plans::RelOperator;
39+
use crate::plans::RowFetch;
3840
use crate::plans::ScalarItem;
3941
use crate::plans::VisitorMut;
4042
use crate::BindContext;
@@ -282,14 +284,36 @@ impl Binder {
282284
.collect();
283285
let eval_scalar = EvalScalar { items };
284286

285-
mutation.bind_context.aggregate_info.group_items = fields_bindings
286-
.into_iter()
287-
.chain(std::iter::once(row_id))
288-
.map(|column| ScalarItem {
289-
index: column.index,
290-
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }),
291-
})
292-
.collect();
287+
mutation.bind_context.aggregate_info.group_items = vec![ScalarItem {
288+
index: row_id.index,
289+
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
290+
span: None,
291+
column: row_id.clone(),
292+
}),
293+
}];
294+
295+
let enable_lazy_read = {
296+
let settings = self.ctx.get_settings();
297+
let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?;
298+
let rel_expr = RelExpr::with_s_expr(s_expr);
299+
let cardinality = rel_expr.derive_cardinality_child(0)?;
300+
301+
lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64
302+
};
303+
304+
if mutation.strategy == MutationStrategy::Direct || !enable_lazy_read {
305+
mutation
306+
.bind_context
307+
.aggregate_info
308+
.group_items
309+
.extend(fields_bindings.iter().map(|column| ScalarItem {
310+
index: column.index,
311+
scalar: ScalarExpr::BoundColumnRef(BoundColumnRef {
312+
span: None,
313+
column: column.clone(),
314+
}),
315+
}));
316+
}
293317

294318
for eval in &mut mutation.matched_evaluators {
295319
if let Some(expr) = &mut eval.condition {
@@ -319,14 +343,20 @@ impl Binder {
319343
.collect(),
320344
);
321345

322-
let aggr_expr =
346+
let mut input =
323347
self.bind_aggregate(&mut mutation.bind_context, s_expr.unary_child().clone())?;
324348

325-
let input = if eval_scalar.items.is_empty() {
326-
aggr_expr
327-
} else {
328-
aggr_expr.build_unary(Arc::new(eval_scalar.into()))
329-
};
349+
if !eval_scalar.items.is_empty() {
350+
input = input.build_unary(Arc::new(eval_scalar.into()));
351+
}
352+
353+
if mutation.strategy != MutationStrategy::Direct && enable_lazy_read {
354+
input = input.build_unary(RelOperator::RowFetch(RowFetch {
355+
need_wrap_nullable: false,
356+
row_id_index: row_id.index,
357+
lazy_columns: fields_bindings.iter().map(|x| x.index).collect(),
358+
}));
359+
}
330360

331361
let s_expr = Box::new(input.build_unary(Arc::new(mutation.into())));
332362
let Plan::DataMutation {

src/query/sql/src/planner/optimizer/ir/format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String {
5353
RelOperator::Sort(_) => "Sort".to_string(),
5454
RelOperator::Limit(_) => "Limit".to_string(),
5555
RelOperator::UnionAll(_) => "UnionAll".to_string(),
56+
RelOperator::RowFetch(_) => "RowFetch".to_string(),
5657
RelOperator::Exchange(op) => {
5758
format!("Exchange: ({})", match op {
5859
Exchange::Hash(scalars) => format!(

src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ impl DPhpyOptimizer {
336336
| RelOperator::Aggregate(_)
337337
| RelOperator::Sort(_)
338338
| RelOperator::Limit(_)
339+
| RelOperator::RowFetch(_)
339340
| RelOperator::EvalScalar(_)
340341
| RelOperator::Window(_)
341342
| RelOperator::Udf(_)

src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ pub async fn dynamic_sample(
139139
| RelOperator::Exchange(_)
140140
| RelOperator::Window(_)
141141
| RelOperator::Udf(_)
142-
| RelOperator::AsyncFunction(_) => {
142+
| RelOperator::AsyncFunction(_)
143+
| RelOperator::RowFetch(_) => {
143144
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await
144145
}
145146
}

src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,13 @@ impl SubqueryDecorrelatorOptimizer {
318318
Arc::new(self.optimize_sync(s_expr.right_child())?),
319319
)),
320320

321-
RelOperator::Limit(_) | RelOperator::Udf(_) | RelOperator::AsyncFunction(_) => {
322-
Ok(SExpr::create_unary(
323-
s_expr.plan.clone(),
324-
Arc::new(self.optimize_sync(s_expr.unary_child())?),
325-
))
326-
}
321+
RelOperator::Limit(_)
322+
| RelOperator::Udf(_)
323+
| RelOperator::AsyncFunction(_)
324+
| RelOperator::RowFetch(_) => Ok(SExpr::create_unary(
325+
s_expr.plan.clone(),
326+
Arc::new(self.optimize_sync(s_expr.unary_child())?),
327+
)),
327328

328329
RelOperator::DummyTableScan(_)
329330
| RelOperator::Scan(_)

0 commit comments

Comments (0)