@@ -146,6 +146,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
146146 void on_rollback (int64_t lsn, const sisl::blob& header, const sisl::blob& key,
147147 cintrusive< repl_req_ctx >& ctx) override {
148148 LOGINFOMOD (replication, " [Replica={}] Received rollback on lsn={}" , g_helper->replica_num (), lsn);
149+ {
150+ std::unique_lock lk (db_mtx_);
151+ rollback_count_++;
152+ }
153+ // continue the test
154+ if (ctx->is_proposer ()) { g_helper->runner ().next_task (); }
149155 }
150156
151157 void on_error (ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
@@ -224,6 +230,11 @@ class TestReplicatedDB : public homestore::ReplDevListener {
224230 return commit_count_;
225231 }
226232
233+ uint64_t db_rollback_count () const {
234+ std::shared_lock lk (db_mtx_);
235+ return rollback_count_;
236+ }
237+
227238 uint64_t db_size () const {
228239 std::shared_lock lk (db_mtx_);
229240 return inmem_db_.size ();
@@ -232,6 +243,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
232243private:
233244 std::map< Key, Value > inmem_db_;
234245 uint64_t commit_count_{0 };
246+ uint64_t rollback_count_{0 };
235247 std::shared_mutex db_mtx_;
236248};
237249
@@ -273,6 +285,22 @@ class RaftReplDevTest : public testing::Test {
273285
274286 void wait_for_all_commits () { wait_for_commits (written_entries_); }
275287
288+ uint64_t total_committed_cnt () {
289+ uint64_t total_writes{0 };
290+ for (auto const & db : dbs_) {
291+ total_writes += db->db_commit_count ();
292+ }
293+ return total_writes;
294+ }
295+
296+ uint64_t total_rollback_cnt () {
297+ uint64_t total_rollback{0 };
298+ for (auto const & db : dbs_) {
299+ total_rollback += db->db_rollback_count ();
300+ }
301+ return total_rollback;
302+ }
303+
276304 void wait_for_commits (uint64_t exp_writes) {
277305 uint64_t total_writes{0 };
278306 while (true ) {
@@ -346,27 +374,47 @@ class RaftReplDevTest : public testing::Test {
346374 LOGINFO (" Waiting for leader to be elected" );
347375 std::this_thread::sleep_for (std::chrono::milliseconds{500 });
348376 } else if (leader_uuid == g_helper->my_replica_id ()) {
349- LOGINFO (" Writing {} entries since I am the leader my_uuid={}" , num_entries,
350- boost::uuids::to_string (g_helper->my_replica_id ()));
377+ // LEADER ROLE
378+ auto batch_size = wait_for_commit ? g_helper->runner ().qdepth_ * 10 : num_entries;
379+ // cap batch_size but should be larger than QD.
380+ // It is possible after leader switch the writes run on previous leader will fail
381+ // so we need to do more IOs to have num_entries committed.
382+ if (batch_size > num_entries - written_entries_)
383+ batch_size = std::max (num_entries - written_entries_, g_helper->runner ().qdepth_ );
384+ LOGINFO (" Writing {} entries since I am the leader my_uuid={}, target_total {}, written {}" , batch_size,
385+ boost::uuids::to_string (g_helper->my_replica_id ()), num_entries, written_entries_);
351386 auto const block_size = SISL_OPTIONS[" block_size" ].as < uint32_t >();
352- g_helper->runner ().set_num_tasks (num_entries);
353-
387+ g_helper->runner ().set_num_tasks (batch_size);
354388 LOGINFO (" Run on worker threads to schedule append on repldev for {} Bytes." , block_size);
355389 g_helper->runner ().set_task ([this , block_size, db]() {
356390 static std::normal_distribution<> num_blks_gen{3.0 , 2.0 };
357391 this ->generate_writes (std::abs (std::round (num_blks_gen (g_re))) * block_size, block_size, db);
358392 });
359- if (wait_for_commit) { g_helper->runner ().execute ().get (); }
360- break ;
393+ written_entries_ += batch_size;
394+ if (wait_for_commit) {
395+ g_helper->runner ().execute ().get ();
396+ if (total_committed_cnt () >= num_entries) { break ; }
397+ } else {
398+ if (written_entries_ >= num_entries) { break ; }
399+ }
361400 } else {
362- LOGINFO (" {} entries were written on the leader_uuid={} my_uuid={}" , num_entries,
401+ // FOLLOWER ROLE
402+ LOGINFO (" {} entries are expected to be written on the leader_uuid={}, my_uuid={}" , num_entries,
363403 boost::uuids::to_string (leader_uuid), boost::uuids::to_string (g_helper->my_replica_id ()));
364- break ;
404+ if (wait_for_commit) {
405+ LOGINFO (" {} entries are expected to be written, now I committed {}, my_uuid={}" , num_entries,
406+ total_committed_cnt (), boost::uuids::to_string (leader_uuid),
407+ boost::uuids::to_string (g_helper->my_replica_id ()));
408+ if (total_committed_cnt () >= num_entries) { break ; }
409+ std::this_thread::sleep_for (std::chrono::milliseconds{5000 });
410+ } else {
411+ break ;
412+ }
365413 }
366414 } while (true );
367-
368- written_entries_ += num_entries;
369- if (wait_for_commit) { this -> wait_for_all_commits (); }
415+ LOGINFO ( " my_uuid={}, {} entries are expected to be written, I wrote {}, committed {}, rollback {} " ,
416+ boost::uuids::to_string (g_helper-> my_replica_id ()), num_entries, written_entries_,
417+ total_committed_cnt (), total_rollback_cnt ());
370418 }
371419
372420 void remove_db (std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) {
0 commit comments