Skip to content

Commit fd15140

Browse files
authored
feat(query): support nondeterministic update (#17555)
1 parent f601195 commit fd15140

File tree

6 files changed

+296
-1
lines changed

6 files changed

+296
-1
lines changed

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,13 @@ impl DefaultSettings {
12141214
scope: SettingScope::Both,
12151215
range: Some(SettingRange::Numeric(0..=1)),
12161216
}),
1217+
("error_on_nondeterministic_update", DefaultSettingValue {
1218+
value: UserSettingValue::UInt64(1),
1219+
desc: "Whether to return an error when updating a multi-joined row.",
1220+
mode: SettingMode::Both,
1221+
scope: SettingScope::Both,
1222+
range: Some(SettingRange::Numeric(0..=1)),
1223+
}),
12171224
]);
12181225

12191226
Ok(Arc::new(DefaultSettings {

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,10 @@ impl Settings {
873873
Ok(self.try_get_u64("copy_dedup_full_path_by_default")? == 1)
874874
}
875875

876+
/// Whether an UPDATE that matches one target row with multiple source rows
/// should fail with an error (setting `error_on_nondeterministic_update`,
/// numeric 0/1) instead of resolving the conflict nondeterministically.
pub fn get_error_on_nondeterministic_update(&self) -> Result<bool> {
    let flag = self.try_get_u64("error_on_nondeterministic_update")?;
    Ok(matches!(flag, 1))
}
879+
876880
pub fn get_max_query_memory_usage(&self) -> Result<u64> {
877881
self.try_get_u64("max_query_memory_usage")
878882
}

src/query/sql/src/planner/binder/bind_mutation/update.rs

Lines changed: 198 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,31 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
16+
use std::sync::Arc;
17+
1518
use databend_common_ast::ast::MatchOperation;
1619
use databend_common_ast::ast::MatchedClause;
1720
use databend_common_ast::ast::MutationUpdateExpr;
1821
use databend_common_ast::ast::TableReference;
1922
use databend_common_ast::ast::UpdateStmt;
2023
use databend_common_exception::Result;
2124

25+
use crate::binder::aggregate::AggregateRewriter;
2226
use crate::binder::bind_mutation::bind::Mutation;
2327
use crate::binder::bind_mutation::bind::MutationStrategy;
2428
use crate::binder::bind_mutation::mutation_expression::MutationExpression;
2529
use crate::binder::util::TableIdentifier;
2630
use crate::binder::Binder;
31+
use crate::optimizer::SExpr;
32+
use crate::plans::AggregateFunction;
33+
use crate::plans::BoundColumnRef;
2734
use crate::plans::Plan;
35+
use crate::plans::RelOperator;
36+
use crate::plans::ScalarItem;
37+
use crate::plans::VisitorMut;
2838
use crate::BindContext;
39+
use crate::ScalarExpr;
2940

3041
impl Binder {
3142
#[async_backtrace::framed]
@@ -93,6 +104,192 @@ impl Binder {
93104
unmatched_clauses: vec![],
94105
};
95106

96-
self.bind_mutation(bind_context, mutation).await
107+
let plan = self.bind_mutation(bind_context, mutation).await?;
108+
109+
let settings = self.ctx.get_settings();
110+
if settings.get_error_on_nondeterministic_update()? {
111+
Ok(plan)
112+
} else {
113+
self.rewrite_nondeterministic_update(plan)
114+
}
115+
}
116+
117+
/// Rewrites an UPDATE plan so that several source rows matching the same
/// target row collapse deterministically into one update.
///
/// Only plans of the exact shape `Mutation <- Filter <- Join` are rewritten;
/// any other plan is returned unchanged. The rewrite inserts an Aggregate
/// between the filter and the mutation: it groups by the target table's
/// stored (non-computed) columns plus the row-id column, and wraps every
/// other column used by the update in `any(...)`, so each target row
/// contributes exactly one input row to the mutation.
fn rewrite_nondeterministic_update(&mut self, plan: Plan) -> Result<Plan> {
    // Bail out (returning the plan untouched) unless the plan tree is
    // Mutation -> Filter -> Join; only that shape can produce multi-matches.
    let Plan::DataMutation { box s_expr, .. } = &plan else {
        return Ok(plan);
    };
    let RelOperator::Mutation(mutation) = &*s_expr.plan else {
        return Ok(plan);
    };
    let filter_expr = &s_expr.children[0];
    let RelOperator::Filter(_) = &*filter_expr.plan else {
        return Ok(plan);
    };
    let input = &filter_expr.children[0];
    let RelOperator::Join(_) = &*input.plan else {
        return Ok(plan);
    };

    let mut mutation = mutation.clone();

    // Binding of the target row-id column; it becomes one of the group keys.
    // unwrap: presumes the binder always registers a binding for
    // `row_id_index` on mutations — TODO(review) confirm.
    let row_id = mutation
        .bind_context
        .columns
        .iter()
        .find(|binding| binding.index == mutation.row_id_index)
        .unwrap()
        .clone();

    let table_schema = mutation
        .metadata
        .read()
        .table(mutation.target_table_index)
        .table()
        .schema();

    // Bindings for the target table's stored columns. Computed columns are
    // skipped (they have no independent stored value to group on).
    let fields_bindings = table_schema
        .fields
        .iter()
        .filter_map(|field| {
            if field.computed_expr.is_some() {
                return None;
            }
            mutation
                .bind_context
                .columns
                .iter()
                .find(|binding| {
                    binding.table_index == Some(mutation.target_table_index)
                        && binding.column_name == field.name
                })
                .cloned()
        })
        .collect::<Vec<_>>();

    // Every column referenced by any SET expression, plus the columns the
    // mutation already requires.
    let used_columns = mutation
        .matched_evaluators
        .iter()
        .flat_map(|eval| {
            eval.update.iter().flat_map(|update| {
                update
                    .values()
                    .flat_map(|expr| expr.used_columns().into_iter())
            })
        })
        .chain(mutation.required_columns.iter().copied())
        .collect::<HashSet<_>>();

    // Group keys don't need aggregation — drop them from the used set.
    let used_columns = used_columns
        .difference(&fields_bindings.iter().map(|column| column.index).collect())
        .copied()
        .collect::<HashSet<_>>();

    // Wrap each remaining used column (except row-id) in `any(col)` and
    // register the aggregate in the bind context. Each entry records
    // (aggregate expr, old column index, new aggregate-output index).
    let aggr_columns = used_columns
        .iter()
        .copied()
        .filter_map(|i| {
            if i == mutation.row_id_index {
                return None;
            }

            let binding = mutation
                .bind_context
                .columns
                .iter()
                .find(|binding| binding.index == i)
                .cloned()?;

            let display_name = format!("any({})", binding.index);
            let old = binding.index;
            let mut aggr_func = ScalarExpr::AggregateFunction(AggregateFunction {
                span: None,
                func_name: "any".to_string(),
                distinct: false,
                params: vec![],
                args: vec![ScalarExpr::BoundColumnRef(BoundColumnRef {
                    span: None,
                    column: binding.clone(),
                })],
                return_type: binding.data_type.clone(),
                sort_descs: vec![],
                display_name: display_name.clone(),
            });

            // The rewriter registers the aggregate in
            // bind_context.aggregate_info as a side effect.
            let mut rewriter =
                AggregateRewriter::new(&mut mutation.bind_context, self.metadata.clone());
            rewriter.visit(&mut aggr_func).unwrap();

            // Index of the freshly registered aggregate output column.
            // unwrap: the rewriter just registered `display_name` above.
            let new = mutation
                .bind_context
                .aggregate_info
                .get_aggregate_function(&display_name)
                .unwrap()
                .index;

            Some((aggr_func, old, new))
        })
        .collect::<Vec<_>>();

    // GROUP BY all stored target columns plus the row-id column.
    mutation.bind_context.aggregate_info.group_items = fields_bindings
        .into_iter()
        .chain(std::iter::once(row_id))
        .map(|column| ScalarItem {
            index: column.index,
            scalar: ScalarExpr::BoundColumnRef(BoundColumnRef { span: None, column }),
        })
        .collect();

    // Point the matched-clause conditions and SET expressions at the
    // aggregate outputs instead of the original join columns.
    for eval in &mut mutation.matched_evaluators {
        if let Some(expr) = &mut eval.condition {
            for (_, old, new) in &aggr_columns {
                expr.replace_column(*old, *new)?
            }
        }

        if let Some(update) = &mut eval.update {
            for (_, expr) in update.iter_mut() {
                for (_, old, new) in &aggr_columns {
                    expr.replace_column(*old, *new)?
                }
            }
        }
    }

    // field_index_map stores column indices as strings — remap those too.
    for (_, column) in mutation.field_index_map.iter_mut() {
        if let Some((_, _, index)) = aggr_columns
            .iter()
            .find(|(_, i, _)| i.to_string() == *column)
        {
            *column = index.to_string()
        };
    }

    // After the rewrite the mutation only needs row-id plus the aggregate
    // output columns.
    mutation.required_columns = Box::new(
        std::iter::once(mutation.row_id_index)
            .chain(aggr_columns.into_iter().map(|(_, _, i)| i))
            .collect(),
    );

    // Build the Aggregate operator on top of the original filter subtree.
    let aggr_expr = self.bind_aggregate(&mut mutation.bind_context, (**filter_expr).clone())?;

    // New tree: Mutation -> Aggregate -> Filter -> Join.
    let s_expr = SExpr::create_unary(
        Arc::new(RelOperator::Mutation(mutation)),
        Arc::new(aggr_expr),
    );

    // Reuse schema/metadata from the original plan; the match cannot fail
    // because the Plan::DataMutation shape was checked at the top.
    let Plan::DataMutation {
        schema, metadata, ..
    } = plan
    else {
        unreachable!()
    };

    let plan = Plan::DataMutation {
        s_expr: Box::new(s_expr),
        schema,
        metadata,
    };

    Ok(plan)
}
98295
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# Tests for nondeterministic UPDATE ... FROM: when several source rows match
# the same target row, `error_on_nondeterministic_update` decides whether the
# statement fails (1) or applies exactly one of the matches via any() (0).

statement ok
CREATE OR REPLACE TABLE old_products (
    product_id INT,
    category_code VARCHAR(50),
    price DECIMAL(10, 2),
    stock INT
);

statement ok
INSERT INTO old_products (product_id, category_code, price, stock) VALUES
(1, 'ELEC', 299.99, 50),
(2, 'ELEC', 399.99, 30),
(3, 'FURN', 199.99, 20),
(4, 'FURN', 89.99, 100);

statement ok
CREATE OR REPLACE TABLE new_products (
    product_id INT,
    category_code VARCHAR(50),
    new_price DECIMAL(10, 2),
    new_stock INT
);

# product_id 4 appears twice, so it matches the same target row twice.
statement ok
INSERT INTO new_products (product_id, category_code, new_price, new_stock) VALUES
(1, 'ELEC', 279.99, 55),
(2, 'ELEC', 399.99, 25),
(3, 'FURN', 189.99, 22),
(4, 'FURN', 99.99, 50),
(4, 'FURN', 39.99, 60);

# With the setting enabled, the multi-match must raise error 4001.
statement ok
set error_on_nondeterministic_update = 1;

statement error (?s)4001.*multi rows from source match one and the same row in the target_table multi times
UPDATE old_products o
SET o.price = n.new_price,
    o.stock = n.new_stock
FROM new_products n
WHERE o.product_id = n.product_id
    AND o.category_code = n.category_code;

# With the setting disabled, the same statement succeeds: one of the two
# matches for product_id 4 is applied.
statement ok
set error_on_nondeterministic_update = 0;

statement ok
UPDATE old_products o
SET o.price = n.new_price,
    o.stock = n.new_stock
FROM new_products n
WHERE o.product_id = n.product_id
    AND o.category_code = n.category_code;

query ITRI
select * from old_products order by product_id;
----
1 ELEC 279.99 55
2 ELEC 399.99 25
3 FURN 189.99 22
4 FURN 99.99 50

statement ok
drop table old_products

statement ok
drop table new_products

# Re-run existing update suites under error_on_nondeterministic_update = 0.
include ../issues/issue_15278.test

include ./03_0035_update.test

statement ok
unset error_on_nondeterministic_update;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Re-runs the standard UPDATE suite with nondeterministic updates allowed
# (error_on_nondeterministic_update = 0) to exercise the any()-rewrite path.

statement ok
set error_on_nondeterministic_update = 0;

include ./update.test

statement ok
unset error_on_nondeterministic_update;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
statement ok
2+
set error_on_nondeterministic_update = 0;
3+
4+
include ./update_cte.test
5+
6+
statement ok
7+
unset error_on_nondeterministic_update;

0 commit comments

Comments
 (0)