Skip to content

Commit 53f374f

Browse files
visualYJDyuhaijun999
authored andcommitted
[feat][store] Support batch resolve different transaction.
1 parent 07e0cf0 commit 53f374f

File tree

11 files changed

+144
-83
lines changed

11 files changed

+144
-83
lines changed

dingo-store-proto

src/engine/engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ class Engine : public std::enable_shared_from_this<Engine> {
310310
virtual butil::Status TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, store::RegionPtr region,
311311
int64_t start_ts, const std::vector<std::string>& keys) = 0;
312312
virtual butil::Status TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
313-
const std::vector<std::string>& keys) = 0;
313+
const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos) = 0;
314314
virtual butil::Status TxnBatchRollback(std::shared_ptr<Context> ctx, int64_t start_ts,
315315
const std::vector<std::string>& keys) = 0;
316316
virtual butil::Status TxnHeartBeat(std::shared_ptr<Context> ctx, const std::string& primary_lock, int64_t start_ts,

src/engine/mono_store_engine.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -681,8 +681,8 @@ butil::Status MonoStoreEngine::TxnWriter::TxnCheckSecondaryLocks(std::shared_ptr
681681
}
682682

683683
butil::Status MonoStoreEngine::TxnWriter::TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts,
684-
int64_t commit_ts, const std::vector<std::string>& keys) {
685-
return TxnEngineHelper::ResolveLock(txn_writer_raw_engine_, mono_engine_, ctx, start_ts, commit_ts, keys);
684+
int64_t commit_ts, const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos) {
685+
return TxnEngineHelper::ResolveLock(txn_writer_raw_engine_, mono_engine_, ctx, start_ts, commit_ts, keys, txn_infos);
686686
}
687687

688688
butil::Status MonoStoreEngine::TxnWriter::TxnBatchRollback(std::shared_ptr<Context> ctx, int64_t start_ts,

src/engine/mono_store_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ class MonoStoreEngine : public Engine {
224224
butil::Status TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, store::RegionPtr region, int64_t start_ts,
225225
const std::vector<std::string>& keys) override;
226226
butil::Status TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
227-
const std::vector<std::string>& keys) override;
227+
const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos) override;
228228
butil::Status TxnBatchRollback(std::shared_ptr<Context> ctx, int64_t start_ts,
229229
const std::vector<std::string>& keys) override;
230230
butil::Status TxnHeartBeat(std::shared_ptr<Context> ctx, const std::string& primary_lock, int64_t start_ts,

src/engine/raft_store_engine.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -662,8 +662,8 @@ butil::Status RaftStoreEngine::TxnWriter::TxnCheckSecondaryLocks(std::shared_ptr
662662
}
663663

664664
butil::Status RaftStoreEngine::TxnWriter::TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts,
665-
int64_t commit_ts, const std::vector<std::string>& keys) {
666-
return TxnEngineHelper::ResolveLock(txn_writer_raw_engine_, raft_engine_, ctx, start_ts, commit_ts, keys);
665+
int64_t commit_ts, const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos) {
666+
return TxnEngineHelper::ResolveLock(txn_writer_raw_engine_, raft_engine_, ctx, start_ts, commit_ts, keys, txn_infos);
667667
}
668668

669669
butil::Status RaftStoreEngine::TxnWriter::TxnBatchRollback(std::shared_ptr<Context> ctx, int64_t start_ts,

src/engine/raft_store_engine.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,13 @@ class RaftStoreEngine : public Engine, public RaftControlAble {
268268
butil::Status TxnCommit(std::shared_ptr<Context> ctx, store::RegionPtr region, int64_t start_ts, int64_t commit_ts,
269269
const std::vector<std::string>& keys) override;
270270
butil::Status TxnCheckTxnStatus(std::shared_ptr<Context> ctx, const std::string& primary_key, int64_t lock_ts,
271-
int64_t caller_start_ts, int64_t current_ts, bool force_sync_commit, bool rollback_if_not_exist) override;
271+
int64_t caller_start_ts, int64_t current_ts, bool force_sync_commit,
272+
bool rollback_if_not_exist) override;
272273
butil::Status TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, store::RegionPtr region, int64_t start_ts,
273274
const std::vector<std::string>& keys) override;
274275
butil::Status TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
275-
const std::vector<std::string>& keys) override;
276+
const std::vector<std::string>& keys,
277+
const std::map<int64_t, int64_t>& txn_infos) override;
276278
butil::Status TxnBatchRollback(std::shared_ptr<Context> ctx, int64_t start_ts,
277279
const std::vector<std::string>& keys) override;
278280
butil::Status TxnHeartBeat(std::shared_ptr<Context> ctx, const std::string& primary_lock, int64_t start_ts,

src/engine/storage.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,18 +1741,19 @@ butil::Status Storage::TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, stor
17411741
}
17421742

