Skip to content

Commit 7e169a7

Browse files
fix: make update table meta idempotent (#18466)
* fix: make update table meta idempotent * add ut * refine * fix * rename func * polish unit test --------- Co-authored-by: dantengsky <[email protected]>
1 parent f017809 commit 7e169a7

File tree

3 files changed

+132
-1
lines changed

3 files changed

+132
-1
lines changed

src/meta/api/src/schema_api.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ pub trait SchemaApi: Send + Sync {
273273
req: UpdateMultiTableMetaReq,
274274
) -> Result<UpdateMultiTableMetaResult, KVAppError>;
275275

276+
async fn update_multi_table_meta_with_txn_id(
277+
&self,
278+
req: UpdateMultiTableMetaReq,
279+
txn_id: String,
280+
) -> Result<UpdateMultiTableMetaResult, KVAppError>;
281+
276282
async fn set_table_column_mask_policy(
277283
&self,
278284
req: SetTableColumnMaskPolicyReq,

src/meta/api/src/schema_api_impl.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ use databend_common_meta_types::MatchSeqExt;
187187
use databend_common_meta_types::MetaError;
188188
use databend_common_meta_types::MetaId;
189189
use databend_common_meta_types::SeqV;
190+
use databend_common_meta_types::TxnCondition;
190191
use databend_common_meta_types::TxnGetRequest;
191192
use databend_common_meta_types::TxnGetResponse;
192193
use databend_common_meta_types::TxnOp;
@@ -2104,6 +2105,21 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
21042105
async fn update_multi_table_meta(
21052106
&self,
21062107
req: UpdateMultiTableMetaReq,
2108+
) -> Result<UpdateMultiTableMetaResult, KVAppError> {
2109+
// Generate a random transaction ID for idempotency
2110+
let txn_id = Uuid::new_v4().to_string();
2111+
self.update_multi_table_meta_with_txn_id(req, txn_id).await
2112+
}
2113+
2114+
/// This function is ONLY for testing purposes.
2115+
/// In production environment, use `update_multi_table_meta` instead.
2116+
///
2117+
/// `txn_id` is used to simulate the retry of the transaction.
2118+
/// It is only for test.
2119+
async fn update_multi_table_meta_with_txn_id(
2120+
&self,
2121+
req: UpdateMultiTableMetaReq,
2122+
txn_id: String,
21072123
) -> Result<UpdateMultiTableMetaResult, KVAppError> {
21082124
let UpdateMultiTableMetaReq {
21092125
mut update_table_metas,
@@ -2279,10 +2295,46 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
22792295
txn.if_then
22802296
.push(build_upsert_table_deduplicated_label(deduplicated_label));
22812297
}
2298+
2299+
// Add transaction ID to the transaction with 5-minute expiration
2300+
let txn_id_key = format!("_txn_id/{}", txn_id);
2301+
2302+
// Add condition to check that transaction ID does not exist (empty)
2303+
txn.condition
2304+
.push(TxnCondition::eq_seq(txn_id_key.clone(), 0));
2305+
2306+
txn.if_then.push(TxnOp::put_with_ttl(
2307+
txn_id_key.clone(),
2308+
vec![],
2309+
Some(Duration::from_secs(300)),
2310+
));
2311+
2312+
// Add get operation to check if transaction ID exists in else branch
2313+
// NOTE: Orders of operations matter, please keep this as the last operation of else branch
2314+
txn.else_then.push(TxnOp::get(txn_id_key.clone()));
2315+
22822316
let (succ, responses) = send_txn(self, txn).await?;
2317+
22832318
if succ {
22842319
return Ok(Ok(UpdateTableMetaReply {}));
22852320
}
2321+
2322+
// Check if transaction ID exists in else branch response (idempotency check)
2323+
//
2324+
// Please note that this is a best effort check: the "idempotency check" is only guaranteed
2325+
// roughly within 300 secs, please DO NOT use it for any safety properties.
2326+
if let Some(Response::Get(get_resp)) = responses.last().and_then(|r| r.response.as_ref()) {
2327+
// Defensive check: make sure the get(txn_id_key) is the last operation
2328+
assert_eq!(get_resp.key, txn_id_key, "Transaction ID key mismatch");
2329+
if get_resp.value.is_some() {
2330+
info!(
2331+
"Transaction ID {} exists, the corresponding transaction has been executed successfully",
2332+
txn_id_key
2333+
);
2334+
return Ok(Ok(UpdateTableMetaReply {}));
2335+
}
2336+
}
2337+
22862338
let mut mismatched_tbs = vec![];
22872339
for (resp, req) in responses.iter().zip(update_table_metas.iter()) {
22882340
let Some(Response::Get(get_resp)) = &resp.response else {

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::vec;
2323
use chrono::DateTime;
2424
use chrono::Duration;
2525
use chrono::Utc;
26+
use databend_common_base::base::uuid::Uuid;
2627
use databend_common_base::runtime::Runtime;
2728
use databend_common_base::runtime::TrySpawn;
2829
use databend_common_exception::ErrorCode;
@@ -2605,6 +2606,7 @@ impl SchemaApiTestSuite {
26052606
new_table_meta: new_table_meta.clone(),
26062607
base_snapshot_location: None,
26072608
};
2609+
26082610
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
26092611
update_table_metas: vec![(req, table.as_ref().clone())],
26102612
..Default::default()
@@ -2643,10 +2645,81 @@ impl SchemaApiTestSuite {
26432645
.await?;
26442646

26452647
let err = res.unwrap_err();
2646-
26472648
assert!(!err.is_empty());
26482649
}
26492650

2651+
info!("--- update table meta: simulate kv txn retried after commit");
2652+
{
2653+
// 1. Update test table, bump table version, expect success -- just the normal case
2654+
let table = mt
2655+
.get_table((tenant_name, "db1", "tb2").into())
2656+
.await
2657+
.unwrap();
2658+
2659+
let new_table_meta = table.meta.clone();
2660+
let table_id = table.ident.table_id;
2661+
let table_version = table.ident.seq;
2662+
let req = UpdateTableMetaReq {
2663+
table_id,
2664+
seq: MatchSeq::Exact(table_version),
2665+
new_table_meta: new_table_meta.clone(),
2666+
base_snapshot_location: None,
2667+
};
2668+
let uuid = Uuid::new_v4();
2669+
let res = mt
2670+
.update_multi_table_meta_with_txn_id(
2671+
UpdateMultiTableMetaReq {
2672+
update_table_metas: vec![(req.clone(), table.as_ref().clone())],
2673+
..Default::default()
2674+
},
2675+
uuid.to_string(),
2676+
)
2677+
.await?;
2678+
2679+
assert!(res.is_ok());
2680+
2681+
// 2. Update test table again
2682+
// Simulate duplicated kv_txn by using the same kv txn_id.
2683+
// Expects:
2684+
// - `update_multi_table_meta_with_txn_id` returns NO error,
2685+
// - the table meta should NOT be updated
2686+
2687+
let table = mt
2688+
.get_table((tenant_name, "db1", "tb2").into())
2689+
.await
2690+
.unwrap();
2691+
2692+
let table_version = table.ident.seq;
2693+
let mut req = req.clone();
2694+
// Using a fresh table version(seq), otherwise it will fail eagerly during the
2695+
// version checking phase of `update_multi_table_meta_with_txn_id`
2696+
req.seq = MatchSeq::Exact(table_version);
2697+
let prev_table_meta = table.meta.clone();
2698+
// Change the comment, this modification should not be committed
2699+
req.new_table_meta.comment = "some new comment".to_string();
2700+
// Expects no KV api level error, and no app level error.
2701+
// For the convenience of reviewing, use an explicit type signature
2702+
let _r: databend_common_meta_app::schema::UpdateTableMetaReply = mt
2703+
.update_multi_table_meta_with_txn_id(
2704+
UpdateMultiTableMetaReq {
2705+
update_table_metas: vec![(req, table.as_ref().clone())],
2706+
..Default::default()
2707+
},
2708+
// USING THE SAME UUID as kv txn_id
2709+
uuid.to_string(),
2710+
)
2711+
.await?
2712+
.unwrap();
2713+
2714+
let table = mt
2715+
.get_table((tenant_name, "db1", "tb2").into())
2716+
.await
2717+
.unwrap();
2718+
2719+
// The new_table_meta should NOT be committed
2720+
assert_eq!(table.meta, prev_table_meta);
2721+
}
2722+
26502723
info!("--- update table meta, with upsert file req");
26512724
{
26522725
let table = mt

0 commit comments

Comments
 (0)