Skip to content

Commit e2a282a

Browse files
visualYJDyuhaijun999
authored andcommitted
[fix][store] Solve the snapshot read consistency problem of 1pc transactions using memory locks
1 parent 1321ae5 commit e2a282a

File tree

10 files changed

+423
-153
lines changed

10 files changed

+423
-153
lines changed

src/engine/concurrency_manager.cc

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "engine/concurrency_manager.h"
16+
17+
#include "engine/txn_engine_helper.h"
18+
#include "proto/store.pb.h"
19+
20+
namespace dingodb {
21+
22+
// lock_entry.rw_lock has already write locked
23+
void ConcurrencyManager::LockKey(const std::string& key, LockEntryPtr lock_entry) {
24+
RWLockWriteGuard guard(&rw_lock_);
25+
lock_table_[key] = lock_entry;
26+
}
27+
28+
void ConcurrencyManager::UnlockKeys(const std::vector<std::string>& keys) {
29+
RWLockWriteGuard guard(&rw_lock_);
30+
for (auto const& key : keys) {
31+
auto it = lock_table_.find(key);
32+
if (it != lock_table_.end()) {
33+
it->second->is_deleted.store(true, std::memory_order_release);
34+
lock_table_.erase(it);
35+
}
36+
}
37+
}
38+
39+
bool ConcurrencyManager::CheckKeys(const std::vector<std::string>& keys, pb::store::IsolationLevel isolation_level,
40+
int64_t start_ts, const std::set<int64_t>& resolved_locks,
41+
pb::store::TxnResultInfo& txn_result_info) {
42+
std::vector<LockEntryPtr> lock_entrys;
43+
{
44+
RWLockReadGuard guard(&rw_lock_);
45+
for (auto const& key : keys) {
46+
auto it = lock_table_.find(key);
47+
if (it != lock_table_.end()) {
48+
lock_entrys.push_back(it->second);
49+
}
50+
}
51+
}
52+
53+
for (auto& lock_entry : lock_entrys) {
54+
if (lock_entry->is_deleted.load(std::memory_order_acquire)) {
55+
continue;
56+
}
57+
RWLockReadGuard guard(&lock_entry->rw_lock);
58+
if (TxnEngineHelper::CheckLockConflict(lock_entry->lock_info, isolation_level, start_ts, resolved_locks,
59+
txn_result_info)) {
60+
return true;
61+
}
62+
}
63+
64+
return false;
65+
}
66+
67+
bool ConcurrencyManager::CheckRange(const std::string& start_key, const std::string& end_key,
68+
pb::store::IsolationLevel isolation_level, int64_t start_ts,
69+
const std::set<int64_t>& resolved_locks,
70+
pb::store::TxnResultInfo& txn_result_info) {
71+
std::vector<LockEntryPtr> lock_entrys;
72+
{
73+
RWLockReadGuard guard(&rw_lock_);
74+
auto it = lock_table_.lower_bound(start_key);
75+
while (it != lock_table_.end() && it->first <= end_key) {
76+
lock_entrys.push_back(it->second);
77+
}
78+
}
79+
80+
for (auto& lock_entry : lock_entrys) {
81+
if (lock_entry->is_deleted.load(std::memory_order_acquire)) {
82+
continue;
83+
}
84+
RWLockReadGuard guard(&lock_entry->rw_lock);
85+
if (TxnEngineHelper::CheckLockConflict(lock_entry->lock_info, isolation_level, start_ts, resolved_locks,
86+
txn_result_info)) {
87+
return true;
88+
}
89+
}
90+
91+
return false;
92+
}
93+
94+
} // namespace dingodb

