3 changes: 3 additions & 0 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -161,6 +161,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
}

bool should_cancel_flows = false;
absl::Cleanup on_exit([this]() { CloseSocket(); });
{
util::fb2::LockGuard lk(state_mu_);
switch (state_) {
@@ -313,6 +314,8 @@ void OutgoingMigration::SyncFb() {
break;
}

CloseSocket();

VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
}
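
The `absl::Cleanup` added to `Finish()` makes `CloseSocket()` run on every exit path of the function. A minimal sketch of the same scope-exit idea in standard C++ follows; `ScopeExit` and `FinishLike` are hypothetical names used only for illustration and are not part of this PR or of Abseil.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for absl::Cleanup: invokes the callback when the
// guard object goes out of scope, whichever return path is taken.
template <typename F>
class ScopeExit {
 public:
  explicit ScopeExit(F fn) : fn_(std::move(fn)) {}
  ~ScopeExit() { fn_(); }
  ScopeExit(const ScopeExit&) = delete;
  ScopeExit& operator=(const ScopeExit&) = delete;

 private:
  F fn_;
};

// Illustrative function with an early return. "close" is appended to the
// log on both paths because the guard's destructor always runs.
void FinishLike(bool early_return, std::vector<std::string>& log) {
  auto close_fn = [&log] { log.push_back("close"); };
  ScopeExit<decltype(close_fn)> on_exit(close_fn);

  if (early_return) {
    log.push_back("early");
    return;
  }
  log.push_back("work");
}
```

The guard is declared before any branch that can return, which mirrors where `on_exit` is placed in `Finish()`.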

16 changes: 7 additions & 9 deletions src/server/protocol_client.cc
@@ -110,6 +110,7 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
MaybeInitSslCtx();
#endif
}

ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
#ifdef DFLY_USE_SSL
MaybeInitSslCtx();
@@ -119,13 +120,6 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
ProtocolClient::~ProtocolClient() {
exec_st_.JoinErrorHandler();

// FIXME: We should close the socket explictly outside of the destructor. This currently
@kostasrim (Contributor Author), Oct 2, 2025:

After spending a few hours on this, I still don't understand why keeping this code causes a segfault in the destructor of std::shared_ptr<Replica>. It seems to happen during the preemption, but the sock_ resources are already deallocated by then, so Close() should return early because fd_ < 0.

What is more, the core dump shows that tl_replica and its copy have different ref-counted objects: one shows as expired and the other has a ref count of 7. I added a CHECK() before the crash to make sure that both copies of the shared_ptr point to the exact same control block. The checks passed, yet the core dump showed otherwise, which makes me think that this is some kind of memory corruption.

The good thing is that we don't need this code anymore, as we now handle closing the socket outside of the destructor.

While writing this, the only case I can think of is that the last instance of tl_replica gets destructed but needs to preempt, and an INFO command comes in and grabs a copy while the shared_ptr is destructing, which could lead to a race condition.

I will verify this theory once I am back from the holidays.

P.S. The test that failed is test_cancel_replication_immediately (it fails only about once every 300 runs, so it's rather time-consuming to reproduce).

// breaks test_cancel_replication_immediately.
if (sock_) {
std::error_code ec;
sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
LOG_IF(ERROR, ec) << "Error closing socket " << ec;
}
#ifdef DFLY_USE_SSL
if (ssl_ctx_) {
SSL_CTX_free(ssl_ctx_);
@@ -163,6 +157,7 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
ExecutionState* cntx) {
ProactorBase* mythread = ProactorBase::me();
CHECK(mythread);
socket_thread_ = ProactorBase::me();
{
unique_lock lk(sock_mu_);
// The context closes sock_. So if the context error handler has already
@@ -235,6 +230,9 @@ void ProtocolClient::CloseSocket() {
auto ec = sock_->Shutdown(SHUT_RDWR);
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
}
auto ec = sock_->Close(); // Quietly close.

LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
});
}
}
@@ -385,11 +383,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
}

uint64_t ProtocolClient::LastIoTime() const {
return last_io_time_;
return last_io_time_.load(std::memory_order_relaxed);
}

void ProtocolClient::TouchIoTime() {
last_io_time_ = Proactor()->GetMonotonicTimeNs();
last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
}

} // namespace dfly
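
The change above turns `last_io_time_` into a `std::atomic<uint64_t>` accessed with relaxed ordering, so that reading the timestamp from another thread is no longer a data race. A minimal sketch of that accessor pattern is shown below; `IoClock` and `NewestIo` are hypothetical names for illustration, and the sketch assumes the timestamp is only informational, so no ordering with other memory operations is needed.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical miniature of the last_io_time_ accessors.
class IoClock {
 public:
  // Called by the I/O thread whenever traffic is observed.
  void Touch(std::uint64_t now_ns) {
    last_io_time_.store(now_ns, std::memory_order_relaxed);
  }

  // May be called from any thread; relaxed ordering is enough because the
  // value is a standalone timestamp that guards no other data.
  std::uint64_t Last() const {
    return last_io_time_.load(std::memory_order_relaxed);
  }

 private:
  std::atomic<std::uint64_t> last_io_time_{0};  // in ns, monotonic clock
};

// Picks the newest timestamp across several clocks, similar in spirit to
// the loop over shard flows in GetSummary().
std::uint64_t NewestIo(const IoClock* clocks, std::size_t count) {
  std::uint64_t newest = 0;
  for (std::size_t i = 0; i < count; ++i) {
    newest = std::max(newest, clocks[i].Last());
  }
  return newest;
}
```

Relaxed ordering guarantees only that each load sees some value that was actually stored, which is what a "last I/O time" display needs.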
6 changes: 4 additions & 2 deletions src/server/protocol_client.h
@@ -107,7 +107,8 @@ class ProtocolClient {
}

auto* Proactor() const {
return sock_->proactor();
DCHECK(socket_thread_);
return socket_thread_;
}

util::FiberSocketBase* Sock() const {
@@ -132,7 +133,7 @@ class ProtocolClient {
std::string last_cmd_;
std::string last_resp_;

uint64_t last_io_time_ = 0; // in ns, monotonic clock.
std::atomic<uint64_t> last_io_time_ = 0; // in ns, monotonic clock.

#ifdef DFLY_USE_SSL

@@ -142,6 +143,7 @@ class ProtocolClient {
#else
void* ssl_ctx_{nullptr};
#endif
util::fb2::ProactorBase* socket_thread_;
};

} // namespace dfly
73 changes: 37 additions & 36 deletions src/server/replica.cc
@@ -64,7 +64,7 @@ ABSL_DECLARE_FLAG(uint16_t, announce_port);
ABSL_FLAG(
int, replica_priority, 100,
"Published by info command for sentinel to pick replica based on score during a failover");
ABSL_FLAG(bool, experimental_replicaof_v2, true,
ABSL_FLAG(bool, experimental_replicaof_v2, false,
Contributor Author:

Will revert this back to true; I just want to make sure I did not break anything in case we want to switch back to the old implementation.

"Use ReplicaOfV2 algorithm for initiating replication");

namespace dfly {
@@ -152,6 +152,8 @@ void Replica::StartMainReplicationFiber(std::optional<LastMasterSyncData> last_m
void Replica::EnableReplication() {
VLOG(1) << "Enabling replication";

socket_thread_ = ProactorBase::me();

state_mask_ = R_ENABLED; // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this, nullopt); // call replication fiber
}
@@ -170,9 +172,17 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
sync_fb_.JoinIfNeeded();
DVLOG(1) << "MainReplicationFb stopped " << this;
acks_fb_.JoinIfNeeded();
for (auto& flow : shard_flows_) {
flow.reset();
}

proactor_->Await([this]() {
// Destructor is blocking, so other fibers can observe partial state
// of flows during clean up. To avoid this, we move them and clear the
// member before the preemption point
auto shard_flows = std::move(shard_flows_);
shard_flows_.clear();
for (auto& flow : shard_flows) {
flow.reset();
}
});

if (last_journal_LSNs_.has_value()) {
return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
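
The new `Stop()` body moves `shard_flows_` into a local and clears the member before destroying the flows, so that nothing observing the member during a blocking destructor sees a half-torn-down container. A minimal single-threaded sketch of that move-then-clear pattern is shown below. `Owner`, `Flow`, and `StopSafely` are hypothetical names, and the destructor that inspects the owner only simulates what another fiber could observe at a preemption point.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct Owner;

// Hypothetical flow whose destructor stands in for a preemption point:
// it records what an outside observer would see in the owner's member.
struct Flow {
  Owner* owner;
  ~Flow();
};

struct Owner {
  std::vector<std::unique_ptr<Flow>> flows;
  std::vector<std::size_t> sizes_seen;  // member size seen by each destructor

  void Add() { flows.push_back(std::unique_ptr<Flow>(new Flow{this})); }

  void StopSafely() {
    // Detach the flows first. A moved-from vector is valid but its state is
    // unspecified, so clear it explicitly before any destructor runs.
    auto local = std::move(flows);
    flows.clear();
    for (auto& flow : local) {
      flow.reset();  // destructors run while the member is already empty
    }
  }
};

Flow::~Flow() { owner->sizes_seen.push_back(owner->flows.size()); }
```

Resetting the elements in place instead would let each destructor observe a container that still holds a mix of live and null entries.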
@@ -501,18 +511,12 @@ error_code Replica::InitiatePSync() {
return error_code{};
}

// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
auto start_time = absl::Now();

// Initialize MultiShardExecution.
multi_shard_exe_.reset(new MultiShardExecution());

// Initialize shard flows.
void Replica::InitializeShardFlows() {
shard_flows_.resize(master_context_.num_flows);
DCHECK(!shard_flows_.empty());
for (unsigned i = 0; i < shard_flows_.size(); ++i) {
// Transfer LSN state for partial sync
thread_flow_map_ = Partition(shard_flows_.size());

for (size_t i = 0; i < shard_flows_.size(); ++i) {
uint64_t partial_sync_lsn = 0;
if (shard_flows_[i]) {
partial_sync_lsn = shard_flows_[i]->JournalExecutedCount();
@@ -523,7 +527,19 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
shard_flows_[i]->SetRecordsExecuted(partial_sync_lsn);
}
}
thread_flow_map_ = Partition(shard_flows_.size());
}

// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
auto start_time = absl::Now();

// Initialize MultiShardExecution.
multi_shard_exe_.reset(new MultiShardExecution());

// Initialize shard flows. The update to the shard_flows_ should be done by this thread.
// Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
// below.
InitializeShardFlows();

// Blocked on until all flows got full sync cut.
BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -754,11 +770,6 @@ error_code Replica::ConsumeDflyStream() {
};
RETURN_ON_ERR(exec_st_.SwitchErrorHandler(std::move(err_handler)));

size_t total_flows_to_finish_partial = 0;
for (const auto& flow : thread_flow_map_) {
total_flows_to_finish_partial += flow.size();
}

LOG(INFO) << "Transitioned into stable sync";
// Transition flows into stable sync.
{
@@ -1210,11 +1221,12 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d

auto Replica::GetSummary() const -> Summary {
auto f = [this]() {
DCHECK(this);
auto last_io_time = LastIoTime();

// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
// it's unlikely to cause a real bug.
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
for (const auto& flow : shard_flows_) {
DCHECK(Proactor() == ProactorBase::me());
DCHECK(flow);
last_io_time = std::max(last_io_time, flow->LastIoTime());
}

@@ -1246,25 +1258,14 @@ auto Replica::GetSummary() const -> Summary {
return res;
};

if (Sock())
return Proactor()->AwaitBrief(f);

/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
*/
return f();
return Proactor()->AwaitBrief(f);
}

std::vector<uint64_t> Replica::GetReplicaOffset() const {
std::vector<uint64_t> flow_rec_count;
flow_rec_count.resize(shard_flows_.size());
for (const auto& flow : shard_flows_) {
DCHECK(flow.get());
uint32_t flow_id = flow->FlowId();
uint64_t rec_count = flow->JournalExecutedCount();
DCHECK_LT(flow_id, shard_flows_.size());
2 changes: 2 additions & 0 deletions src/server/replica.h
@@ -154,6 +154,8 @@ class Replica : ProtocolClient {
size_t GetRecCountExecutedPerShard(const std::vector<unsigned>& indexes) const;

private:
void InitializeShardFlows();

util::fb2::ProactorBase* proactor_ = nullptr;
Service& service_;
MasterContext master_context_;