src/server/cluster/outgoing_slot_migration.cc (5 additions, 0 deletions)
@@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
   switch (state_) {
     case MigrationState::C_FATAL:
     case MigrationState::C_FINISHED:
+      CloseSocket();
       return;  // Already finished, nothing else to do
 
     case MigrationState::C_CONNECTING:
@@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
     });
     exec_st_.JoinErrorHandler();
   }
+
+  // Close socket for clean disconnect.
+  CloseSocket();
 }
 
 MigrationState OutgoingMigration::GetState() const {
@@ -314,6 +318,7 @@ void OutgoingMigration::SyncFb() {
   }
 
   VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
+  CloseSocket();
 }
 
 bool OutgoingMigration::FinalizeMigration(long attempt) {
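With these additions, Finish() can be entered again after the migration already reached C_FATAL or C_FINISHED, and SyncFb() closes on its own exit path as well, so CloseSocket() may run more than once for the same migration. That is only sound if closing is idempotent. Below is a minimal, self-contained sketch of that idempotent-close pattern; plain POSIX sockets stand in for Dragonfly's FiberSocketBase, and ScopedSocket is a hypothetical name, not a type from this PR.

    #include <sys/socket.h>
    #include <unistd.h>

    #include <cstdio>

    class ScopedSocket {
     public:
      explicit ScopedSocket(int fd) : fd_(fd) {}
      ~ScopedSocket() { Close(); }  // Destructor is a last resort, not the main path.

      // Safe to call from every terminal path: only the first call does work.
      void Close() {
        if (fd_ < 0)
          return;  // Already closed: nothing else to do.
        shutdown(fd_, SHUT_RDWR);  // Wake any blocked reader/writer; errors ignored.
        if (close(fd_) != 0)
          perror("close");
        fd_ = -1;  // Mark closed so repeated calls become no-ops.
      }

     private:
      int fd_;
    };

    int main() {
      ScopedSocket s(socket(AF_INET, SOCK_STREAM, 0));
      s.Close();  // Explicit close on the normal exit path...
      s.Close();  // ...and a second call (e.g. from Finish()) is harmless.
      return 0;
    }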
src/server/protocol_client.cc (10 additions, 7 deletions)
@@ -109,23 +109,22 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+
+  // We initialize the proactor thread here such that it never races with Sock().
+  // ProtocolClient is never migrated to a different thread, so this is safe.
+  socket_thread_ = ProactorBase::me();
 }
 
 ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+  socket_thread_ = ProactorBase::me();
 }
 
 ProtocolClient::~ProtocolClient() {
   exec_st_.JoinErrorHandler();
 
-  // FIXME: We should close the socket explictly outside of the destructor. This currently
-  // breaks test_cancel_replication_immediately.
-  if (sock_) {
-    std::error_code ec;
-    sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
-    LOG_IF(ERROR, ec) << "Error closing socket " << ec;
-  }
 #ifdef DFLY_USE_SSL
   if (ssl_ctx_) {
     SSL_CTX_free(ssl_ctx_);
@@ -162,6 +161,7 @@ error_code ProtocolClient::ResolveHostDns() {
 error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
                                           ExecutionState* cntx) {
   ProactorBase* mythread = ProactorBase::me();
+  DCHECK(mythread == socket_thread_);
   CHECK(mythread);
   {
     unique_lock lk(sock_mu_);
@@ -235,6 +235,9 @@ void ProtocolClient::CloseSocket() {
       auto ec = sock_->Shutdown(SHUT_RDWR);
       LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
     }
+    error_code ec = sock_->Close();  // Quietly close.
+
+    LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
   });
 }
 }
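Taken together, both constructors now capture ProactorBase::me() exactly once, ConnectAndAuth() asserts that the thread never changed, and socket teardown moves out of the destructor into CloseSocket(), which shuts down and closes on the socket's own proactor. The underlying idea is "pin the object to its creating thread, assert on use". A minimal sketch of that pattern, assuming only the standard library (std::thread::id stands in for ProactorBase*, and ThreadPinned is a hypothetical name):

    #include <cassert>
    #include <thread>

    class ThreadPinned {
     public:
      ThreadPinned() : owner_(std::this_thread::get_id()) {}  // Capture once, at construction.

      void DoWork() {
        // Equivalent of DCHECK(mythread == socket_thread_): catch accidental
        // cross-thread use in debug builds instead of racing silently.
        assert(std::this_thread::get_id() == owner_);
      }

      // Safe to call from any thread: owner_ is immutable after construction.
      std::thread::id Owner() const { return owner_; }

     private:
      const std::thread::id owner_;
    };

    int main() {
      ThreadPinned p;
      p.DoWork();  // OK: same thread that constructed p.
      // std::thread t([&p] { p.DoWork(); });  // Would trip the assert in a debug build.
      return 0;
    }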
src/server/protocol_client.h (2 additions, 1 deletion)
@@ -107,7 +107,7 @@ class ProtocolClient {
   }
 
   auto* Proactor() const {
-    return sock_->proactor();
+    return socket_thread_;
   }
 
   util::FiberSocketBase* Sock() const {
@@ -142,6 +142,7 @@ class ProtocolClient {
 #else
   void* ssl_ctx_{nullptr};
 #endif
+  util::fb2::ProactorBase* socket_thread_;
 };
 
 }  // namespace dfly
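The header change is what makes the two edits above hold together: Proactor() now returns the cached socket_thread_ instead of dereferencing sock_, so it stays valid while sock_ is still null (before ConnectAndAuth()) or already closed. That in turn is what allows deleting the Sock()-null grace-period branch in Replica::GetSummary() further down. A small sketch with hypothetical stand-in types (Proactor and Client here are not the helio/Dragonfly classes):

    #include <cassert>

    struct Proactor {};  // Hypothetical stand-in, not util::fb2::ProactorBase.

    class Client {
     public:
      explicit Client(Proactor* me) : socket_thread_(me) {}

      // Returns the thread captured at construction; never dereferences sock_,
      // which may be null before a connection exists or after it was closed.
      Proactor* GetProactor() const { return socket_thread_; }

     private:
      void* sock_ = nullptr;  // Stays null until a connection is established.
      Proactor* socket_thread_;
    };

    int main() {
      Proactor me;
      Client c(&me);
      assert(c.GetProactor() == &me);  // Valid even though sock_ is still null.
      return 0;
    }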
src/server/replica.cc (27 additions, 31 deletions)
@@ -167,9 +167,13 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
   sync_fb_.JoinIfNeeded();
   DVLOG(1) << "MainReplicationFb stopped " << this;
   acks_fb_.JoinIfNeeded();
-  for (auto& flow : shard_flows_) {
-    flow.reset();
-  }
+
+  proactor_->Await([this]() {
+    for (auto& flow : shard_flows_) {
+      flow.reset();
+    }
+    shard_flows_.clear();
+  });
 
   if (last_journal_LSNs_.has_value()) {
     return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
@@ -505,14 +509,23 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   // Initialize MultiShardExecution.
   multi_shard_exe_.reset(new MultiShardExecution());
 
-  // Initialize shard flows.
-  shard_flows_.resize(master_context_.num_flows);
-  DCHECK(!shard_flows_.empty());
-  for (unsigned i = 0; i < shard_flows_.size(); ++i) {
-    shard_flows_[i].reset(
-        new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
-  }
-  thread_flow_map_ = Partition(shard_flows_.size());
+  // Initialize shard flows. Updating shard_flows_ must be done on this thread;
+  // otherwise there is a race between GetSummary() and the shard_flows_[i].reset()
+  // below.
+  decltype(shard_flows_) shard_flows_copy;
+  shard_flows_copy.resize(master_context_.num_flows);
+  DCHECK(!shard_flows_copy.empty());
+  thread_flow_map_ = Partition(shard_flows_copy.size());
+  const size_t pool_sz = shard_set->pool()->size();
+
+  shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
+    for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
+      shard_flows_copy[i].reset(
+          new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
+    }
+  });
+  // Now publish shard_flows_ on the proactor thread.
+  shard_flows_ = std::move(shard_flows_copy);
 
   // Blocked on until all flows got full sync cut.
   BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -542,13 +555,6 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
   JoinDflyFlows();
   if (sync_type == "full") {
     service_.RemoveLoadingState();
-  } else if (service_.IsLoadingExclusively()) {
-    // We need this check. We originally set the state unconditionally to LOADING
-    // when we call ReplicaOf command. If for some reason we fail to start full sync below
-    // or cancel the context, we still need to switch to ACTIVE state.
-    // TODO(kostasrim) we can remove this once my proposed changes for replication move forward
-    // as the state transitions for ReplicaOf command will be much clearer.
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
   }
   state_mask_ &= ~R_SYNCING;
   last_journal_LSNs_.reset();
@@ -1178,6 +1184,7 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d

 auto Replica::GetSummary() const -> Summary {
   auto f = [this]() {
+    DCHECK(Proactor() == ProactorBase::me());
     auto last_io_time = LastIoTime();
 
     // Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
@@ -1214,25 +1221,14 @@ auto Replica::GetSummary() const -> Summary {
     return res;
   };
 
-  if (Sock())
-    return Proactor()->AwaitBrief(f);
-
-  /**
-   * when this branch happens: there is a very short grace period
-   * where Sock() is not initialized, yet the server can
-   * receive ROLE/INFO commands. That period happens when launching
-   * an instance with '--replicaof' and then immediately
-   * sending a command.
-   *
-   * In that instance, we have to run f() on the current fiber.
-   */
-  return f();
+  return Proactor()->AwaitBrief(f);
 }
 
 std::vector<uint64_t> Replica::GetReplicaOffset() const {
   std::vector<uint64_t> flow_rec_count;
   flow_rec_count.resize(shard_flows_.size());
   for (const auto& flow : shard_flows_) {
+    DCHECK(flow.get());
     uint32_t flow_id = flow->FlowId();
     uint64_t rec_count = flow->JournalExecutedCount();
     DCHECK_LT(flow_id, shard_flows_.size());
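The replica.cc changes share one invariant: shard_flows_ is only ever read and written on the replica's own proactor thread. Stop() resets the flows inside proactor_->Await(), GetSummary() always hops via Proactor()->AwaitBrief(f) (which the new Proactor() makes safe even before the socket exists), and InitiateDflySync() builds the flows in a local vector striped across the pool, then publishes it with a single move on the owner thread. A self-contained sketch of that strided-fill-then-publish pattern, with std::thread standing in for AwaitFiberOnAll and a hypothetical Flow type in place of DflyShardReplica:

    #include <memory>
    #include <thread>
    #include <vector>

    struct Flow {  // Hypothetical stand-in for DflyShardReplica.
      explicit Flow(unsigned id) : id(id) {}
      unsigned id;
    };

    int main() {
      const unsigned kNumFlows = 8;
      const unsigned kPoolSize = 3;  // Stand-in for shard_set->pool()->size().

      // Build into a local vector first, exactly like shard_flows_copy above.
      std::vector<std::unique_ptr<Flow>> copy(kNumFlows);

      std::vector<std::thread> pool;
      for (unsigned index = 0; index < kPoolSize; ++index) {
        pool.emplace_back([&copy, index, kPoolSize] {
          // Same stride as the AwaitFiberOnAll loop: thread `index` takes slots
          // index, index + kPoolSize, ... so writes never overlap and need no lock.
          for (unsigned i = index; i < copy.size(); i += kPoolSize)
            copy[i] = std::make_unique<Flow>(i);
        });
      }
      for (auto& t : pool)
        t.join();  // AwaitFiberOnAll likewise blocks until every thread is done.

      // Publish with a single move on the owning thread. Readers that also run
      // on this thread (the AwaitBrief hop in GetSummary) can never observe a
      // half-built vector.
      std::vector<std::unique_ptr<Flow>> shard_flows = std::move(copy);
      return shard_flows.size() == kNumFlows ? 0 : 1;
    }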