src/engine/concurrency_manager.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef DINGODB_COMMON_CONCURRENCY_MANAGER_H_
16+
#define DINGODB_COMMON_CONCURRENCY_MANAGER_H_
17+
18+
#include <map>
19+
#include <memory>
20+
#include <string>
21+
#include <vector>
22+
23+
#include "common/synchronization.h"
24+
#include "proto/store.pb.h"
25+
26+
namespace dingodb {
27+
class ConcurrencyManager {
28+
public:
29+
ConcurrencyManager() = default;
30+
~ConcurrencyManager() = default;
31+
32+
ConcurrencyManager(const ConcurrencyManager&) = delete;
33+
void operator=(const ConcurrencyManager&) = delete;
34+
35+
struct LockEntry {
36+
LockEntry() = default;
37+
~LockEntry() = default;
38+
39+
LockEntry(const LockEntry&) = delete;
40+
LockEntry& operator=(const LockEntry&) = delete;
41+
42+
pb::store::LockInfo lock_info;
43+
RWLock rw_lock;
44+
std::atomic<bool> is_deleted{false}; // Whether the marker has been is_deleted (i.e., removed from the map)
45+
};
46+
47+
using LockEntryPtr = std::shared_ptr<LockEntry>;
48+
49+
void LockKey(const std::string& key, LockEntryPtr lock_entry);
50+
51+
void UnlockKeys(const std::vector<std::string>& keys);
52+
53+
bool CheckKeys(const std::vector<std::string>& keys, pb::store::IsolationLevel isolation_level, int64_t start_ts,
54+
const std::set<int64_t>& resolved_locks, pb::store::TxnResultInfo& txn_result_info);
55+
56+
// Range read operation: Check whether the keys within the range are locked
57+
bool CheckRange(const std::string& start_key, const std::string& end_key, pb::store::IsolationLevel isolation_level,
58+
int64_t start_ts, const std::set<int64_t>& resolved_locks, pb::store::TxnResultInfo& txn_result_info);
59+
60+
private:
61+
// key->lock_info pb::store::LockInfo
62+
std::map<std::string, LockEntryPtr>
63+
lock_table_; // Ordered storage of locked keys (supporting range queries)
64+
RWLock rw_lock_;
65+
};
66+
67+
} // namespace dingodb
68+
69+
#endif