17431743
butil::Status Storage::TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
1744-
const std::vector<std::string>& keys) {
1744+
const std::vector<std::string>& keys,
1745+
const std::map<int64_t, int64_t>& txn_infos) {
17451746
auto status = ValidateLeader(ctx->RegionId());
17461747
if (BAIDU_UNLIKELY(!status.ok())) {
17471748
return status;
17481749
}
17491750

17501751
DINGO_LOG(DEBUG) << "TxnResolveLock start_ts : " << start_ts << " commit_ts : " << commit_ts
1751-
<< " keys size : " << keys.size();
1752+
<< " keys size : " << keys.size() << " , txn_info size : " << txn_infos.size();
17521753

17531754
auto writer = GetEngineTxnWriter(ctx->StoreEngineType(), ctx->RawEngineType());
17541755

1755-
status = writer->TxnResolveLock(ctx, start_ts, commit_ts, keys);
1756+
status = writer->TxnResolveLock(ctx, start_ts, commit_ts, keys, txn_infos);
17561757
if (BAIDU_UNLIKELY(!status.ok())) {
17571758
return status;
17581759
}

src/engine/storage.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,12 @@ class Storage {
116116
const std::vector<std::string>& keys);
117117
butil::Status TxnBatchRollback(std::shared_ptr<Context> ctx, int64_t start_ts, const std::vector<std::string>& keys);
118118
butil::Status TxnCheckTxnStatus(std::shared_ptr<Context> ctx, const std::string& primary_key, int64_t lock_ts,
119-
int64_t caller_start_ts, int64_t current_ts, bool force_sync_commit, bool rollback_if_not_exist);
119+
int64_t caller_start_ts, int64_t current_ts, bool force_sync_commit,
120+
bool rollback_if_not_exist);
120121
butil::Status TxnCheckSecondaryLocks(std::shared_ptr<Context> ctx, store::RegionPtr region, int64_t start_ts,
121122
const std::vector<std::string>& keys);
122123
butil::Status TxnResolveLock(std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
123-
const std::vector<std::string>& keys);
124+
const std::vector<std::string>& keys, const std::map<int64_t, int64_t>& txn_infos);
124125
butil::Status TxnHeartBeat(std::shared_ptr<Context> ctx, const std::string& primary_lock, int64_t start_ts,
125126
int64_t advise_lock_ttl);
126127
butil::Status TxnGc(std::shared_ptr<Context> ctx, int64_t safe_point_ts);
@@ -167,7 +168,8 @@ class Storage {
167168
butil::Status VectorDump(std::shared_ptr<Engine::VectorReader::Context> ctx, bool dump_all,
168169
std::vector<std::string>& dump_datas);
169170
#if WITH_VECTOR_INDEX_USE_DOCUMENT_SPEEDUP
170-
butil::Status VectorDisplayDocumentDetails(std::shared_ptr<Engine::VectorReader::Context> ctx, store::RegionPtr region,
171+
butil::Status VectorDisplayDocumentDetails(std::shared_ptr<Engine::VectorReader::Context> ctx,
172+
store::RegionPtr region,
171173
pb::index::VectorDisplayDocumentDetailsResponse* response);
172174
#endif
173175

src/engine/txn_engine_helper.cc

Lines changed: 115 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4017,9 +4017,95 @@ butil::Status TxnEngineHelper::TxnCheckSecondaryLocks(RawEnginePtr raw_engine, s
40174017

40184018
bvar::LatencyRecorder g_txn_resolve_lock_latency("dingo_txn_resolve_lock");
40194019

4020+
butil::Status TxnEngineHelper::BatchResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
4021+
std::shared_ptr<Context> ctx, store::RegionPtr region,
4022+
const std::map<int64_t, int64_t> &txn_infos,
4023+
pb::store::TxnResultInfo *txn_result) {
4024+
if (txn_infos.empty()) return butil::Status::OK();
4025+
4026+
for (const auto &[start_ts, commit_ts] : txn_infos) {
4027+
std::vector<std::string> keys_to_rollback;
4028+
4029+
std::vector<pb::store::LockInfo> lock_infos_to_commit;
4030+
std::vector<std::string> keys_to_rollback_with_data;
4031+
std::vector<std::string> keys_to_rollback_without_data;
4032+
4033+
auto stream = Stream::New(FLAGS_stream_message_max_limit_size);
4034+
std::vector<pb::store::LockInfo> tmp_lock_infos;
4035+
bool has_more = false;
4036+
std::string end_key{};
4037+
auto ret = ScanLockInfo(stream, raw_engine, start_ts, start_ts + 1, region->Range(false), 0, tmp_lock_infos,
4038+
has_more, end_key);
4039+
if (!ret.ok()) {
4040+
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id())
4041+
<< ", get lock info failed, start_ts: " << start_ts << ", status: " << ret.error_str();
4042+
}
4043+
4044+
for (const auto &lock_info : tmp_lock_infos) {
4045+
// if the lock is a pessimistic lock, can't do resolvelock
4046+
if (lock_info.lock_type() == pb::store::Op::Lock) {
4047+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] BatchResolveLock,", region->Id())
4048+
<< ", pessimistic lock, can't do resolvelock, key: " << lock_info.key()
4049+
<< ", start_ts: " << start_ts << ", lock_info: " << lock_info.ShortDebugString();
4050+
*txn_result->mutable_locked() = lock_info;
4051+
return butil::Status::OK();
4052+
} else if (lock_info.lock_type() != pb::store::Op::Put && lock_info.lock_type() != pb::store::Op::Delete &&
4053+
lock_info.lock_type() != pb::store::Op::PutIfAbsent) {
4054+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] BatchResolveLock,", region->Id())
4055+
<< ", invalid lock_type, key: " << lock_info.key() << ", start_ts: " << start_ts
4056+
<< ", lock_info: " << lock_info.ShortDebugString();
4057+
*txn_result->mutable_locked() = lock_info;
4058+
return butil::Status::OK();
4059+
}
4060+
4061+
// prepare to do rollback or commit
4062+
const std::string &key = lock_info.key();
4063+
if (commit_ts > 0) {
4064+
// do commit
4065+
lock_infos_to_commit.push_back(lock_info);
4066+
} else {
4067+
if (lock_info.short_value().empty()) {
4068+
keys_to_rollback_with_data.push_back(key);
4069+
} else {
4070+
keys_to_rollback_without_data.push_back(key);
4071+
}
4072+
}
4073+
}
4074+
4075+
if (!lock_infos_to_commit.empty()) {
4076+
auto ret = DoTxnCommit(raw_engine, raft_engine, ctx, region, lock_infos_to_commit, start_ts, commit_ts);
4077+
if (!ret.ok()) {
4078+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id())
4079+
<< ", do txn commit failed, start_ts: " << start_ts << ", status: " << ret.error_str();
4080+
return ret;
4081+
}
4082+
}
4083+
4084+
if (!keys_to_rollback_with_data.empty() || !keys_to_rollback_without_data.empty()) {
4085+
for (auto const &key : keys_to_rollback_with_data) {
4086+
DINGO_LOG(INFO) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id()) << "primary key:" << key
4087+
<< ", do rollback with data, start_ts: " << start_ts;
4088+
}
4089+
for (auto const &key : keys_to_rollback_without_data) {
4090+
DINGO_LOG(INFO) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id()) << "primary key:" << key
4091+
<< ", do rollback without data, start_ts: " << start_ts;
4092+
}
4093+
auto ret =
4094+
DoRollback(raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data, start_ts);
4095+
if (!ret.ok()) {
4096+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] BatchResolveLock, ", region->Id())
4097+
<< ", rollback failed, start_ts: " << start_ts << ", status: " << ret.error_str();
4098+
return ret;
4099+
}
4100+
}
4101+
}
4102+
return butil::Status::OK();
4103+
}
4104+
40204105
butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
40214106
std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
4022-
const std::vector<std::string> &keys) {
4107+
const std::vector<std::string> &keys,
4108+
const std::map<int64_t, int64_t> &txn_infos) {
40234109
BvarLatencyGuard bvar_guard(&g_txn_resolve_lock_latency);
40244110

40254111
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
@@ -4041,20 +4127,6 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
40414127
return butil::Status(pb::error::Errno::EREGION_NOT_FOUND, "region is not found");
40424128
}
40434129

