@@ -149,7 +149,7 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
149149 RD_LOGE (trace_id, " repl dev is not ready, stage={}" , static_cast < int >(get_stage ()));
150150 return make_async_error<>(ReplServiceError::UNREADY_STATE);
151151 }
152- incr_pending_request_num ( );
152+ init_req_counter counter (pending_request_num );
153153
154154 RD_LOGI (trace_id, " Start replace member, task_id={}, member_out={} member_in={}" , task_id,
155155 boost::uuids::to_string (member_out.id ), boost::uuids::to_string (member_in.id ));
@@ -161,7 +161,6 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
161161 if (!existing_task_id.empty () && existing_task_id != task_id) {
162162 RD_LOGE (trace_id, " Step1. Replace member, task_id={} is not the same as existing task_id={}" , task_id,
163163 existing_task_id);
164- decr_pending_request_num ();
165164 return make_async_error<>(ReplServiceError::REPLACE_MEMBER_TASK_MISMATCH);
166165 }
167166
@@ -173,15 +172,12 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
173172 " Step1. Replace member, the intent has already been fulfilled, ignore it, task_id={}, "
174173 " member_out={} member_in={}" ,
175174 task_id, boost::uuids::to_string (member_out.id ), boost::uuids::to_string (member_in.id ));
176- decr_pending_request_num ();
177175 return make_async_success<>();
178176 }
179177 RD_LOGE (trace_id, " Step1. Replace member invalid parameter, out member is not found, task_id={}" , task_id);
180- decr_pending_request_num ();
181178 return make_async_error<>(ReplServiceError::SERVER_NOT_FOUND);
182179 }
183180 if (m_my_repl_id != get_leader_id ()) {
184- decr_pending_request_num ();
185181 return make_async_error<>(ReplServiceError::NOT_LEADER);
186182 }
187183 // Check if leader itself is requested to move out.
@@ -191,7 +187,6 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
191187 // client retry.
192188 raft_server ()->yield_leadership (false /* immediate */ , -1 /* successor */ );
193189 RD_LOGI (trace_id, " Step1. Replace member, leader is the member_out so yield leadership, task_id={}" , task_id);
194- decr_pending_request_num ();
195190 return make_async_error<>(ReplServiceError::NOT_LEADER);
196191 }
197192 // quorum safety check. TODO currently only consider lsn, need to check last response time.
@@ -215,7 +210,6 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
215210 " Step1. Replace member, quorum safety check failed, active_peers={}, "
216211 " active_peers_exclude_out/in_member={}, required_quorum={}, commit_quorum={}, task_id={}" ,
217212 active_peers.size (), active_num, quorum, commit_quorum, task_id);
218- decr_pending_request_num ();
219213 return make_async_error<>(ReplServiceError::QUORUM_NOT_MET);
220214 }
221215
@@ -237,7 +231,6 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
237231 RD_LOGE (trace_id, " Step2. Replace member, failed to flip out member to learner {}, task_id={}" , learner_ret,
238232 task_id);
239233 reset_quorum_size (0 , trace_id);
240- decr_pending_request_num ();
241234 return make_async_error (std::move (learner_ret));
242235 }
243236 RD_LOGI (trace_id, " Step2. Replace member, flip out member to learner and set priority to 0, task_id={}" , task_id);
@@ -261,7 +254,6 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
261254 if (status != ReplServiceError::OK) {
262255 RD_LOGE (trace_id, " Initializing rreq failed, rreq=[{}], error={}" , rreq->to_string (), status);
263256 reset_quorum_size (0 , trace_id);
264- decr_pending_request_num ();
265257 return make_async_error<>(ReplServiceError::RETRY_REQUEST);
266258 }
267259
@@ -274,7 +266,6 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
274266 " Step3. Replace member, propose to raft for HS_CTRL_START_REPLACE req failed, task_id={}, err={}" ,
275267 task_id, err);
276268 reset_quorum_size (0 , trace_id);
277- decr_pending_request_num ();
278269 return make_async_error<>(std::move (err));
279270 }
280271
@@ -293,13 +284,11 @@ AsyncReplResult<> RaftReplDev::start_replace_member(std::string& task_id, const
293284 if (ret != ReplServiceError::OK) {
294285 RD_LOGE (trace_id, " Step4. Replace member, add member failed, err={}, task_id={}" , ret, task_id);
295286 reset_quorum_size (0 , trace_id);
296- decr_pending_request_num ();
297287 return make_async_error<>(std::move (ret));
298288 }
299289 RD_LOGI (trace_id, " Step4. Replace member, proposed to raft to add member, task_id={}, member={}" , task_id,
300290 boost::uuids::to_string (member_in.id ));
301291 reset_quorum_size (0 , trace_id);
302- decr_pending_request_num ();
303292 return make_async_success<>();
304293}
305294
@@ -314,7 +303,7 @@ AsyncReplResult<> RaftReplDev::complete_replace_member(std::string& task_id, con
314303 RD_LOGE (trace_id, " repl dev is not ready, stage={}" , static_cast < int >(get_stage ()));
315304 return make_async_error<>(ReplServiceError::UNREADY_STATE);
316305 }
317- incr_pending_request_num ( );
306+ init_req_counter counter (pending_request_num );
318307
319308 RD_LOGI (trace_id, " Complete replace member, task_id={}, member_out={}, member_in={}" , task_id,
320309 boost::uuids::to_string (member_out.id ), boost::uuids::to_string (member_in.id ));
@@ -338,7 +327,6 @@ AsyncReplResult<> RaftReplDev::complete_replace_member(std::string& task_id, con
338327 RD_LOGE (trace_id, " Step5. Replace member, failed to remove member, task_id={}, member={}, err={}" , task_id,
339328 boost::uuids::to_string (member_out.id ), ret);
340329 reset_quorum_size (0 , trace_id);
341- decr_pending_request_num ();
342330 return make_async_error<>(std::move (ret));
343331 }
344332 RD_LOGI (trace_id, " Step5. Replace member, proposed to raft to remove member, task_id={}, member={}" , task_id,
@@ -386,7 +374,6 @@ AsyncReplResult<> RaftReplDev::complete_replace_member(std::string& task_id, con
386374 if (status != ReplServiceError::OK) {
387375 RD_LOGE (trace_id, " Initializing rreq failed, rreq=[{}], error={}" , rreq->to_string (), status);
388376 reset_quorum_size (0 , trace_id);
389- decr_pending_request_num ();
390377 return make_async_error<>(ReplServiceError::RETRY_REQUEST);
391378 }
392379
@@ -399,12 +386,10 @@ AsyncReplResult<> RaftReplDev::complete_replace_member(std::string& task_id, con
399386 " Step6. Replace member, propose to raft for HS_CTRL_COMPLETE_REPLACE req failed , task_id={}, err={}" ,
400387 task_id, err);
401388 reset_quorum_size (0 , trace_id);
402- decr_pending_request_num ();
403389 return make_async_error<>(std::move (err));
404390 }
405391
406392 reset_quorum_size (0 , trace_id);
407- decr_pending_request_num ();
408393 RD_LOGI (trace_id, " Complete replace member done, group_id={}, task_id={}, member_out={} member_in={}" ,
409394 group_id_str (), task_id, boost::uuids::to_string (member_out.id ), boost::uuids::to_string (member_in.id ));
410395 return make_async_success<>();
@@ -418,10 +403,9 @@ ReplaceMemberStatus RaftReplDev::get_replace_member_status(std::string& task_id,
418403 RD_LOGI (trace_id, " repl dev is being shutdown!" );
419404 return ReplaceMemberStatus::UNKNOWN;
420405 }
421- incr_pending_request_num ( );
406+ init_req_counter counter (pending_request_num );
422407
423408 if (!m_repl_svc_ctx || !is_leader ()) {
424- decr_pending_request_num ();
425409 return ReplaceMemberStatus::NOT_LEADER;
426410 }
427411
@@ -458,19 +442,15 @@ ReplaceMemberStatus RaftReplDev::get_replace_member_status(std::string& task_id,
458442 " others.size={}, "
459443 " all_peers.size={}" ,
460444 task_id, detail, others.size (), peers.size ());
461- decr_pending_request_num ();
462445 return ReplaceMemberStatus::UNKNOWN;
463446 }
464- decr_pending_request_num ();
465447 return ReplaceMemberStatus::COMPLETED;
466448 }
467- decr_pending_request_num ();
468449 return ReplaceMemberStatus::TASK_NOT_FOUND;
469450 }
470451 if (m_rd_sb->replace_member_task .task_id != task_id) {
471452 RD_LOGE (trace_id, " get_replace_member_status failed, task_id mismatch, persisted={}, received={}" ,
472453 persisted_task_id, task_id);
473- decr_pending_request_num ();
474454 return ReplaceMemberStatus::TASK_ID_MISMATCH;
475455 }
476456 // If the first attempt to remove out_member fails because out_member is down or leader crashes between Step5(remove
@@ -485,7 +465,6 @@ ReplaceMemberStatus RaftReplDev::get_replace_member_status(std::string& task_id,
485465 }
486466 RD_LOGD (trace_id, " Member replacement is in progress. task_id={}, out_member={}, in_member={}" , task_id,
487467 boost::uuids::to_string (member_out.id ), boost::uuids::to_string (member_in.id ));
488- decr_pending_request_num ();
489468 return ReplaceMemberStatus::IN_PROGRESS;
490469}
491470
@@ -564,7 +543,7 @@ AsyncReplResult<> RaftReplDev::flip_learner_flag(const replica_member_info& memb
564543 RD_LOGI (trace_id, " repl dev is being shutdown!" );
565544 return make_async_error<>(ReplServiceError::STOPPING);
566545 }
567- incr_pending_request_num ( );
546+ init_req_counter counter (pending_request_num );
568547
569548 if (commit_quorum >= 1 ) {
570549 // Two members are down and leader cant form the quorum. Reduce the quorum size.
@@ -574,7 +553,6 @@ AsyncReplResult<> RaftReplDev::flip_learner_flag(const replica_member_info& memb
574553 if (ret != ReplServiceError::OK) {
575554 RD_LOGE (trace_id, " Flip learner flag failed {}, member={}" , ret, boost::uuids::to_string (member.id ));
576555 reset_quorum_size (0 , trace_id);
577- decr_pending_request_num ();
578556 return make_async_error<>(std::move (ret));
579557 }
580558 RD_LOGI (trace_id, " Learner flag has been set to {}, member={}" , target, boost::uuids::to_string (member.id ));
@@ -738,7 +716,7 @@ void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result<
738716}
739717
740718void RaftReplDev::propose_truncate_boundary () {
741- incr_pending_request_num ( );
719+ init_req_counter counter (pending_request_num );
742720 auto repl_status = get_replication_status ();
743721 repl_lsn_t leader_commit_idx = m_commit_upto_lsn.load ();
744722 repl_lsn_t minimum_repl_idx = leader_commit_idx;
@@ -770,7 +748,6 @@ void RaftReplDev::propose_truncate_boundary() {
770748 if (status != ReplServiceError::OK) {
771749 RD_LOGE (std::numeric_limits< uint64_t >::max (), " Initializing rreq failed, rreq=[{}], error={}" ,
772750 rreq->to_string (), status);
773- decr_pending_request_num ();
774751 return ;
775752 }
776753
@@ -784,7 +761,6 @@ void RaftReplDev::propose_truncate_boundary() {
784761 RD_LOGW (NO_TRACE_ID, " propose to raft for HS_CTRL_UPDATE_TRUNCATION_BOUNDARY req failed, err={}" , err);
785762 }
786763 }
787- decr_pending_request_num ();
788764}
789765
790766// 1 before repl_dev.stop() is called, the upper layer should make sure that there is no pending request. so graceful
@@ -1714,15 +1690,13 @@ AsyncReplResult<> RaftReplDev::become_leader() {
17141690 LOGINFO (" repl dev is being shutdown!" );
17151691 return make_async_error<>(ReplServiceError::STOPPING);
17161692 }
1717- incr_pending_request_num ( );
1693+ init_req_counter counter (pending_request_num );
17181694
1719- return m_msg_mgr.become_leader (m_group_id).via (&folly::InlineExecutor::instance ()).thenValue ([this ](auto && e) {
1695+ return m_msg_mgr.become_leader (m_group_id).via (&folly::InlineExecutor::instance ()).thenValue ([this , counter = std::move (counter) ](auto && e) {
17201696 if (e.hasError ()) {
17211697 RD_LOGE (NO_TRACE_ID, " Error in becoming leader: {}" , e.error ());
1722- decr_pending_request_num ();
17231698 return make_async_error<>(RaftReplService::to_repl_error (e.error ()));
17241699 }
1725- decr_pending_request_num ();
17261700 return make_async_success<>();
17271701 });
17281702}
0 commit comments