Skip to content

Commit 0811612

Browse files
authored
refactor: add IdempotentKVTxnSender (#18585)
* refactor: add IdempotentKVTxnSender
  - make the kv txn of `commit_table_meta` idempotent
  - refactor `update_multi_table_meta` and `create_lock_revision` to use IdempotentKVTxnSender
* refine comments
* refine comments
1 parent a3ad46d commit 0811612

File tree

4 files changed

+195
-77
lines changed

4 files changed

+195
-77
lines changed

src/meta/api/src/schema_api.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ use databend_common_proto_conv::FromToProto;
105105
use crate::errors::TableError;
106106
use crate::kv_app_error::KVAppError;
107107
use crate::meta_txn_error::MetaTxnError;
108+
use crate::util::IdempotentKVTxnSender;
108109

109110
/// SchemaApi defines APIs that provides schema storage, such as database, table.
110111
#[async_trait::async_trait]
@@ -276,10 +277,10 @@ pub trait SchemaApi: Send + Sync {
276277
req: UpdateMultiTableMetaReq,
277278
) -> Result<UpdateMultiTableMetaResult, KVAppError>;
278279

279-
async fn update_multi_table_meta_with_txn_id(
280+
async fn update_multi_table_meta_with_sender(
280281
&self,
281282
req: UpdateMultiTableMetaReq,
282-
txn_id: String,
283+
kv_txn_sender: &IdempotentKVTxnSender,
283284
) -> Result<UpdateMultiTableMetaResult, KVAppError>;
284285

285286
async fn set_table_column_mask_policy(

src/meta/api/src/schema_api_impl.rs

Lines changed: 44 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ use databend_common_meta_types::MatchSeqExt;
193193
use databend_common_meta_types::MetaError;
194194
use databend_common_meta_types::MetaId;
195195
use databend_common_meta_types::SeqV;
196-
use databend_common_meta_types::TxnCondition;
197196
use databend_common_meta_types::TxnGetRequest;
198197
use databend_common_meta_types::TxnGetResponse;
199198
use databend_common_meta_types::TxnOp;
@@ -240,6 +239,8 @@ use crate::util::txn_op_put_pb;
240239
use crate::util::txn_put_pb;
241240
use crate::util::txn_replace_exact;
242241
use crate::util::unknown_database_error;
242+
use crate::util::IdempotentKVTxnResponse;
243+
use crate::util::IdempotentKVTxnSender;
243244
use crate::SchemaApi;
244245
use crate::DEFAULT_MGET_SIZE;
245246

@@ -1783,6 +1784,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
17831784

17841785
let tenant_dbname_tbname = &req.name_ident;
17851786

1787+
let txn_sender = IdempotentKVTxnSender::new();
1788+
17861789
let mut trials = txn_backoff(None, func_name!());
17871790
loop {
17881791
trials.next().unwrap()?.await;
@@ -1917,7 +1920,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19171920
],
19181921
);
19191922

1920-
let (succ, _responses) = send_txn(self, txn_req).await?;
1923+
let txn_response = txn_sender.send_txn(self, txn_req).await?;
1924+
let succ = match txn_response {
1925+
IdempotentKVTxnResponse::Success(_) => true,
1926+
IdempotentKVTxnResponse::AlreadyCommitted => {
1927+
info!( "Transaction ID {} exists, the corresponding commit_table_meta transaction has been executed successfully", txn_sender.get_txn_id() );
1928+
true
1929+
}
1930+
IdempotentKVTxnResponse::Failed(_) => false,
1931+
};
19211932

19221933
debug!(
19231934
name :? =(tenant_dbname_tbname),
@@ -2113,20 +2124,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
21132124
&self,
21142125
req: UpdateMultiTableMetaReq,
21152126
) -> Result<UpdateMultiTableMetaResult, KVAppError> {
2116-
// Generate a random transaction ID for idempotency
2117-
let txn_id = Uuid::new_v4().to_string();
2118-
self.update_multi_table_meta_with_txn_id(req, txn_id).await
2127+
let kv_txn_sender = IdempotentKVTxnSender::new();
2128+
self.update_multi_table_meta_with_sender(req, &kv_txn_sender)
2129+
.await
21192130
}
21202131

2121-
/// This function is ONLY for testing purposes.
2122-
/// In production environment, use `update_multi_table_meta` instead.
2123-
///
2124-
/// `retry_times` is used to simulate the retry of the transaction.
2125-
/// It is only for test.
2126-
async fn update_multi_table_meta_with_txn_id(
2132+
async fn update_multi_table_meta_with_sender(
21272133
&self,
21282134
req: UpdateMultiTableMetaReq,
2129-
txn_id: String,
2135+
txn_sender: &IdempotentKVTxnSender,
21302136
) -> Result<UpdateMultiTableMetaResult, KVAppError> {
21312137
let UpdateMultiTableMetaReq {
21322138
mut update_table_metas,
@@ -2303,47 +2309,27 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
23032309
.push(build_upsert_table_deduplicated_label(deduplicated_label));
23042310
}
23052311

2306-
// Add transaction ID to the transaction with 5-minute expiration
2307-
let txn_id_key = format!("_txn_id/{}", txn_id);
2308-
2309-
// Add condition to check that transaction ID does not exist (empty)
2310-
txn.condition
2311-
.push(TxnCondition::eq_seq(txn_id_key.clone(), 0));
2312-
2313-
txn.if_then.push(TxnOp::put_with_ttl(
2314-
txn_id_key.clone(),
2315-
vec![],
2316-
Some(Duration::from_secs(300)),
2317-
));
2318-
2319-
// Add get operation to check if transaction ID exists in else branch
2320-
// NOTE: Orders of operations matter, please keep this as the last operation of else branch
2321-
txn.else_then.push(TxnOp::get(txn_id_key.clone()));
2312+
let txn_response = txn_sender.send_txn(self, txn).await?;
23222313

2323-
let (succ, responses) = send_txn(self, txn).await?;
2324-
2325-
if succ {
2326-
return Ok(Ok(UpdateTableMetaReply {}));
2327-
}
2328-
2329-
// Check if transaction ID exists in else branch response (idempotency check)
2330-
//
2331-
// Please note that this is a best effort check: the "idempotency check" is only guaranteed
2332-
// roughly within 300 secs, please DO NOT use it for any safety properties.
2333-
if let Some(Response::Get(get_resp)) = responses.last().and_then(|r| r.response.as_ref()) {
2334-
// Defensive check, make sure the get(tx_id_key) is the last operation
2335-
assert_eq!(get_resp.key, txn_id_key, "Transaction ID key mismatch");
2336-
if get_resp.value.is_some() {
2314+
let else_branch_op_responses = match txn_response {
2315+
IdempotentKVTxnResponse::Success(_) => {
2316+
return Ok(Ok(UpdateTableMetaReply {}));
2317+
}
2318+
IdempotentKVTxnResponse::AlreadyCommitted => {
23372319
info!(
2338-
"Transaction ID {} exists, the corresponding transaction has been executed successfully",
2339-
txn_id_key
2320+
"Transaction ID {} exists, the corresponding update_multi_table_meta transaction has been executed successfully",
2321+
txn_sender.get_txn_id()
23402322
);
23412323
return Ok(Ok(UpdateTableMetaReply {}));
23422324
}
2343-
}
2325+
IdempotentKVTxnResponse::Failed(op_responses) => op_responses,
2326+
};
23442327

23452328
let mut mismatched_tbs = vec![];
2346-
for (resp, req) in responses.iter().zip(update_table_metas.iter()) {
2329+
for (resp, req) in else_branch_op_responses
2330+
.iter()
2331+
.zip(update_table_metas.iter())
2332+
{
23472333
let Some(Response::Get(get_resp)) = &resp.response else {
23482334
unreachable!(
23492335
"internal error: expect some TxnGetResponseGet, but got {:?}",
@@ -2957,11 +2943,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
29572943
let lock_key = &req.lock_key;
29582944
let id_generator = IdGenerator::table_lock_id();
29592945

2960-
// ensure idempotent, reference from update_multi_table_meta.
2961-
let txn_id = Uuid::new_v4().to_string();
2962-
let txn_id_key = format!("_txn_id/{}", txn_id);
2963-
29642946
let mut trials = txn_backoff(None, ctx);
2947+
let txn_sender = IdempotentKVTxnSender::with_ttl(req.ttl);
29652948
loop {
29662949
trials.next().unwrap()?.await;
29672950

@@ -2982,34 +2965,27 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
29822965
txn_cond_seq(&id_generator, Eq, current_rev),
29832966
// assumes the lock is absent.
29842967
txn_cond_seq(&key, Eq, 0),
2985-
TxnCondition::eq_seq(txn_id_key.clone(), 0),
29862968
];
29872969
let if_then = vec![
29882970
txn_op_put(&id_generator, b"".to_vec()),
29892971
txn_op_put_pb(&key, &lock_meta, Some(req.ttl))?,
2990-
TxnOp::put_with_ttl(txn_id_key.clone(), vec![], Some(req.ttl)),
29912972
];
2992-
let else_then = vec![TxnOp::get(txn_id_key.clone())];
2993-
let txn_req = TxnRequest::new(condition, if_then).with_else(else_then);
2994-
let (succ, responses) = send_txn(self, txn_req).await?;
2995-
2996-
if succ {
2997-
return Ok(CreateLockRevReply { revision });
2998-
}
2999-
3000-
// Check if transaction ID exists in else branch response (idempotency check).
3001-
if let Some(Response::Get(get_resp)) =
3002-
responses.last().and_then(|r| r.response.as_ref())
3003-
{
3004-
// Defensive check, make sure the get(tx_id_key) is the last operation
3005-
assert_eq!(get_resp.key, txn_id_key, "Transaction ID key mismatch");
3006-
if get_resp.value.is_some() {
2973+
let txn_req = TxnRequest::new(condition, if_then);
2974+
let txn_response = txn_sender.send_txn(self, txn_req).await?;
2975+
match txn_response {
2976+
IdempotentKVTxnResponse::Success(_) => {
2977+
return Ok(CreateLockRevReply { revision });
2978+
}
2979+
IdempotentKVTxnResponse::AlreadyCommitted => {
30072980
info!(
30082981
"Transaction ID {} exists, the lock revision has been created successfully",
3009-
txn_id_key
2982+
txn_sender.get_txn_id()
30102983
);
30112984
return Ok(CreateLockRevReply { revision });
30122985
}
2986+
_ => {
2987+
// continue looping
2988+
}
30132989
}
30142990
}
30152991
}

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::vec;
2323
use chrono::DateTime;
2424
use chrono::Duration;
2525
use chrono::Utc;
26-
use databend_common_base::base::uuid::Uuid;
2726
use databend_common_base::runtime::Runtime;
2827
use databend_common_base::runtime::TrySpawn;
2928
use databend_common_exception::ErrorCode;
@@ -155,6 +154,7 @@ use crate::kv_pb_api::UpsertPB;
155154
use crate::serialize_struct;
156155
use crate::testing::get_kv_data;
157156
use crate::testing::get_kv_u64_data;
157+
use crate::util::IdempotentKVTxnSender;
158158
use crate::DatamaskApi;
159159
use crate::RowAccessPolicyApi;
160160
use crate::SchemaApi;
@@ -2485,6 +2485,7 @@ impl SchemaApiTestSuite {
24852485
}
24862486

24872487
info!("--- update table meta: simulate kv txn retried after commit");
2488+
let txn_sender = IdempotentKVTxnSender::new();
24882489
{
24892490
// 1. Update test table, bump table version, expect success -- just the normal case
24902491
let table = util.get_table().await.unwrap();
@@ -2498,14 +2499,13 @@ impl SchemaApiTestSuite {
24982499
new_table_meta: new_table_meta.clone(),
24992500
base_snapshot_location: None,
25002501
};
2501-
let uuid = Uuid::new_v4();
25022502
let res = mt
2503-
.update_multi_table_meta_with_txn_id(
2503+
.update_multi_table_meta_with_sender(
25042504
UpdateMultiTableMetaReq {
25052505
update_table_metas: vec![(req.clone(), table.as_ref().clone())],
25062506
..Default::default()
25072507
},
2508-
uuid.to_string(),
2508+
&txn_sender,
25092509
)
25102510
.await?;
25112511

@@ -2530,13 +2530,13 @@ impl SchemaApiTestSuite {
25302530
// Expects no KV api level error, and no app level error.
25312531
// For the convenience of reviewing, use an explicit type signature
25322532
let _r: databend_common_meta_app::schema::UpdateTableMetaReply = mt
2533-
.update_multi_table_meta_with_txn_id(
2533+
.update_multi_table_meta_with_sender(
25342534
UpdateMultiTableMetaReq {
25352535
update_table_metas: vec![(req, table.as_ref().clone())],
25362536
..Default::default()
25372537
},
2538-
// USING THE SAME UUID as kv txn_id
2539-
uuid.to_string(),
2538+
// USING THE SAME IdempotentKVTxnSender
2539+
&txn_sender,
25402540
)
25412541
.await?
25422542
.unwrap();

0 commit comments

Comments
 (0)