4044-
if (commit_ts < 0) {
4045-
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock", region->Id())
4046-
<< ", commit_ts < 0, region_id: " << ctx->RegionId() << ", start_ts: " << start_ts
4047-
<< ", commit_ts: " << commit_ts;
4048-
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "commit_ts < 0");
4049-
}
4050-
4051-
if (commit_ts > 0 && commit_ts <= start_ts) {
4052-
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock", region->Id())
4053-
<< ", commit_ts <= start_ts, region_id: " << ctx->RegionId() << ", start_ts: " << start_ts
4054-
<< ", commit_ts: " << commit_ts;
4055-
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "commit_ts <= start_ts");
4056-
}
4057-
40584130
// if commit_ts = 0, do rollback else do commit
40594131
// scan lock_cf to search if transaction with start_ts is exists, if exists, do rollback or commit
40604132
// if not exists, do nothing
@@ -4087,6 +4159,20 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
40874159

40884160
// if keys is not empty, we only do resolve lock for these keys
40894161
if (!keys.empty()) {
4162+
if (commit_ts < 0) {
4163+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock", region->Id())
4164+
<< ", commit_ts < 0, region_id: " << ctx->RegionId() << ", start_ts: " << start_ts
4165+
<< ", commit_ts: " << commit_ts;
4166+
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "commit_ts < 0");
4167+
}
4168+
4169+
if (commit_ts > 0 && commit_ts <= start_ts) {
4170+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock", region->Id())
4171+
<< ", commit_ts <= start_ts, region_id: " << ctx->RegionId() << ", start_ts: " << start_ts
4172+
<< ", commit_ts: " << commit_ts;
4173+
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "commit_ts <= start_ts");
4174+
}
4175+
40904176
for (const auto &key : keys) {
40914177
pb::store::LockInfo lock_info;
40924178
auto ret = txn_reader.GetLockInfo(key, lock_info);
@@ -4146,47 +4232,23 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
41464232
}
41474233
// scan for keys to rollback
41484234
else {
4149-
auto stream = Stream::New(FLAGS_stream_message_max_limit_size);
4150-
std::vector<pb::store::LockInfo> tmp_lock_infos;
4151-
bool has_more = false;
4152-
std::string end_key{};
4153-
auto ret = ScanLockInfo(stream, raw_engine, start_ts, start_ts + 1, region->Range(false), 0, tmp_lock_infos,
4154-
has_more, end_key);
4155-
if (!ret.ok()) {
4156-
DINGO_LOG(FATAL) << fmt::format("[txn][region({})] ResolveLock, ", region->Id())
4157-
<< ", get lock info failed, start_ts: " << start_ts << ", status: " << ret.error_str();
4158-
}
4159-
4160-
for (const auto &lock_info : tmp_lock_infos) {
4161-
// if the lock is a pessimistic lock, can't do resolvelock
4162-
if (lock_info.lock_type() == pb::store::Op::Lock) {
4163-
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock,", region->Id())
4164-
<< ", pessimistic lock, can't do resolvelock, key: " << lock_info.key()
4165-
<< ", start_ts: " << start_ts << ", lock_info: " << lock_info.ShortDebugString();
4166-
*txn_result->mutable_locked() = lock_info;
4167-
return butil::Status::OK();
4168-
} else if (lock_info.lock_type() != pb::store::Op::Put && lock_info.lock_type() != pb::store::Op::Delete &&
4169-
lock_info.lock_type() != pb::store::Op::PutIfAbsent) {
4170-
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock,", region->Id())
4171-
<< ", invalid lock_type, key: " << lock_info.key() << ", start_ts: " << start_ts
4172-
<< ", lock_info: " << lock_info.ShortDebugString();
4173-
*txn_result->mutable_locked() = lock_info;
4174-
return butil::Status::OK();
4235+
for (const auto &[start_ts, commit_ts] : txn_infos) {
4236+
if (commit_ts < 0) {
4237+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock", region->Id())
4238+
<< ", commit_ts < 0, region_id: " << ctx->RegionId() << ", start_ts: " << start_ts
4239+
<< ", commit_ts: " << commit_ts;
4240+
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "commit_ts < 0");
41754241
}
41764242

4177-
// prepare to do rollback or commit
4178-
const std::string &key = lock_info.key();
4179-
if (commit_ts > 0) {
4180-
// do commit
4181-
lock_infos_to_commit.push_back(lock_info);
4182-
} else {
4183-
if (lock_info.short_value().empty()) {
4184-
keys_to_rollback_with_data.push_back(key);
4185-
} else {
4186-
keys_to_rollback_without_data.push_back(key);
4187-
}
4243+
if (commit_ts > 0 && commit_ts <= start_ts) {
4244+
DINGO_LOG(ERROR) << fmt::format("[txn][region({})] ResolveLock", region->Id())
4245+
<< ", commit_ts <= start_ts, region_id: " << ctx->RegionId() << ", start_ts: " << start_ts
4246+
<< ", commit_ts: " << commit_ts;
4247+
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "commit_ts <= start_ts");
41884248
}
4189-
} // end while iter
4249+
}
4250+
return BatchResolveLock(raw_engine, raft_engine, ctx, region, txn_infos, txn_result);
4251+
41904252
} // end scan lock
41914253

41924254
if (!lock_infos_to_commit.empty()) {

src/engine/txn_engine_helper.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,9 @@ class TxnEngineHelper {
206206

207207
static butil::Status ResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
208208
std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
209-
const std::vector<std::string> &keys);
209+
const std::vector<std::string> &keys, const std::map<int64_t, int64_t> &txn_infos);
210+
211+
static butil::Status BatchResolveLock(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine, std::shared_ptr<Context> ctx, store::RegionPtr region, const std::map<int64_t, int64_t> &txn_infos, pb::store::TxnResultInfo* txn_result);
210212

211213
static butil::Status HeartBeat(RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
212214
std::shared_ptr<Context> ctx, const std::string &primary_lock, int64_t start_ts,

0 commit comments

Comments
 (0)