@@ -934,7 +934,7 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
934
934
935
935
std::optional<TransactionData> tx_data;
936
936
while ((tx_data = tx_reader.NextTxData (&reader, cntx))) {
937
- DVLOG (3 ) << " Lsn: " << tx_data->lsn ;
937
+ DVLOG (3 ) << " Lsn: " << tx_data->lsn << " flowid: " << flow_id_ ;
938
938
939
939
last_io_time_ = Proactor ()->GetMonotonicTimeNs ();
940
940
if (tx_data->opcode == journal::Op::LSN) {
@@ -953,6 +953,7 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
953
953
// inconsistent data because the replica will resume from the next
954
954
// lsn of the master and this lsn entry will be lost.
955
955
journal_rec_executed_.fetch_add (1 , std::memory_order_relaxed);
956
+ DVLOG (3 ) << " journal_rec_executed_: " << journal_rec_executed_ << " flowid: " << flow_id_;
956
957
}
957
958
}
958
959
shard_replica_waker_.notifyAll ();
@@ -996,7 +997,8 @@ void DflyShardReplica::StableSyncDflyAcksFb(ExecutionState* cntx) {
996
997
// Handle ACKs with the master. PING opcodes from the master mean we should immediately
997
998
// answer.
998
999
current_offset = journal_rec_executed_.load (std::memory_order_relaxed);
999
- VLOG (1 ) << " Sending an ACK with offset=" << current_offset << " forced=" << force_ping_;
1000
+ VLOG (1 ) << " Sending an ACK with offset=" << current_offset << " forced=" << force_ping_
1001
+ << " flowid=" << flow_id_;
1000
1002
ack_cmd = absl::StrCat (" REPLCONF ACK " , current_offset);
1001
1003
force_ping_ = false ;
1002
1004
next_ack_tp = std::chrono::steady_clock::now () + ack_time_max_interval;
0 commit comments