Skip to content

Commit 6cfa8ad

Browse files
authored
feat: enhancing concurrent conflict resolution capability for multi-statement transactions (#17984)
* impl segments diff * support resolve conflict * fix ut * fix concurrently consume stream
1 parent af7ac4c commit 6cfa8ad

File tree

22 files changed

+629
-133
lines changed

22 files changed

+629
-133
lines changed

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2611,6 +2611,7 @@ impl SchemaApiTestSuite {
26112611
table_id,
26122612
seq: MatchSeq::Exact(table_version),
26132613
new_table_meta: new_table_meta.clone(),
2614+
base_snapshot_location: None,
26142615
};
26152616
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
26162617
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2640,6 +2641,7 @@ impl SchemaApiTestSuite {
26402641
table_id,
26412642
seq: MatchSeq::Exact(table_version + 1),
26422643
new_table_meta: new_table_meta.clone(),
2644+
base_snapshot_location: None,
26432645
};
26442646
let res = mt
26452647
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2687,6 +2689,7 @@ impl SchemaApiTestSuite {
26872689
table_id,
26882690
seq: MatchSeq::Exact(table_version),
26892691
new_table_meta: new_table_meta.clone(),
2692+
base_snapshot_location: None,
26902693
};
26912694
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
26922695
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2736,6 +2739,7 @@ impl SchemaApiTestSuite {
27362739
table_id,
27372740
seq: MatchSeq::Exact(table_version),
27382741
new_table_meta: new_table_meta.clone(),
2742+
base_snapshot_location: None,
27392743
};
27402744
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
27412745
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2785,6 +2789,7 @@ impl SchemaApiTestSuite {
27852789
table_id,
27862790
seq: MatchSeq::Exact(table_version),
27872791
new_table_meta: new_table_meta.clone(),
2792+
base_snapshot_location: None,
27882793
};
27892794
let result = mt
27902795
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2829,6 +2834,7 @@ impl SchemaApiTestSuite {
28292834
table_id,
28302835
seq: MatchSeq::Exact(table_version),
28312836
new_table_meta: new_table_meta.clone(),
2837+
base_snapshot_location: None,
28322838
};
28332839
let err = mt
28342840
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2876,6 +2882,7 @@ impl SchemaApiTestSuite {
28762882
table_id,
28772883
seq: MatchSeq::Exact(table_version),
28782884
new_table_meta: new_table_meta.clone(),
2885+
base_snapshot_location: None,
28792886
};
28802887
let _ = mt
28812888
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2933,6 +2940,7 @@ impl SchemaApiTestSuite {
29332940
table_id,
29342941
seq: MatchSeq::Exact(table_version),
29352942
new_table_meta: new_table_meta.clone(),
2943+
base_snapshot_location: None,
29362944
};
29372945
let err = mt
29382946
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -3801,6 +3809,7 @@ impl SchemaApiTestSuite {
38013809
table_id,
38023810
seq: MatchSeq::Any,
38033811
new_table_meta: table_meta.clone(),
3812+
base_snapshot_location: None,
38043813
};
38053814

38063815
let table = mt
@@ -3963,6 +3972,7 @@ impl SchemaApiTestSuite {
39633972
table_id,
39643973
seq: MatchSeq::Any,
39653974
new_table_meta: create_table_meta.clone(),
3975+
base_snapshot_location: None,
39663976
};
39673977

39683978
let table = mt
@@ -5934,6 +5944,7 @@ impl SchemaApiTestSuite {
59345944
table_id,
59355945
seq: MatchSeq::Any,
59365946
new_table_meta: table_meta(created_on),
5947+
base_snapshot_location: None,
59375948
};
59385949

59395950
let table = mt
@@ -5984,6 +5995,7 @@ impl SchemaApiTestSuite {
59845995
table_id,
59855996
seq: MatchSeq::Any,
59865997
new_table_meta: table_meta(created_on),
5998+
base_snapshot_location: None,
59875999
};
59886000

59896001
let table = mt
@@ -7553,6 +7565,7 @@ impl SchemaApiTestSuite {
75537565
table_id,
75547566
seq: MatchSeq::Any,
75557567
new_table_meta: table_meta(created_on),
7568+
base_snapshot_location: None,
75567569
};
75577570

75587571
let table = mt
@@ -7611,6 +7624,7 @@ impl SchemaApiTestSuite {
76117624
table_id,
76127625
seq: MatchSeq::Any,
76137626
new_table_meta: table_meta(created_on),
7627+
base_snapshot_location: None,
76147628
};
76157629

76167630
let table = mt
@@ -7666,6 +7680,7 @@ impl SchemaApiTestSuite {
76667680
table_id,
76677681
seq: MatchSeq::Any,
76687682
new_table_meta: table_meta(created_on),
7683+
base_snapshot_location: None,
76697684
};
76707685

76717686
let table = mt
@@ -8083,6 +8098,7 @@ where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError>
80838098
table_id: self.table_id,
80848099
seq: MatchSeq::Any,
80858100
new_table_meta: self.table_meta(),
8101+
base_snapshot_location: None,
80868102
};
80878103

