Skip to content

Commit 423e621

Browse files
visualYJDketor
authored andcommitted
[fix][store] Check whether the prewritten transaction has been committed.
1 parent ab91978 commit 423e621

File tree

3 files changed

+168
-6
lines changed

3 files changed

+168
-6
lines changed

dingo-store-proto

src/engine/txn_engine_helper.cc

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,54 @@ butil::Status TxnReader::GetWriteInfo(int64_t min_commit_ts, int64_t max_commit_
262262
return butil::Status::OK();
263263
}
264264

265+
butil::Status TxnReader::CheckCommittedRecord(int64_t start_ts, const std::string &key,
266+
pb::store::WriteInfo &write_info, int64_t &commit_ts, bool &find_record) {
267+
if (!is_initialized_) {
268+
return butil::Status(pb::error::Errno::EINTERNAL, "txn reader is not initialized");
269+
}
270+
271+
IteratorOptions iter_options;
272+
iter_options.lower_bound = mvcc::Codec::EncodeKey(key, Constant::kMaxVer);
273+
iter_options.upper_bound = mvcc::Codec::EncodeKey(key, start_ts);
274+
275+
pb::store::WriteInfo tmp_write_info;
276+
write_iter_->Seek(iter_options.lower_bound);
277+
while (write_iter_->Valid() && write_iter_->Key() <= iter_options.upper_bound) {
278+
if (write_iter_->Key().length() <= 8) {
279+
DINGO_LOG(ERROR) << "invalid write_key, key: " << Helper::StringToHex(write_iter_->Key())
280+
<< ", write_key is less than 8 bytes: " << Helper::StringToHex(write_iter_->Key());
281+
return butil::Status(pb::error::Errno::EINTERNAL, "invalid write_key");
282+
}
283+
284+
std::string write_key;
285+
int64_t write_ts;
286+
mvcc::Codec::DecodeKey(write_iter_->Key(), write_key, write_ts);
287+
288+
auto ret = tmp_write_info.ParseFromArray(write_iter_->Value().data(), write_iter_->Value().size());
289+
if (!ret) {
290+
DINGO_LOG(ERROR) << "cannot parse tmp_write_info, key: " << Helper::StringToHex(write_iter_->Key())
291+
<< ", write_ts: " << write_ts << ", write_key: " << Helper::StringToHex(write_iter_->Key())
292+
<< ", write_value(hex): " << Helper::StringToHex(write_iter_->Value());
293+
return butil::Status(pb::error::Errno::EINTERNAL, "cannot parse tmp_write_info");
294+
}
295+
296+
if (tmp_write_info.start_ts() != start_ts) {
297+
write_iter_->Next();
298+
continue;
299+
}
300+
301+
if (tmp_write_info.op() == pb::store::Op::Rollback) {
302+
break;
303+
}
304+
305+
write_info = tmp_write_info;
306+
commit_ts = write_ts;
307+
find_record = true;
308+
break;
309+
}
310+
311+
return butil::Status::OK();
312+
}
265313
butil::Status TxnReader::GetRollbackInfo(int64_t start_ts, const std::string &key, pb::store::WriteInfo &write_info) {
266314
if (!is_initialized_) {
267315
return butil::Status(pb::error::Errno::EINTERNAL, "txn reader is not initialized");
@@ -2539,6 +2587,34 @@ butil::Status TxnEngineHelper::Prewrite(
25392587
// for optimistic prewrite
25402588
if (!need_check_pessimistic_lock) {
25412589
if (prev_lock_info.lock_type() == pb::store::Op::Lock) {
2590+
pb::store::WriteInfo prev_write_info;
2591+
int64_t prev_commit_ts = 0;
2592+
bool find_record = false;
2593+
auto status =
2594+
txn_reader.CheckCommittedRecord(start_ts, mutation.key(), prev_write_info, prev_commit_ts, find_record);
2595+
if (!status.ok()) {
2596+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] Prewrite,", region->Id())
2597+
<< ", CheckCommittedRecord failed, key: " << Helper::StringToHex(mutation.key())
2598+
<< ", start_ts: " << start_ts << ", status: " << status.error_str();
2599+
}
2600+
2601+
if (find_record) {
2602+
if (use_async_commit) {
2603+
response->set_min_commit_ts(prev_commit_ts);
2604+
}
2605+
2606+
if (try_one_pc) {
2607+
response->set_one_pc_commit_ts(prev_commit_ts);
2608+
}
2609+
2610+
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << fmt::format(
2611+
"[txn][region({})] transaction has been committed,just return. start_ts:{}, commit_ts:{}, "
2612+
"write_info:{}",
2613+
region->Id(), start_ts, prev_commit_ts, prev_write_info.ShortDebugString());
2614+
2615+
return butil::Status::OK();
2616+
}
2617+
25422618
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
25432619
<< fmt::format("[txn][region({})] Prewrite, start_ts: {}", region->Id(), start_ts)
25442620
<< ", optimistic prewrite meet pessimistic lock, key: " << Helper::StringToHex(mutation.key())
@@ -2560,6 +2636,34 @@ butil::Status TxnEngineHelper::Prewrite(
25602636
// this is a repeated prewrite, will skip write to raft state machine
25612637
is_repeated_prewrite = true;
25622638
} else {
2639+
pb::store::WriteInfo prev_write_info;
2640+
int64_t prev_commit_ts = 0;
2641+
bool find_record = false;
2642+
auto status =
2643+
txn_reader.CheckCommittedRecord(start_ts, mutation.key(), prev_write_info, prev_commit_ts, find_record);
2644+
if (!status.ok()) {
2645+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] Prewrite,", region->Id())
2646+
<< ", CheckCommittedRecord failed, key: " << Helper::StringToHex(mutation.key())
2647+
<< ", start_ts: " << start_ts << ", status: " << status.error_str();
2648+
}
2649+
2650+
if (find_record) {
2651+
if (use_async_commit) {
2652+
response->set_min_commit_ts(prev_commit_ts);
2653+
}
2654+
2655+
if (try_one_pc) {
2656+
response->set_one_pc_commit_ts(prev_commit_ts);
2657+
}
2658+
2659+
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << fmt::format(
2660+
"[txn][region({})] transaction has been committed,just return. start_ts:{}, commit_ts:{}, "
2661+
"write_info:{}",
2662+
region->Id(), start_ts, prev_commit_ts, prev_write_info.ShortDebugString());
2663+
2664+
return butil::Status::OK();
2665+
}
2666+
25632667
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
25642668
<< fmt::format("[txn][region({})] Prewrite,", region->Id())
25652669
<< ", key: " << Helper::StringToHex(mutation.key())
@@ -2616,6 +2720,34 @@ butil::Status TxnEngineHelper::Prewrite(
26162720
}
26172721
}
26182722
} else {
2723+
pb::store::WriteInfo prev_write_info;
2724+
int64_t prev_commit_ts = 0;
2725+
bool find_record = false;
2726+
auto status =
2727+
txn_reader.CheckCommittedRecord(start_ts, mutation.key(), prev_write_info, prev_commit_ts, find_record);
2728+
if (!status.ok()) {
2729+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] Prewrite,", region->Id())
2730+
<< ", CheckCommittedRecord failed, key: " << Helper::StringToHex(mutation.key())
2731+
<< ", start_ts: " << start_ts << ", status: " << status.error_str();
2732+
}
2733+
2734+
if (find_record) {
2735+
if (use_async_commit) {
2736+
response->set_min_commit_ts(prev_commit_ts);
2737+
}
2738+
2739+
if (try_one_pc) {
2740+
response->set_one_pc_commit_ts(prev_commit_ts);
2741+
}
2742+
2743+
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << fmt::format(
2744+
"[txn][region({})] transaction has been committed,just return. start_ts:{}, commit_ts:{}, "
2745+
"write_info:{}",
2746+
region->Id(), start_ts, prev_commit_ts, prev_write_info.ShortDebugString());
2747+
2748+
return butil::Status::OK();
2749+
}
2750+
26192751
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
26202752
<< fmt::format("[txn][region({})] Prewrite,", region->Id())
26212753
<< ", key: " << Helper::StringToHex(mutation.key())
@@ -2692,6 +2824,34 @@ butil::Status TxnEngineHelper::Prewrite(
26922824

26932825
if (commit_ts >= start_ts) {
26942826
if (!need_check_pessimistic_lock) {
2827+
pb::store::WriteInfo prev_write_info;
2828+
int64_t prev_commit_ts = 0;
2829+
bool find_record = false;
2830+
auto status =
2831+
txn_reader.CheckCommittedRecord(start_ts, mutation.key(), prev_write_info, prev_commit_ts, find_record);
2832+
if (!status.ok()) {
2833+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] Prewrite,", region->Id())
2834+
<< ", CheckCommittedRecord failed, key: " << Helper::StringToHex(mutation.key())
2835+
<< ", start_ts: " << start_ts << ", status: " << status.error_str();
2836+
}
2837+
2838+
if (find_record) {
2839+
if (use_async_commit) {
2840+
response->set_min_commit_ts(prev_commit_ts);
2841+
}
2842+
2843+
if (try_one_pc) {
2844+
response->set_one_pc_commit_ts(prev_commit_ts);
2845+
}
2846+
2847+
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << fmt::format(
2848+
"[txn][region({})] transaction has been committed,just return. start_ts:{}, commit_ts:{}, "
2849+
"write_info:{}",
2850+
region->Id(), start_ts, prev_commit_ts, prev_write_info.ShortDebugString());
2851+
2852+
return butil::Status::OK();
2853+
}
2854+
26952855
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
26962856
<< "Optimistic Prewrite find this transaction is committed after start_ts,return "
26972857
"WriteConflict start_ts: "

src/engine/txn_engine_helper.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class TxnReader {
5252
std::shared_ptr<Iterator> GetWriteIter() { return write_iter_; }
5353
SnapshotPtr GetSnapshot() { return snapshot_; }
5454

55+
butil::Status CheckCommittedRecord(int64_t start_ts, const std::string &key, pb::store::WriteInfo &write_info, int64_t &commit_ts, bool &find_record);
56+
5557
private:
5658
bool is_initialized_{false};
5759
RawEnginePtr raw_engine_;
@@ -304,8 +306,8 @@ class TxnEngineHelper {
304306
static void RegularUpdateSafePointTsHandler(void *arg);
305307
static void RegularDoGcHandler(void *arg);
306308

307-
static int64_t GenFinalMinCommitTs(store::RegionPtr region, pb::store::LockInfo &lock_info, std::string key, int64_t start_ts,
308-
int64_t for_update_ts, int64_t max_commit_ts);
309+
static int64_t GenFinalMinCommitTs(store::RegionPtr region, pb::store::LockInfo &lock_info, std::string key,
310+
int64_t start_ts, int64_t for_update_ts, int64_t max_commit_ts);
309311

310312
static butil::Status GenPrewriteDataAndLock(
311313
store::RegionPtr region, const pb::store::Mutation &mutation, const pb::store::LockInfo &prev_lock_info,
@@ -467,15 +469,15 @@ class TxnEngineHelper {
467469
const std::vector<pb::common::KeyValue> &kv_scalar,
468470
const std::vector<pb::common::KeyValue> &kv_table,
469471
const std::vector<std::string> &scalar_speed_up_keys,
470-
std::vector<pb::common::VectorWithId> &vector_with_ids,
471-
std::vector<int64_t>& vector_delete_ids);
472+
std::vector<pb::common::VectorWithId> &vector_with_ids,
473+
std::vector<int64_t> &vector_delete_ids);
472474

473475
static butil::Status PreProcessVectorIndex(const std::vector<pb::common::KeyValue> &kv_default,
474476
const std::vector<pb::common::KeyValue> &kv_scalar,
475477
const std::vector<pb::common::KeyValue> &kv_table,
476478
const std::vector<std::string> &scalar_speed_up_keys,
477479
std::vector<pb::common::VectorWithId> &vector_with_ids,
478-
std::vector<int64_t>& vector_delete_ids);
480+
std::vector<int64_t> &vector_delete_ids);
479481

480482
static butil::Status RestoreNonTxnIndex(std::shared_ptr<Context> ctx, store::RegionPtr region,
481483
std::shared_ptr<Engine> raft_engine,

0 commit comments

Comments
 (0)