diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 404aec9bff93..47709746993e 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -518,7 +518,8 @@ static void RunFPeriodically(std::function f, std::chrono::milliseconds int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000; if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) { - VLOG(1) << "This " << error_msg << " step took " << now_ms - last_heartbeat_ms << "ms"; + VLOG(1) << "This " << error_msg << " step was stalled for " << now_ms - last_heartbeat_ms + << "ms"; } f(); last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index b35801864a56..fbb0f219fc80 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -171,6 +171,7 @@ void JournalStreamer::AsyncWrite(bool force_send) { v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size())); } + DVLOG(3) << "calling AsyncWrite with buff size:" << v.size(); dest_->AsyncWrite(v.data(), v.size(), [this, len = in_flight_bytes_](std::error_code ec) { OnCompletion(ec, len); }); } diff --git a/src/server/replica.cc b/src/server/replica.cc index aa5a1b65b590..b13832ef7d64 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -934,7 +934,7 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) { std::optional tx_data; while ((tx_data = tx_reader.NextTxData(&reader, cntx))) { - DVLOG(3) << "Lsn: " << tx_data->lsn; + DVLOG(3) << "Lsn: " << tx_data->lsn << " flowid: " << flow_id_; last_io_time_ = Proactor()->GetMonotonicTimeNs(); if (tx_data->opcode == journal::Op::LSN) { @@ -953,6 +953,7 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) { // inconsistent data because the replica will resume from the next // lsn of the master and this lsn entry will be lost. journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); + DVLOG(3) << "journal_rec_executed_: " << journal_rec_executed_ << " flowid: " << flow_id_; } } shard_replica_waker_.notifyAll(); @@ -996,7 +997,8 @@ void DflyShardReplica::StableSyncDflyAcksFb(ExecutionState* cntx) { // Handle ACKs with the master. PING opcodes from the master mean we should immediately // answer. current_offset = journal_rec_executed_.load(std::memory_order_relaxed); - VLOG(1) << "Sending an ACK with offset=" << current_offset << " forced=" << force_ping_; + VLOG(1) << "Sending an ACK with offset=" << current_offset << " forced=" << force_ping_ + << " flowid=" << flow_id_; ack_cmd = absl::StrCat("REPLCONF ACK ", current_offset); force_ping_ = false; next_ack_tp = std::chrono::steady_clock::now() + ack_time_max_interval; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 28245e110b52..b139896e59e8 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1263,9 +1263,7 @@ async def delayed_takeover(): @pytest.mark.exclude_epoll @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases) -async def test_take_over_seeder( - request, df_factory, df_seeder_factory, master_threads, replica_threads -): +async def test_take_over_seeder(df_factory, df_seeder_factory, master_threads, replica_threads): master = df_factory.create( proactor_threads=master_threads, dbfilename=f"dump_{tmp_file_name()}", admin_port=ADMIN_PORT )