80888104
let req = UpdateMultiTableMetaReq {

src/meta/app/src/schema/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,7 @@ pub struct UpdateTableMetaReq {
807807
pub table_id: u64,
808808
pub seq: MatchSeq,
809809
pub new_table_meta: TableMeta,
810+
pub base_snapshot_location: Option<String>,
810811
}
811812

812813
#[derive(Clone, Debug, PartialEq, Eq)]

src/query/service/src/interpreters/common/stream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ pub async fn query_build_update_stream_req(
130130
table_id: stream_info.ident.table_id,
131131
seq: MatchSeq::Exact(stream_info.ident.seq),
132132
new_table_meta,
133+
base_snapshot_location: None,
133134
},
134135
stream_info.clone(),
135136
));

src/query/service/src/interpreters/interpreter_cluster_key_alter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter {
8686
table_id: table_info.ident.table_id,
8787
seq: MatchSeq::Exact(table_info.ident.seq),
8888
new_table_meta,
89+
base_snapshot_location: fuse_table.snapshot_loc(),
8990
};
9091
catalog.update_single_table_meta(req, table_info).await?;
9192

src/query/service/src/interpreters/interpreter_cluster_key_drop.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ impl Interpreter for DropTableClusterKeyInterpreter {
7474
table_id: table_info.ident.table_id,
7575
seq: MatchSeq::Exact(table_info.ident.seq),
7676
new_table_meta,
77+
base_snapshot_location: fuse_table.snapshot_loc(),
7778
};
7879
catalog.update_single_table_meta(req, table_info).await?;
7980

src/query/service/src/interpreters/interpreter_table_add_column.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ pub(crate) async fn commit_table_meta(
216216
table_id,
217217
seq: MatchSeq::Exact(table_version),
218218
new_table_meta: new_table_meta.clone(),
219+
base_snapshot_location: fuse_tbl.snapshot_loc(),
219220
};
220221

221222
catalog.update_single_table_meta(req, table_info).await?;

