@@ -85,6 +85,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
85
85
shard->journal ()->RecordEntry (0 , journal::Op::PING, 0 , 0 , nullopt , {});
86
86
87
87
const FlowInfo* flow = &replica->flows [shard->shard_id ()];
88
+
88
89
while (flow->last_acked_lsn < shard->journal ()->GetLsn ()) {
89
90
if (absl::Now () > end_time) {
90
91
LOG (WARNING) << " Couldn't synchronize with replica for takeover in time: " << replica->address
@@ -95,8 +96,9 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
95
96
if (!replica->exec_st .IsRunning ()) {
96
97
return false ;
97
98
}
98
- VLOG (1 ) << " Replica lsn:" << flow->last_acked_lsn
99
- << " master lsn:" << shard->journal ()->GetLsn ();
99
+ LOG_EVERY_T (INFO, 1 ) << " Replica lsn:" << flow->last_acked_lsn
100
+ << " master lsn:" << shard->journal ()->GetLsn ()
101
+ << " ; Journal streamer state: " << flow->streamer ->FormatInternalState ();
100
102
ThisFiber::SleepFor (1ms);
101
103
}
102
104
@@ -512,7 +514,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext
512
514
atomic_bool catchup_success = true ;
513
515
if (*status == OpStatus::OK) {
514
516
dfly::SharedLock lk{replica_ptr->shared_mu };
515
- auto cb = [replica_ptr, end_time, &catchup_success](EngineShard* shard) {
517
+ auto cb = [replica_ptr = replica_ptr , end_time, &catchup_success](EngineShard* shard) {
516
518
if (!WaitReplicaFlowToCatchup (end_time, replica_ptr.get (), shard)) {
517
519
catchup_success.store (false );
518
520
}
0 commit comments