From e338e11aafa8468c36d3752a0008b4d056579472 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sat, 19 Jul 2025 22:14:37 +0800 Subject: [PATCH 1/5] refactor(query): support lazy read for update from --- .../hash_join/transform_hash_join_probe.rs | 3 - src/query/settings/src/settings_default.rs | 7 ++ .../settings/src/settings_getter_setter.rs | 4 + .../sql/src/executor/physical_plan_builder.rs | 4 + .../physical_plans/physical_row_fetch.rs | 94 +++++++++++++++++++ .../planner/binder/bind_mutation/update.rs | 58 +++++++++--- .../sql/src/planner/optimizer/ir/format.rs | 1 + .../optimizer/optimizers/hyper_dp/dphyp.rs | 1 + .../hyper_dp/dynamic_sample/dynamic_sample.rs | 3 +- .../decorrelate/subquery_decorrelator.rs | 13 +-- .../join_rules/rule_semi_to_inner_join.rs | 3 +- src/query/sql/src/planner/plans/mod.rs | 2 + src/query/sql/src/planner/plans/operator.rs | 3 + .../sql/src/planner/plans/operator_macros.rs | 1 + src/query/sql/src/planner/plans/row_fetch.rs | 47 ++++++++++ 15 files changed, 219 insertions(+), 25 deletions(-) create mode 100644 src/query/sql/src/planner/plans/row_fetch.rs diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index 62836ef7c5a1e..c991b1a71220a 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -112,7 +112,6 @@ pub struct TransformHashJoinProbe { partition_id_to_restore: usize, step: Step, - step_logs: Vec, } impl TransformHashJoinProbe { @@ -176,7 +175,6 @@ impl TransformHashJoinProbe { spiller, partition_id_to_restore: 0, step: Step::Async(AsyncStep::WaitBuild), - step_logs: vec![Step::Async(AsyncStep::WaitBuild)], })) } @@ -192,7 +190,6 @@ impl TransformHashJoinProbe { } }; self.step = step.clone(); - self.step_logs.push(step); Ok(event) } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 9b32bbbc233fd..3e727f228dafd 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1297,6 +1297,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("nondeterministic_update_lazy_read_threshold", DefaultSettingValue { + value: UserSettingValue::UInt64(u64::MAX), + desc: "Sets the maximum rows in a query to enable lazy read optimization when updating a multi-joined row. Setting it to 0 disables the optimization.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ("enable_auto_vacuum", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Whether to automatically trigger VACUUM operations on tables (using vacuum2)", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0e9a7617a1ac9..74b247e13e21c 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1013,4 +1013,8 @@ impl Settings { pub fn get_enable_parallel_union_all(&self) -> Result { Ok(self.try_get_u64("enable_parallel_union_all")? 
== 1)
    }
+
+    pub fn get_nondeterministic_update_lazy_read_threshold(&self) -> Result<u64> {
+        self.try_get_u64("nondeterministic_update_lazy_read_threshold")
+    }
 }
diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs
index c50adc9f30db4..e79ec81630d0d 100644
--- a/src/query/sql/src/executor/physical_plan_builder.rs
+++ b/src/query/sql/src/executor/physical_plan_builder.rs
@@ -129,6 +129,10 @@ impl PhysicalPlanBuilder {
                 self.build_mutation_source(mutation_source).await
             }
             RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await,
+            RelOperator::RowFetch(row_fetch) => {
+                self.build_row_fetch(s_expr, row_fetch, required, stat_info)
+                    .await
+            }
         }
     }
diff --git a/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs b/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs
index 3a5aca4f7a309..1cebec4f9e269 100644
--- a/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs
@@ -14,13 +14,19 @@
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::Projection;
+use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataField;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
+use databend_common_expression::ROW_ID_COL_NAME;

 use crate::executor::explain::PlanStatsInfo;
 use crate::executor::PhysicalPlan;
+use crate::executor::PhysicalPlanBuilder;
+use crate::optimizer::ir::SExpr;
+use crate::ColumnEntry;
+use crate::ColumnSet;

 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct RowFetch {
@@ -46,3 +52,91 @@ impl RowFetch {
         Ok(DataSchemaRefExt::create(fields))
     }
 }
+
+impl PhysicalPlanBuilder {
+    pub(crate) async fn build_row_fetch(
+        &mut self,
+        s_expr: &SExpr,
+        row_fetch: &crate::plans::RowFetch,
+        mut required: ColumnSet,
+        stat_info: PlanStatsInfo,
+    ) -> Result<PhysicalPlan> {
+        // 1. Prune unused Columns.
+        // Apply lazy.
+        required = required
+            .difference(&row_fetch.lazy_columns)
+            .cloned()
+            .collect::<ColumnSet>();
+
+        required.insert(row_fetch.row_id_index);
+
+        // 2. Build physical plan.
+        let input_plan = self.build(s_expr.child(0)?, required).await?;
+        let metadata = self.metadata.read().clone();
+
+        // If `lazy_columns` is not empty, build a `RowFetch` plan on top of the `Limit` plan.
+        let input_schema = input_plan.output_schema()?;
+
+        // Lazy materialization is enabled.
+        let row_id_col_index = metadata
+            .columns()
+            .iter()
+            .position(|col| col.name() == ROW_ID_COL_NAME)
+            .ok_or_else(|| ErrorCode::Internal("Internal column _row_id is not found"))?;
+
+        let Ok(row_id_col_offset) = input_schema.index_of(&row_id_col_index.to_string()) else {
+            return Err(ErrorCode::Internal("Internal column _row_id is not found"));
+        };
+
+        let lazy_columns = metadata
+            .lazy_columns()
+            .iter()
+            .filter(|index| !input_schema.has_field(&index.to_string())) // If the column is already in the input schema, we don't need to fetch it.
+            .cloned()
+            .collect::<Vec<_>>();
+
+        if lazy_columns.is_empty() {
+            // If there is no lazy column, we don't need to build a `RowFetch` plan.
+ return Ok(input_plan); + } + + let mut has_inner_column = false; + let fetched_fields = lazy_columns + .iter() + .map(|index| { + let col = metadata.column(*index); + if let ColumnEntry::BaseTableColumn(c) = col { + if c.path_indices.is_some() { + has_inner_column = true; + } + } + DataField::new(&index.to_string(), col.data_type()) + }) + .collect(); + + let source = input_plan.try_find_single_data_source(); + debug_assert!(source.is_some()); + let source_info = source.cloned().unwrap(); + let table_schema = source_info.source_info.schema(); + let cols_to_fetch = Self::build_projection( + &metadata, + &table_schema, + lazy_columns.iter(), + has_inner_column, + true, + true, + false, + ); + + Ok(PhysicalPlan::RowFetch(RowFetch { + plan_id: 0, + input: Box::new(input_plan), + source: Box::new(source_info), + row_id_col_offset, + cols_to_fetch, + fetched_fields, + need_wrap_nullable: row_fetch.need_wrap_nullable, + stat_info: Some(stat_info), + })) + } +} diff --git a/src/query/sql/src/planner/binder/bind_mutation/update.rs b/src/query/sql/src/planner/binder/bind_mutation/update.rs index 56de12c2c2921..5e1b6f75b2c7a 100644 --- a/src/query/sql/src/planner/binder/bind_mutation/update.rs +++ b/src/query/sql/src/planner/binder/bind_mutation/update.rs @@ -29,12 +29,14 @@ use crate::binder::bind_mutation::mutation_expression::MutationExpression; use crate::binder::util::TableIdentifier; use crate::binder::Binder; use crate::optimizer::ir::Matcher; +use crate::optimizer::ir::RelExpr; use crate::plans::AggregateFunction; use crate::plans::BoundColumnRef; use crate::plans::EvalScalar; use crate::plans::Plan; use crate::plans::RelOp; use crate::plans::RelOperator; +use crate::plans::RowFetch; use crate::plans::ScalarItem; use crate::plans::VisitorMut; use crate::BindContext; @@ -282,14 +284,36 @@ impl Binder { .collect(); let eval_scalar = EvalScalar { items }; - mutation.bind_context.aggregate_info.group_items = fields_bindings - .into_iter() - .chain(std::iter::once(row_id)) - .map(|column| ScalarItem { - index: column.index, - scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }), - }) - .collect(); + mutation.bind_context.aggregate_info.group_items = vec![ScalarItem { + index: row_id.index, + scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: row_id.clone(), + }), + }]; + + let enable_lazy_read = { + let settings = self.ctx.get_settings(); + let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?; + let rel_expr = RelExpr::with_s_expr(s_expr); + let cardinality = rel_expr.derive_cardinality_child(0)?; + + lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64 + }; + + if mutation.strategy == MutationStrategy::Direct || !enable_lazy_read { + mutation + .bind_context + .aggregate_info + .group_items + .extend(fields_bindings.iter().map(|column| ScalarItem { + index: column.index, + scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: column.clone(), + }), + })); + } for eval in &mut mutation.matched_evaluators { if let Some(expr) = &mut eval.condition { @@ -319,14 +343,20 @@ impl Binder { .collect(), ); - let aggr_expr = + let mut input = self.bind_aggregate(&mut mutation.bind_context, s_expr.unary_child().clone())?; - let input = if eval_scalar.items.is_empty() { - aggr_expr - } else { - aggr_expr.build_unary(Arc::new(eval_scalar.into())) - }; + if !eval_scalar.items.is_empty() { + input = input.build_unary(Arc::new(eval_scalar.into())); + } + + if mutation.strategy != 
MutationStrategy::Direct && enable_lazy_read { + input = input.build_unary(RelOperator::RowFetch(RowFetch { + need_wrap_nullable: false, + row_id_index: row_id.index, + lazy_columns: fields_bindings.iter().map(|x| x.index).collect(), + })); + } let s_expr = Box::new(input.build_unary(Arc::new(mutation.into()))); let Plan::DataMutation { diff --git a/src/query/sql/src/planner/optimizer/ir/format.rs b/src/query/sql/src/planner/optimizer/ir/format.rs index f9613af6b35ef..634660d93e043 100644 --- a/src/query/sql/src/planner/optimizer/ir/format.rs +++ b/src/query/sql/src/planner/optimizer/ir/format.rs @@ -53,6 +53,7 @@ fn display_rel_op(rel_op: &RelOperator) -> String { RelOperator::Sort(_) => "Sort".to_string(), RelOperator::Limit(_) => "Limit".to_string(), RelOperator::UnionAll(_) => "UnionAll".to_string(), + RelOperator::RowFetch(_) => "RowFetch".to_string(), RelOperator::Exchange(op) => { format!("Exchange: ({})", match op { Exchange::Hash(scalars) => format!( diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs index 868b55f05e935..6869f401130eb 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dphyp.rs @@ -336,6 +336,7 @@ impl DPhpyOptimizer { | RelOperator::Aggregate(_) | RelOperator::Sort(_) | RelOperator::Limit(_) + | RelOperator::RowFetch(_) | RelOperator::EvalScalar(_) | RelOperator::Window(_) | RelOperator::Udf(_) diff --git a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs index eb645151c4230..e3fc8a4fd4499 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/hyper_dp/dynamic_sample/dynamic_sample.rs @@ -139,7 +139,8 @@ pub async fn dynamic_sample( | RelOperator::Exchange(_) | RelOperator::Window(_) | RelOperator::Udf(_) - | RelOperator::AsyncFunction(_) => { + | RelOperator::AsyncFunction(_) + | RelOperator::RowFetch(_) => { dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await } } diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs index 079f80afcb9fe..795767759be74 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/subquery_decorrelator.rs @@ -318,12 +318,13 @@ impl SubqueryDecorrelatorOptimizer { Arc::new(self.optimize_sync(s_expr.right_child())?), )), - RelOperator::Limit(_) | RelOperator::Udf(_) | RelOperator::AsyncFunction(_) => { - Ok(SExpr::create_unary( - s_expr.plan.clone(), - Arc::new(self.optimize_sync(s_expr.unary_child())?), - )) - } + RelOperator::Limit(_) + | RelOperator::Udf(_) + | RelOperator::AsyncFunction(_) + | RelOperator::RowFetch(_) => Ok(SExpr::create_unary( + s_expr.plan.clone(), + Arc::new(self.optimize_sync(s_expr.unary_child())?), + )), RelOperator::DummyTableScan(_) | RelOperator::Scan(_) diff --git a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs index 46ebf4648860d..d626ee8fffd2c 100644 --- 
a/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/rule/join_rules/rule_semi_to_inner_join.rs @@ -152,7 +152,8 @@ fn find_group_by_keys( | RelOperator::RecursiveCteScan(_) | RelOperator::Mutation(_) | RelOperator::MutationSource(_) - | RelOperator::CompactBlock(_) => {} + | RelOperator::CompactBlock(_) + | RelOperator::RowFetch(_) => {} } Ok(()) } diff --git a/src/query/sql/src/planner/plans/mod.rs b/src/query/sql/src/planner/plans/mod.rs index 2124d39c6aa79..4e9ef72f95a28 100644 --- a/src/query/sql/src/planner/plans/mod.rs +++ b/src/query/sql/src/planner/plans/mod.rs @@ -43,6 +43,7 @@ mod r_cte_scan; mod recluster; mod replace; mod revert_table; +mod row_fetch; mod scalar_expr; mod scan; mod set; @@ -90,6 +91,7 @@ pub use r_cte_scan::*; pub use recluster::*; pub use replace::Replace; pub use revert_table::RevertTablePlan; +pub use row_fetch::*; pub use scalar_expr::*; pub use scan::*; pub use set::*; diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs index 092a010304ec0..5563c812de2a4 100644 --- a/src/query/sql/src/planner/plans/operator.rs +++ b/src/query/sql/src/planner/plans/operator.rs @@ -45,6 +45,7 @@ use crate::plans::Limit; use crate::plans::Mutation; use crate::plans::OptimizeCompactBlock as CompactBlock; use crate::plans::ProjectSet; +use crate::plans::RowFetch; use crate::plans::Scan; use crate::plans::Sort; use crate::plans::Udf; @@ -127,6 +128,7 @@ pub enum RelOp { MergeInto, CompactBlock, MutationSource, + RowFetch, } /// Relational operators @@ -155,6 +157,7 @@ pub enum RelOperator { ExpressionScan(ExpressionScan), CacheScan(CacheScan), Udf(Udf), + RowFetch(RowFetch), RecursiveCteScan(RecursiveCteScan), AsyncFunction(AsyncFunction), Mutation(Mutation), diff --git a/src/query/sql/src/planner/plans/operator_macros.rs b/src/query/sql/src/planner/plans/operator_macros.rs index 4b90525c53f43..50c1b7d414d2b 100644 --- a/src/query/sql/src/planner/plans/operator_macros.rs +++ b/src/query/sql/src/planner/plans/operator_macros.rs @@ -110,6 +110,7 @@ macro_rules! impl_match_rel_op { RelOperator::Mutation($rel_op) => $rel_op.$method($($arg),*), RelOperator::CompactBlock($rel_op) => $rel_op.$method($($arg),*), RelOperator::MutationSource($rel_op) => $rel_op.$method($($arg),*), + RelOperator::RowFetch($rel_op) => $rel_op.$method($($arg),*), } } } diff --git a/src/query/sql/src/planner/plans/row_fetch.rs b/src/query/sql/src/planner/plans/row_fetch.rs new file mode 100644 index 0000000000000..22995379595db --- /dev/null +++ b/src/query/sql/src/planner/plans/row_fetch.rs @@ -0,0 +1,47 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+
+use crate::optimizer::ir::RelExpr;
+use crate::optimizer::ir::RelationalProperty;
+use crate::optimizer::ir::StatInfo;
+use crate::plans::Operator;
+use crate::plans::RelOp;
+use crate::ColumnSet;
+use crate::IndexType;
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct RowFetch {
+    pub need_wrap_nullable: bool,
+
+    pub lazy_columns: ColumnSet,
+    pub row_id_index: IndexType,
+}
+
+impl Operator for RowFetch {
+    fn rel_op(&self) -> RelOp {
+        RelOp::RowFetch
+    }
+
+    fn derive_relational_prop(&self, rel_expr: &RelExpr) -> Result<Arc<RelationalProperty>> {
+        rel_expr.derive_relational_prop_child(0)
+    }
+
+    fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
+        rel_expr.derive_cardinality_child(0)
+    }
+}

From 8ef194999e278a70616d5fa2e8398d84e130dee7 Mon Sep 17 00:00:00 2001
From: zhang2014
Date: Sat, 19 Jul 2025 22:20:04 +0800
Subject: [PATCH 2/5] refactor(query): support lazy read for update from

---
 src/query/settings/src/settings_default.rs             |  2 +-
 .../suites/crdb/nondeterministic_update2.test          | 13 +++++++++++++
 .../query/cte/update_cte_nondeterministic2.test        | 13 +++++++++++++
 3 files changed, 27 insertions(+), 1 deletion(-)
 create mode 100644 tests/sqllogictests/suites/crdb/nondeterministic_update2.test
 create mode 100644 tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test

diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 3e727f228dafd..cc11c2c7e9725 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -1298,7 +1298,7 @@ impl DefaultSettings {
             range: Some(SettingRange::Numeric(0..=1)),
         }),
         ("nondeterministic_update_lazy_read_threshold", DefaultSettingValue {
-            value: UserSettingValue::UInt64(u64::MAX),
+            value: UserSettingValue::UInt64(0),
            desc: "Sets the maximum rows in a query to enable lazy read optimization when updating a multi-joined row.
Setting it to 0 disables the optimization.", mode: SettingMode::Both, scope: SettingScope::Both, diff --git a/tests/sqllogictests/suites/crdb/nondeterministic_update2.test b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test new file mode 100644 index 0000000000000..5af8bd644880d --- /dev/null +++ b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test @@ -0,0 +1,13 @@ +statement ok +set error_on_nondeterministic_update = 0; + +statement ok +set nondeterministic_update_lazy_read_threshold = 18446744073709551615; + +include ./update.test + +statement ok +unset error_on_nondeterministic_update; + +statement ok +unset nondeterministic_update_lazy_read_threshold; \ No newline at end of file diff --git a/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test b/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test new file mode 100644 index 0000000000000..0009da46aa776 --- /dev/null +++ b/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test @@ -0,0 +1,13 @@ +statement ok +set error_on_nondeterministic_update = 0; + +statement ok +set nondeterministic_update_lazy_read_threshold = 18446744073709551615; + +include ./update_cte.test + +statement ok +unset error_on_nondeterministic_update; + +statement ok +unset nondeterministic_update_lazy_read_threshold; From cad4a8dd8e234a0f34d62e02093b2dfdbc383059 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sat, 19 Jul 2025 22:20:43 +0800 Subject: [PATCH 3/5] refactor(query): support lazy read for update from --- tests/sqllogictests/suites/crdb/nondeterministic_update2.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sqllogictests/suites/crdb/nondeterministic_update2.test b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test index 5af8bd644880d..f7d5ca87841aa 100644 --- a/tests/sqllogictests/suites/crdb/nondeterministic_update2.test +++ b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test @@ -10,4 +10,4 @@ statement ok unset error_on_nondeterministic_update; statement ok -unset nondeterministic_update_lazy_read_threshold; \ No newline at end of file +unset nondeterministic_update_lazy_read_threshold; From 9cb9d55412d0a6c5f7f06db4526d4527aef97913 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sun, 20 Jul 2025 00:54:06 +0800 Subject: [PATCH 4/5] refactor(query): support lazy read for update from --- .../physical_plans/physical_mutation.rs | 19 +++++++++++++++---- .../physical_plans/physical_row_fetch.rs | 7 ++++--- .../planner/binder/bind_mutation/update.rs | 1 + src/query/sql/src/planner/plans/row_fetch.rs | 1 + 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/query/sql/src/executor/physical_plans/physical_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_mutation.rs index 9feb40c125a1d..ed757c1ac1386 100644 --- a/src/query/sql/src/executor/physical_plans/physical_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_mutation.rs @@ -58,6 +58,7 @@ use crate::executor::physical_plans::MutationOrganize; use crate::executor::physical_plans::MutationSplit; use crate::executor::physical_plans::RowFetch; use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::SExpr; use crate::parse_computed_expr; use crate::plans::BoundColumnRef; @@ -283,11 +284,21 @@ impl PhysicalPlanBuilder { })); } + let already_enable_lazy_read = { + let settings = self.ctx.get_settings(); + let lazy_read_threshold = settings.get_nondeterministic_update_lazy_read_threshold()?; + let 
rel_expr = RelExpr::with_s_expr(s_expr); + let cardinality = rel_expr.derive_cardinality_child(0)?; + + lazy_read_threshold != 0 && lazy_read_threshold >= cardinality.cardinality as u64 + }; + // Construct row fetch plan for lazy columns. - if let Some(lazy_columns) = self - .metadata - .read() - .get_table_lazy_columns(target_table_index) + if !already_enable_lazy_read + && let Some(lazy_columns) = self + .metadata + .read() + .get_table_lazy_columns(target_table_index) && !lazy_columns.is_empty() { plan = PhysicalPlan::RowFetch(build_mutation_row_fetch( diff --git a/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs b/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs index 1cebec4f9e269..434a240710a3d 100644 --- a/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs +++ b/src/query/sql/src/executor/physical_plans/physical_row_fetch.rs @@ -88,8 +88,8 @@ impl PhysicalPlanBuilder { return Err(ErrorCode::Internal("Internal column _row_id is not found")); }; - let lazy_columns = metadata - .lazy_columns() + let lazy_columns = row_fetch + .lazy_columns .iter() .filter(|index| !input_schema.has_field(&index.to_string())) // If the column is already in the input schema, we don't need to fetch it. .cloned() @@ -114,7 +114,8 @@ impl PhysicalPlanBuilder { }) .collect(); - let source = input_plan.try_find_single_data_source(); + let metadata = self.metadata.read(); + let source = metadata.get_table_source(&row_fetch.fetch_table_index); debug_assert!(source.is_some()); let source_info = source.cloned().unwrap(); let table_schema = source_info.source_info.schema(); diff --git a/src/query/sql/src/planner/binder/bind_mutation/update.rs b/src/query/sql/src/planner/binder/bind_mutation/update.rs index 5e1b6f75b2c7a..1b3239728dfb1 100644 --- a/src/query/sql/src/planner/binder/bind_mutation/update.rs +++ b/src/query/sql/src/planner/binder/bind_mutation/update.rs @@ -355,6 +355,7 @@ impl Binder { need_wrap_nullable: false, row_id_index: row_id.index, lazy_columns: fields_bindings.iter().map(|x| x.index).collect(), + fetch_table_index: mutation.target_table_index, })); } diff --git a/src/query/sql/src/planner/plans/row_fetch.rs b/src/query/sql/src/planner/plans/row_fetch.rs index 22995379595db..b5f162a4e5256 100644 --- a/src/query/sql/src/planner/plans/row_fetch.rs +++ b/src/query/sql/src/planner/plans/row_fetch.rs @@ -30,6 +30,7 @@ pub struct RowFetch { pub lazy_columns: ColumnSet, pub row_id_index: IndexType, + pub fetch_table_index: IndexType, } impl Operator for RowFetch { From f6fdfcfb28534b5f01b9ea3cc325d8c056bb35c8 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 21 Jul 2025 09:11:10 +0800 Subject: [PATCH 5/5] refactor(query): support lazy read for update from --- src/query/settings/src/settings_default.rs | 2 +- tests/sqllogictests/suites/crdb/nondeterministic_update2.test | 2 +- .../suites/query/cte/update_cte_nondeterministic2.test | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index cc11c2c7e9725..43783520ed773 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1298,7 +1298,7 @@ impl DefaultSettings { range: Some(SettingRange::Numeric(0..=1)), }), ("nondeterministic_update_lazy_read_threshold", DefaultSettingValue { - value: UserSettingValue::UInt64(0), + value: UserSettingValue::UInt64(18446744073709551615), desc: "Sets the maximum rows in a query to enable lazy read optimization when 
updating a multi-joined row. Setting it to 0 disables the optimization.", mode: SettingMode::Both, scope: SettingScope::Both, diff --git a/tests/sqllogictests/suites/crdb/nondeterministic_update2.test b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test index f7d5ca87841aa..050f386d15762 100644 --- a/tests/sqllogictests/suites/crdb/nondeterministic_update2.test +++ b/tests/sqllogictests/suites/crdb/nondeterministic_update2.test @@ -2,7 +2,7 @@ statement ok set error_on_nondeterministic_update = 0; statement ok -set nondeterministic_update_lazy_read_threshold = 18446744073709551615; +set nondeterministic_update_lazy_read_threshold = 0; include ./update.test diff --git a/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test b/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test index 0009da46aa776..0f1ec162ad4ff 100644 --- a/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test +++ b/tests/sqllogictests/suites/query/cte/update_cte_nondeterministic2.test @@ -2,7 +2,7 @@ statement ok set error_on_nondeterministic_update = 0; statement ok -set nondeterministic_update_lazy_read_threshold = 18446744073709551615; +set nondeterministic_update_lazy_read_threshold = 0; include ./update_cte.test
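
For illustration, a minimal sqllogictest-style sketch of the workload this series optimizes. Table names and data are hypothetical; only the two settings and the UPDATE ... FROM form come from the patches above. With the threshold at or above the estimated input cardinality, the binder groups only by _row_id, and the remaining target-table columns are fetched afterwards through the new RowFetch operator instead of being carried through the join and aggregation.

statement ok
set error_on_nondeterministic_update = 0;

statement ok
set nondeterministic_update_lazy_read_threshold = 18446744073709551615;

statement ok
create or replace table t1 (id int, v int, payload string);

statement ok
create or replace table t2 (id int, v int);

statement ok
insert into t1 values (1, 0, 'a'), (2, 0, 'b');

statement ok
insert into t2 values (1, 10), (1, 20), (2, 30);

# Each t1 row can match several t2 rows (a nondeterministic update); with lazy read
# enabled, `payload` is not carried through the join and is fetched by _row_id later.
statement ok
update t1 set v = t2.v from t2 where t1.id = t2.id;

statement ok
unset nondeterministic_update_lazy_read_threshold;

statement ok
unset error_on_nondeterministic_update;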