src/query/service/src/interpreters/interpreter_table_modify_column.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,7 @@ impl ModifyTableColumnInterpreter {
502502

503503
let table_info = table.get_table_info();
504504
let schema = table.schema();
505+
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
505506
let new_schema = if let Some((i, field)) = schema.column_with_name(&column) {
506507
match field.computed_expr {
507508
Some(ComputedExpr::Stored(_)) => {}
@@ -534,6 +535,7 @@ impl ModifyTableColumnInterpreter {
534535
table_id,
535536
seq: MatchSeq::Exact(table_version),
536537
new_table_meta,
538+
base_snapshot_location: fuse_table.snapshot_loc(),
537539
};
538540

539541
let _resp = catalog.update_single_table_meta(req, table_info).await?;

src/query/service/src/interpreters/interpreter_txn_commit.rs

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
use std::sync::Arc;
1616
use std::time::Instant;
1717

18-
use databend_common_exception::ErrorCode;
1918
use databend_common_exception::Result;
2019
use databend_common_meta_app::principal::StageInfo;
2120
use databend_common_metrics::storage::metrics_inc_copy_purge_files_cost_milliseconds;
2221
use databend_common_metrics::storage::metrics_inc_copy_purge_files_counter;
2322
use databend_common_storage::init_stage_operator;
23+
use databend_common_storages_fuse::commit_with_backoff;
2424
use databend_common_storages_fuse::TableContext;
2525
use databend_storages_common_io::Files;
2626
use databend_storages_common_session::TxnManagerRef;
@@ -76,57 +76,9 @@ pub async fn execute_commit_statement(ctx: Arc<dyn TableContext>) -> Result<()>
7676
let _guard = ClearTxnManagerGuard(ctx.txn_mgr().clone());
7777
let is_active = ctx.txn_mgr().lock().is_active();
7878
if is_active {
79-
let catalog = ctx.get_default_catalog()?;
80-
79+
ctx.txn_mgr().lock().set_auto_commit();
8180
let req = ctx.txn_mgr().lock().req();
82-
83-
let update_summary = {
84-
let table_descriptions = req
85-
.update_table_metas
86-
.iter()
87-
.map(|(req, _)| (req.table_id, req.seq, req.new_table_meta.engine.clone()))
88-
.collect::<Vec<_>>();
89-
let stream_descriptions = req
90-
.update_stream_metas
91-
.iter()
92-
.map(|s| (s.stream_id, s.seq, "stream"))
93-
.collect::<Vec<_>>();
94-
(table_descriptions, stream_descriptions)
95-
};
96-
97-
let mismatched_tids = {
98-
ctx.txn_mgr().lock().set_auto_commit();
99-
let ret = catalog.retryable_update_multi_table_meta(req).await;
100-
if let Err(ref e) = ret {
101-
// other errors may occur, especially the version mismatch of streams,
102-
// let's log it here for the convenience of diagnostics
103-
error!(
104-
"Non-recoverable fault occurred during table metadata update: {}",
105-
e
106-
);
107-
}
108-
ret?
109-
};
110-
111-
match &mismatched_tids {
112-
Ok(_) => {
113-
info!(
114-
"Transaction committed successfully, updated targets: {:?}",
115-
update_summary
116-
);
117-
}
118-
Err(e) => {
119-
let err_msg = format!(
120-
"Due to concurrent transactions, explicit transaction commit failed. Conflicting table IDs: {:?}",
121-
e.iter().map(|(tid, _, _)| tid).collect::<Vec<_>>()
122-
);
123-
info!(
124-
"Transaction commit failed due to concurrent modifications. Conflicting table IDs: {:?}",
125-
e
126-
);
127-
return Err(ErrorCode::TableVersionMismatched(format!("{}", err_msg)));
128-
}
129-
}
81+
commit_with_backoff(ctx.clone(), req).await?;
13082
let need_purge_files = ctx.txn_mgr().lock().need_purge_files();
13183
for (stage_info, files) in need_purge_files {
13284
try_purge_files(ctx.clone(), &stage_info, &files).await;

src/query/service/src/test_kits/fuse.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,17 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
274274
snapshot_2
275275
.write_meta(&operator, &new_snapshot_location)
276276
.await?;
277-
FuseTable::commit_to_meta_server(
278-
fixture.new_query_ctx().await?.as_ref(),
279-
fuse_table.get_table_info(),
280-
location_gen,
281-
snapshot_2,
282-
None,
283-
&None,
284-
&operator,
285-
)
286-
.await
277+
fuse_table
278+
.commit_to_meta_server(
279+
fixture.new_query_ctx().await?.as_ref(),
280+
fuse_table.get_table_info(),
281+
location_gen,
282+
snapshot_2,
283+
None,
284+
&None,
285+
&operator,
286+
)
287+
.await
287288
}
288289

289290
async fn write_v2_to_storage<T>(data_accessor: &Operator, location: &str, meta: &T) -> Result<()>

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,16 +290,17 @@ async fn test_commit_to_meta_server() -> Result<()> {
290290
error_injection: self.update_meta_error.clone(),
291291
};
292292
let ctx = Arc::new(CtxDelegation::new(ctx, faked_catalog));
293-
let r = FuseTable::commit_to_meta_server(
294-
ctx.as_ref(),
295-
fuse_table.get_table_info(),
296-
fuse_table.meta_location_generator(),
297-
new_snapshot,
298-
None,
299-
&None,
300-
fuse_table.get_operator_ref(),
301-
)
302-
.await;
293+
let r = fuse_table
294+
.commit_to_meta_server(
295+
ctx.as_ref(),
296+
fuse_table.get_table_info(),
297+
fuse_table.meta_location_generator(),
298+
new_snapshot,
299+
None,
300+
&None,
301+
fuse_table.get_operator_ref(),
302+
)
303+
.await;
303304

304305
if self.update_meta_error.is_some() {
305306
assert_eq!(

0 commit comments

Comments
 (0)