Skip to content

Commit 5d9d372

Browse files
authored
fix: make create lock revision idempotent (#18576)
* fix: make create lock revision idempotent * update
1 parent ab86edb commit 5d9d372

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

src/meta/api/src/schema_api_impl.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2957,6 +2957,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
29572957
let lock_key = &req.lock_key;
29582958
let id_generator = IdGenerator::table_lock_id();
29592959

2960+
// ensure idempotent, reference from update_multi_table_meta.
2961+
let txn_id = Uuid::new_v4().to_string();
2962+
let txn_id_key = format!("_txn_id/{}", txn_id);
2963+
29602964
let mut trials = txn_backoff(None, ctx);
29612965
loop {
29622966
trials.next().unwrap()?.await;
@@ -2978,17 +2982,35 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
29782982
txn_cond_seq(&id_generator, Eq, current_rev),
29792983
// assumes lock are absent.
29802984
txn_cond_seq(&key, Eq, 0),
2985+
TxnCondition::eq_seq(txn_id_key.clone(), 0),
29812986
];
29822987
let if_then = vec![
29832988
txn_op_put(&id_generator, b"".to_vec()),
29842989
txn_op_put_pb(&key, &lock_meta, Some(req.ttl))?,
2990+
TxnOp::put_with_ttl(txn_id_key.clone(), vec![], Some(req.ttl)),
29852991
];
2986-
let txn_req = TxnRequest::new(condition, if_then);
2987-
let (succ, _responses) = send_txn(self, txn_req).await?;
2992+
let else_then = vec![TxnOp::get(txn_id_key.clone())];
2993+
let txn_req = TxnRequest::new(condition, if_then).with_else(else_then);
2994+
let (succ, responses) = send_txn(self, txn_req).await?;
29882995

29892996
if succ {
29902997
return Ok(CreateLockRevReply { revision });
29912998
}
2999+
3000+
// Check if transaction ID exists in else branch response (idempotency check).
3001+
if let Some(Response::Get(get_resp)) =
3002+
responses.last().and_then(|r| r.response.as_ref())
3003+
{
3004+
// Defensive check, make sure the get(tx_id_key) is the last operation
3005+
assert_eq!(get_resp.key, txn_id_key, "Transaction ID key mismatch");
3006+
if get_resp.value.is_some() {
3007+
info!(
3008+
"Transaction ID {} exists, the lock revision has been created successfully",
3009+
txn_id_key
3010+
);
3011+
return Ok(CreateLockRevReply { revision });
3012+
}
3013+
}
29923014
}
29933015
}
29943016

src/query/service/src/locks/lock_holder.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl LockHolder {
103103

104104
catalog.extend_lock_revision(extend_table_lock_req).await?;
105105
// metrics.
106-
record_acquired_lock_nums(lock_type, 1);
106+
record_acquired_lock_nums(lock_type.clone(), 1);
107107
break;
108108
}
109109

@@ -156,6 +156,13 @@ impl LockHolder {
156156
}
157157
}
158158

159+
log::info!(
160+
"Acquired table lock successfully(table_id: {}, lock_type: {}, revision: {}, elapsed: {:?})",
161+
table_id,
162+
lock_type,
163+
revision,
164+
start.elapsed()
165+
);
159166
Ok(revision)
160167
}
161168

src/query/service/src/locks/lock_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl LockManager {
104104
Ok(revision) => {
105105
self.insert_lock(revision, lock_holder);
106106
let guard = LockGuard::new(self.clone(), revision);
107+
107108
Ok(Some(Arc::new(guard)))
108109
}
109110
Err(err) => {

0 commit comments

Comments
 (0)