@@ -4017,9 +4017,95 @@ butil::Status TxnEngineHelper::TxnCheckSecondaryLocks(RawEnginePtr raw_engine, s
40174017
40184018bvar::LatencyRecorder g_txn_resolve_lock_latency (" dingo_txn_resolve_lock" );
40194019
4020+ butil::Status TxnEngineHelper::BatchResolveLock (RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
4021+ std::shared_ptr<Context> ctx, store::RegionPtr region,
4022+ const std::map<int64_t , int64_t > &txn_infos,
4023+ pb::store::TxnResultInfo *txn_result) {
4024+ if (txn_infos.empty ()) return butil::Status::OK ();
4025+
4026+ for (const auto &[start_ts, commit_ts] : txn_infos) {
4027+ std::vector<std::string> keys_to_rollback;
4028+
4029+ std::vector<pb::store::LockInfo> lock_infos_to_commit;
4030+ std::vector<std::string> keys_to_rollback_with_data;
4031+ std::vector<std::string> keys_to_rollback_without_data;
4032+
4033+ auto stream = Stream::New (FLAGS_stream_message_max_limit_size);
4034+ std::vector<pb::store::LockInfo> tmp_lock_infos;
4035+ bool has_more = false ;
4036+ std::string end_key{};
4037+ auto ret = ScanLockInfo (stream, raw_engine, start_ts, start_ts + 1 , region->Range (false ), 0 , tmp_lock_infos,
4038+ has_more, end_key);
4039+ if (!ret.ok ()) {
4040+ DINGO_LOG (FATAL) << fmt::format (" [txn][region({})] BatchResolveLock, " , region->Id ())
4041+ << " , get lock info failed, start_ts: " << start_ts << " , status: " << ret.error_str ();
4042+ }
4043+
4044+ for (const auto &lock_info : tmp_lock_infos) {
4045+ // if the lock is a pessimistic lock, can't do resolvelock
4046+ if (lock_info.lock_type () == pb::store::Op::Lock) {
4047+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] BatchResolveLock," , region->Id ())
4048+ << " , pessimistic lock, can't do resolvelock, key: " << lock_info.key ()
4049+ << " , start_ts: " << start_ts << " , lock_info: " << lock_info.ShortDebugString ();
4050+ *txn_result->mutable_locked () = lock_info;
4051+ return butil::Status::OK ();
4052+ } else if (lock_info.lock_type () != pb::store::Op::Put && lock_info.lock_type () != pb::store::Op::Delete &&
4053+ lock_info.lock_type () != pb::store::Op::PutIfAbsent) {
4054+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] BatchResolveLock," , region->Id ())
4055+ << " , invalid lock_type, key: " << lock_info.key () << " , start_ts: " << start_ts
4056+ << " , lock_info: " << lock_info.ShortDebugString ();
4057+ *txn_result->mutable_locked () = lock_info;
4058+ return butil::Status::OK ();
4059+ }
4060+
4061+ // prepare to do rollback or commit
4062+ const std::string &key = lock_info.key ();
4063+ if (commit_ts > 0 ) {
4064+ // do commit
4065+ lock_infos_to_commit.push_back (lock_info);
4066+ } else {
4067+ if (lock_info.short_value ().empty ()) {
4068+ keys_to_rollback_with_data.push_back (key);
4069+ } else {
4070+ keys_to_rollback_without_data.push_back (key);
4071+ }
4072+ }
4073+ }
4074+
4075+ if (!lock_infos_to_commit.empty ()) {
4076+ auto ret = DoTxnCommit (raw_engine, raft_engine, ctx, region, lock_infos_to_commit, start_ts, commit_ts);
4077+ if (!ret.ok ()) {
4078+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] BatchResolveLock, " , region->Id ())
4079+ << " , do txn commit failed, start_ts: " << start_ts << " , status: " << ret.error_str ();
4080+ return ret;
4081+ }
4082+ }
4083+
4084+ if (!keys_to_rollback_with_data.empty () || !keys_to_rollback_without_data.empty ()) {
4085+ for (auto const &key : keys_to_rollback_with_data) {
4086+ DINGO_LOG (INFO) << fmt::format (" [txn][region({})] BatchResolveLock, " , region->Id ()) << " primary key:" << key
4087+ << " , do rollback with data, start_ts: " << start_ts;
4088+ }
4089+ for (auto const &key : keys_to_rollback_without_data) {
4090+ DINGO_LOG (INFO) << fmt::format (" [txn][region({})] BatchResolveLock, " , region->Id ()) << " primary key:" << key
4091+ << " , do rollback without data, start_ts: " << start_ts;
4092+ }
4093+ auto ret =
4094+ DoRollback (raw_engine, raft_engine, ctx, keys_to_rollback_with_data, keys_to_rollback_without_data, start_ts);
4095+ if (!ret.ok ()) {
4096+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] BatchResolveLock, " , region->Id ())
4097+ << " , rollback failed, start_ts: " << start_ts << " , status: " << ret.error_str ();
4098+ return ret;
4099+ }
4100+ }
4101+ }
4102+ return butil::Status::OK ();
4103+ }
4104+
40204105butil::Status TxnEngineHelper::ResolveLock (RawEnginePtr raw_engine, std::shared_ptr<Engine> raft_engine,
40214106 std::shared_ptr<Context> ctx, int64_t start_ts, int64_t commit_ts,
4022- const std::vector<std::string> &keys) {
4107+ const std::vector<std::string> &keys,
4108+ const std::map<int64_t , int64_t > &txn_infos) {
40234109 BvarLatencyGuard bvar_guard (&g_txn_resolve_lock_latency);
40244110
40254111 DINGO_LOG_IF (INFO, FLAGS_dingo_log_switch_txn_detail)
@@ -4041,20 +4127,6 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
40414127 return butil::Status (pb::error::Errno::EREGION_NOT_FOUND, " region is not found" );
40424128 }
40434129
4044- if (commit_ts < 0 ) {
4045- DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock" , region->Id ())
4046- << " , commit_ts < 0, region_id: " << ctx->RegionId () << " , start_ts: " << start_ts
4047- << " , commit_ts: " << commit_ts;
4048- return butil::Status (pb::error::Errno::EILLEGAL_PARAMTETERS, " commit_ts < 0" );
4049- }
4050-
4051- if (commit_ts > 0 && commit_ts <= start_ts) {
4052- DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock" , region->Id ())
4053- << " , commit_ts <= start_ts, region_id: " << ctx->RegionId () << " , start_ts: " << start_ts
4054- << " , commit_ts: " << commit_ts;
4055- return butil::Status (pb::error::Errno::EILLEGAL_PARAMTETERS, " commit_ts <= start_ts" );
4056- }
4057-
40584130 // if commit_ts = 0, do rollback else do commit
40594131 // scan lock_cf to search if transaction with start_ts is exists, if exists, do rollback or commit
40604132 // if not exists, do nothing
@@ -4087,6 +4159,20 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
40874159
40884160 // if keys is not empty, we only do resolve lock for these keys
40894161 if (!keys.empty ()) {
4162+ if (commit_ts < 0 ) {
4163+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock" , region->Id ())
4164+ << " , commit_ts < 0, region_id: " << ctx->RegionId () << " , start_ts: " << start_ts
4165+ << " , commit_ts: " << commit_ts;
4166+ return butil::Status (pb::error::Errno::EILLEGAL_PARAMTETERS, " commit_ts < 0" );
4167+ }
4168+
4169+ if (commit_ts > 0 && commit_ts <= start_ts) {
4170+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock" , region->Id ())
4171+ << " , commit_ts <= start_ts, region_id: " << ctx->RegionId () << " , start_ts: " << start_ts
4172+ << " , commit_ts: " << commit_ts;
4173+ return butil::Status (pb::error::Errno::EILLEGAL_PARAMTETERS, " commit_ts <= start_ts" );
4174+ }
4175+
40904176 for (const auto &key : keys) {
40914177 pb::store::LockInfo lock_info;
40924178 auto ret = txn_reader.GetLockInfo (key, lock_info);
@@ -4146,47 +4232,23 @@ butil::Status TxnEngineHelper::ResolveLock(RawEnginePtr raw_engine, std::shared_
41464232 }
41474233 // scan for keys to rollback
41484234 else {
4149- auto stream = Stream::New (FLAGS_stream_message_max_limit_size);
4150- std::vector<pb::store::LockInfo> tmp_lock_infos;
4151- bool has_more = false ;
4152- std::string end_key{};
4153- auto ret = ScanLockInfo (stream, raw_engine, start_ts, start_ts + 1 , region->Range (false ), 0 , tmp_lock_infos,
4154- has_more, end_key);
4155- if (!ret.ok ()) {
4156- DINGO_LOG (FATAL) << fmt::format (" [txn][region({})] ResolveLock, " , region->Id ())
4157- << " , get lock info failed, start_ts: " << start_ts << " , status: " << ret.error_str ();
4158- }
4159-
4160- for (const auto &lock_info : tmp_lock_infos) {
4161- // if the lock is a pessimistic lock, can't do resolvelock
4162- if (lock_info.lock_type () == pb::store::Op::Lock) {
4163- DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock," , region->Id ())
4164- << " , pessimistic lock, can't do resolvelock, key: " << lock_info.key ()
4165- << " , start_ts: " << start_ts << " , lock_info: " << lock_info.ShortDebugString ();
4166- *txn_result->mutable_locked () = lock_info;
4167- return butil::Status::OK ();
4168- } else if (lock_info.lock_type () != pb::store::Op::Put && lock_info.lock_type () != pb::store::Op::Delete &&
4169- lock_info.lock_type () != pb::store::Op::PutIfAbsent) {
4170- DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock," , region->Id ())
4171- << " , invalid lock_type, key: " << lock_info.key () << " , start_ts: " << start_ts
4172- << " , lock_info: " << lock_info.ShortDebugString ();
4173- *txn_result->mutable_locked () = lock_info;
4174- return butil::Status::OK ();
4235+ for (const auto &[start_ts, commit_ts] : txn_infos) {
4236+ if (commit_ts < 0 ) {
4237+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock" , region->Id ())
4238+ << " , commit_ts < 0, region_id: " << ctx->RegionId () << " , start_ts: " << start_ts
4239+ << " , commit_ts: " << commit_ts;
4240+ return butil::Status (pb::error::Errno::EILLEGAL_PARAMTETERS, " commit_ts < 0" );
41754241 }
41764242
4177- // prepare to do rollback or commit
4178- const std::string &key = lock_info.key ();
4179- if (commit_ts > 0 ) {
4180- // do commit
4181- lock_infos_to_commit.push_back (lock_info);
4182- } else {
4183- if (lock_info.short_value ().empty ()) {
4184- keys_to_rollback_with_data.push_back (key);
4185- } else {
4186- keys_to_rollback_without_data.push_back (key);
4187- }
4243+ if (commit_ts > 0 && commit_ts <= start_ts) {
4244+ DINGO_LOG (ERROR) << fmt::format (" [txn][region({})] ResolveLock" , region->Id ())
4245+ << " , commit_ts <= start_ts, region_id: " << ctx->RegionId () << " , start_ts: " << start_ts
4246+ << " , commit_ts: " << commit_ts;
4247+ return butil::Status (pb::error::Errno::EILLEGAL_PARAMTETERS, " commit_ts <= start_ts" );
41884248 }
4189- } // end while iter
4249+ }
4250+ return BatchResolveLock (raw_engine, raft_engine, ctx, region, txn_infos, txn_result);
4251+
41904252 } // end scan lock
41914253
41924254 if (!lock_infos_to_commit.empty ()) {
0 commit comments