src/engine/txn_engine_helper.cc

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,8 @@ butil::Status TxnIterator::GetCurrentValue() {
814814
if (is_lock_conflict) {
815815
DINGO_LOG(WARNING) << "[txn]Scan CheckLockConflict return conflict, key: " << Helper::StringToHex(lock_info.key())
816816
<< ", isolation_level: " << isolation_level_ << ", start_ts: " << start_ts_
817-
<< ", seek_ts: " << seek_ts_ << ", lock_info: " << lock_info.ShortDebugString();
817+
<< ", seek_ts: " << seek_ts_ << ", lock_info: " << lock_info.ShortDebugString()
818+
<< ", txn_result_info: " << txn_result_info_.ShortDebugString();
818819
key_.clear();
819820
value_.clear();
820821
return butil::Status(pb::error::Errno::ETXN_LOCK_CONFLICT, "lock conflict");
@@ -878,26 +879,18 @@ std::string TxnIterator::Value() { return value_; }
878879
bool TxnEngineHelper::CheckLockConflict(const pb::store::LockInfo &lock_info, pb::store::IsolationLevel isolation_level,
879880
int64_t start_ts, const std::set<int64_t> &resolved_locks,
880881
pb::store::TxnResultInfo &txn_result_info) {
881-
DINGO_LOG(DEBUG) << "[txn]CheckLockConflict lock_info: " << lock_info.ShortDebugString()
882-
<< ", isolation_level: " << isolation_level << ", start_ts: " << start_ts
883-
<< ", resolved_locks size: " << resolved_locks.size();
882+
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
883+
<< "[txn]CheckLockConflict lock_info: " << lock_info.ShortDebugString()
884+
<< ", isolation_level: " << isolation_level << ", start_ts: " << start_ts
885+
<< ", resolved_locks size: " << resolved_locks.size();
884886

885887
// if lock_info is resolved, return false, means the executor has used CheckTxnStatus to check the lock_info and
886888
// updated the min_commit_ts of the primary lock.
887889
if (!resolved_locks.empty() && resolved_locks.find(lock_info.lock_ts()) != resolved_locks.end()) {
888-
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
889-
<< "[txn]CheckLockConflict lock_info: " << lock_info.ShortDebugString()
890-
<< ", isolation_level: " << isolation_level << ", start_ts: " << start_ts
891-
<< ", resolved_locks size: " << resolved_locks.size() << ", lock_ts: " << lock_info.lock_ts()
892-
<< " is resolved, return false";
893890
return false;
894891
}
895892
// Ignore lock when min_commit_ts > ts
896893
if (lock_info.min_commit_ts() > start_ts) {
897-
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
898-
<< "[txn]CheckLockConflict lock_info: " << lock_info.ShortDebugString()
899-
<< ", isolation_level: " << isolation_level << ", start_ts: " << start_ts
900-
<< ", min_commit_ts: " << lock_info.lock_ts() << " ignore lock when min_commit_ts > ts";
901894
return false;
902895
}
903896

@@ -906,18 +899,12 @@ bool TxnEngineHelper::CheckLockConflict(const pb::store::LockInfo &lock_info, pb
906899
// for pessimistic, check for_update_ts
907900
if (lock_info.for_update_ts() > 0) {
908901
if (lock_info.for_update_ts() < start_ts) {
909-
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
910-
<< "[txn]CheckLockConflict SI lock_info.for_update_ts() > 0, it's conflict, lock_info: "
911-
<< lock_info.ShortDebugString() << ", start_ts: " << start_ts;
912902
// for_update_ts < start_ts, return lock_info
913903
*(txn_result_info.mutable_locked()) = lock_info;
914904
return true;
915905
}
916906
} else {
917907
if (lock_info.lock_ts() < start_ts) {
918-
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
919-
<< "[txn]CheckLockConflict SI lock_info.for_update_ts() == 0, it's conflict, lock_info: "
920-
<< lock_info.ShortDebugString() << ", start_ts: " << start_ts;
921908
// lock_ts < start_ts, return lock_info
922909
*(txn_result_info.mutable_locked()) = lock_info;
923910
return true;
@@ -931,18 +918,10 @@ bool TxnEngineHelper::CheckLockConflict(const pb::store::LockInfo &lock_info, pb
931918
// for optimistic lock, need to check the lock_ts
932919
if (lock_info.for_update_ts() > 0) {
933920
if (lock_info.lock_type() == pb::store::Lock) {
934-
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
935-
<< "[txn]CheckLockConflict RC lock_info.for_update_ts() > 0, but only on LOCK stage, it's ok, "
936-
"lock_info: "
937-
<< lock_info.ShortDebugString() << ", start_ts: " << start_ts;
938921
return false;
939922
}
940923

941924
if (lock_info.for_update_ts() < start_ts) {
942-
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
943-
<< "[txn]CheckLockConflict RC lock_info.for_update_ts() > 0, on PREWRITE stage, it's "
944-
"conflict, lock_info: "
945-
<< lock_info.ShortDebugString() << ", start_ts: " << start_ts;
946925
// for_update_ts < start_ts, return lock_info
947926
*(txn_result_info.mutable_locked()) = lock_info;
948927
return true;
@@ -1129,7 +1108,8 @@ butil::Status TxnEngineHelper::BatchGet(RawEnginePtr engine, const pb::store::Is
11291108
if (is_lock_conflict) {
11301109
DINGO_LOG(WARNING) << "[txn]BatchGet CheckLockConflict return conflict, key: " << Helper::StringToHex(key)
11311110
<< ", isolation_level: " << isolation_level << ", start_ts: " << start_ts
1132-
<< ", lock_info: " << lock_info.ShortDebugString();
1111+
<< ", lock_info: " << lock_info.ShortDebugString()
1112+
<< ", txn_result_info: " << txn_result_info.ShortDebugString();
11331113
return butil::Status::OK();
11341114
}
11351115

@@ -1866,23 +1846,30 @@ butil::Status TxnEngineHelper::PessimisticRollback(RawEnginePtr raw_engine, std:
18661846

18671847
bvar::LatencyRecorder g_txn_prewrite_latency("dingo_txn_prewrite");
18681848

1869-
void TxnEngineHelper::GenFinalMinCommitTs(int64_t region_id, std::string key, int64_t region_max_ts, int64_t start_ts,
1870-
int64_t for_update_ts, int64_t lock_min_commit_ts, int64_t max_commit_ts,
1871-
int64_t &final_min_commit_ts) {
1872-
int64_t min_commit_ts = std::max(std::max(region_max_ts, start_ts), for_update_ts) + 1;
1873-
final_min_commit_ts = std::max(min_commit_ts, lock_min_commit_ts);
1849+
int64_t TxnEngineHelper::GenFinalMinCommitTs(store::RegionPtr region, pb::store::LockInfo &lock_info, std::string key,
1850+
int64_t start_ts, int64_t for_update_ts, int64_t max_commit_ts) {
1851+
int64_t region_id = region->Id();
1852+
auto new_entry = std::make_shared<ConcurrencyManager::LockEntry>();
1853+
RWLockWriteGuard guard(&new_entry->rw_lock);
1854+
new_entry->lock_info = lock_info;
1855+
region->LockKey(key, new_entry);
1856+
1857+
int64_t region_max_ts = region->TxnAccessMaxTs();
1858+
int64_t min_commit_ts = std::max({region_max_ts, start_ts, for_update_ts}) + 1;
1859+
int64_t final_min_commit_ts = std::max(min_commit_ts, lock_info.min_commit_ts());
1860+
final_min_commit_ts = (max_commit_ts != 0 && min_commit_ts > max_commit_ts) ? 0 : final_min_commit_ts;
1861+
1862+
if (final_min_commit_ts > 0) {
1863+
lock_info.set_min_commit_ts(final_min_commit_ts);
1864+
new_entry->lock_info.set_min_commit_ts(final_min_commit_ts);
1865+
}
1866+
18741867
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
18751868
<< fmt::format("[txn][region({})]", region_id) << " GenFinalMinCommitTs, region_max_ts:" << region_max_ts
18761869
<< " , start_ts:" << start_ts << " , for_update_ts:" << for_update_ts
1877-
<< ", lock_min_commit_ts:" << lock_min_commit_ts << ", final_min_commit_ts:" << final_min_commit_ts;
1878-
if (max_commit_ts != 0 && min_commit_ts > max_commit_ts) {
1879-
DINGO_LOG(WARNING) << fmt::format(
1880-
"[txn][region({})] Prewrite 1pc commit_ts({}) is too large, fallback to normal 2PC",
1881-
region_id, min_commit_ts)
1882-
<< ", key: " << key << ", start_ts: " << start_ts << " , for_update_ts: " << for_update_ts
1883-
<< " , lock_min_commit_ts: " << lock_min_commit_ts << " , max_commit_ts: " << max_commit_ts;
1884-
final_min_commit_ts = 0;
1885-
}
1870+
<< ", lock_min_commit_ts:" << lock_info.min_commit_ts() << ", final_min_commit_ts:" << final_min_commit_ts;
1871+
1872+
return final_min_commit_ts;
18861873
}
18871874

18881875
butil::Status TxnEngineHelper::GenPrewriteDataAndLock(
@@ -1895,7 +1882,6 @@ butil::Status TxnEngineHelper::GenPrewriteDataAndLock(
18951882
std::vector<std::tuple<std::string, std::string, pb::store::LockInfo, bool>> &locks_for_1pc,
18961883
int64_t &final_min_commit_ts) {
18971884
int64_t region_id = region->Id();
1898-
int64_t region_max_ts = region->TxnAppliedMaxTs();
18991885

19001886
// do Put/Delete/PutIfAbsent
19011887
if (mutation.op() == pb::store::Op::Put) {
@@ -1937,8 +1923,8 @@ butil::Status TxnEngineHelper::GenPrewriteDataAndLock(
19371923
}
19381924
}
19391925
if (try_one_pc || use_async_commit) {
1940-
GenFinalMinCommitTs(region_id, mutation.key(), region_max_ts, start_ts, for_update_ts,
1941-
lock_info.min_commit_ts(), max_commit_ts, final_min_commit_ts);
1926+
final_min_commit_ts =
1927+
GenFinalMinCommitTs(region, lock_info, mutation.key(), start_ts, for_update_ts, max_commit_ts);
19421928
if (final_min_commit_ts == 0) {
19431929
// fallback_to_2PC
19441930
try_one_pc = false;
@@ -1986,8 +1972,8 @@ butil::Status TxnEngineHelper::GenPrewriteDataAndLock(
19861972
}
19871973

19881974
if (try_one_pc || use_async_commit) {
1989-
GenFinalMinCommitTs(region_id, mutation.key(), region_max_ts, start_ts, for_update_ts,
1990-
lock_info.min_commit_ts(), max_commit_ts, final_min_commit_ts);
1975+
final_min_commit_ts =
1976+
GenFinalMinCommitTs(region, lock_info, mutation.key(), start_ts, for_update_ts, max_commit_ts);
19911977
if (final_min_commit_ts == 0) {
19921978
// fallback_to_2PC
19931979
try_one_pc = false;
@@ -2047,8 +2033,8 @@ butil::Status TxnEngineHelper::GenPrewriteDataAndLock(
20472033
}
20482034

20492035
if (try_one_pc || use_async_commit) {
2050-
GenFinalMinCommitTs(region_id, mutation.key(), region_max_ts, start_ts, for_update_ts,
2051-
lock_info.min_commit_ts(), max_commit_ts, final_min_commit_ts);
2036+
final_min_commit_ts =
2037+
GenFinalMinCommitTs(region, lock_info, mutation.key(), start_ts, for_update_ts, max_commit_ts);
20522038
if (final_min_commit_ts == 0) {
20532039
// fallback_to_2PC
20542040
try_one_pc = false;
@@ -2103,8 +2089,8 @@ butil::Status TxnEngineHelper::GenPrewriteDataAndLock(
21032089
}
21042090

21052091
if (try_one_pc || use_async_commit) {
2106-
GenFinalMinCommitTs(region_id, mutation.key(), region_max_ts, start_ts, for_update_ts,
2107-
lock_info.min_commit_ts(), max_commit_ts, final_min_commit_ts);
2092+
final_min_commit_ts =
2093+
GenFinalMinCommitTs(region, lock_info, mutation.key(), start_ts, for_update_ts, max_commit_ts);
21082094
if (final_min_commit_ts == 0) {
21092095
// fallback_to_2PC
21102096
try_one_pc = false;

src/engine/txn_engine_helper.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,8 @@ class TxnEngineHelper {
304304
static void RegularUpdateSafePointTsHandler(void *arg);
305305
static void RegularDoGcHandler(void *arg);
306306

307-
static void GenFinalMinCommitTs(int64_t region_id, std::string key, int64_t region_max_ts, int64_t start_ts,
308-
int64_t for_update_ts, int64_t lock_min_commit_ts, int64_t max_commit_ts,
309-
int64_t &final_min_commit_ts);
307+
static int64_t GenFinalMinCommitTs(store::RegionPtr region, pb::store::LockInfo &lock_info, std::string key, int64_t start_ts,
308+
int64_t for_update_ts, int64_t max_commit_ts);
310309

311310
static butil::Status GenPrewriteDataAndLock(
312311
store::RegionPtr region, const pb::store::Mutation &mutation, const pb::store::LockInfo &prev_lock_info,

src/handler/raft_apply_handler.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ bool HandlePreCreateRegionSplit(const pb::raft::SplitRequest &request, store::Re
291291
to_region->SetRawAppliedMaxTs(from_region->RawAppliedMaxTs());
292292

293293
// Set txn apply max ts
294-
to_region->SetTxnAppliedMaxTs(from_region->TxnAppliedMaxTs());
294+
to_region->SetTxnAccessMaxTs(from_region->TxnAccessMaxTs());
295295

296296
// Note: full heartbeat do not report region metrics when the region is in SPLITTING or MERGING
297297
store_region_meta->UpdateState(to_region, pb::common::StoreRegionState::SPLITTING);
@@ -532,7 +532,7 @@ bool HandlePostCreateRegionSplit(const pb::raft::SplitRequest &request, store::R
532532
child_region->SetRawAppliedMaxTs(parent_region->RawAppliedMaxTs());
533533

534534
// Set txn apply max ts
535-
child_region->SetTxnAppliedMaxTs(parent_region->TxnAppliedMaxTs());
535+
child_region->SetTxnAccessMaxTs(parent_region->TxnAccessMaxTs());
536536

537537
// Set parent/child range/epoch
538538
pb::common::Range parent_range;
@@ -920,8 +920,8 @@ int CommitMergeHandler::Handle(std::shared_ptr<Context>, store::RegionPtr target
920920
target_region->SetRawAppliedMaxTs(raw_max_ts);
921921

922922
// Set apply max ts
923-
int64_t txn_max_ts = std::max(target_region->TxnAppliedMaxTs(), source_region->TxnAppliedMaxTs());
924-
target_region->SetTxnAppliedMaxTs(txn_max_ts);
923+
int64_t txn_max_ts = std::max(target_region->TxnAccessMaxTs(), source_region->TxnAccessMaxTs());
924+
target_region->SetTxnAccessMaxTs(txn_max_ts);
925925

926926
// Set source region TOMBSTONE state
927927
store_region_meta->UpdateState(source_region, pb::common::StoreRegionState::TOMBSTONE);

0 commit comments

Comments
 (0)