Skip to content

Commit 7cee991

Browse files
committed
chore: lock free info command with replicaof v2
Signed-off-by: kostas <[email protected]>
1 parent 7abc6a9 commit 7cee991

File tree

7 files changed

+180
-44
lines changed

7 files changed

+180
-44
lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
166166
switch (state_) {
167167
case MigrationState::C_FATAL:
168168
case MigrationState::C_FINISHED:
169+
CloseSocket();
169170
return; // Already finished, nothing else to do
170171

171172
case MigrationState::C_CONNECTING:
@@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
192193
});
193194
exec_st_.JoinErrorHandler();
194195
}
196+
197+
// Close socket for clean disconnect.
198+
CloseSocket();
195199
}
196200

197201
MigrationState OutgoingMigration::GetState() const {

src/server/protocol_client.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,16 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
109109
#ifdef DFLY_USE_SSL
110110
MaybeInitSslCtx();
111111
#endif
112+
// We initialize the proactor thread here such that it never races with Sock().
113+
// ProtocolClient is never migrated to a different thread, so this is safe.
114+
socket_thread_ = ProactorBase::me();
112115
}
116+
113117
ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
114118
#ifdef DFLY_USE_SSL
115119
MaybeInitSslCtx();
116120
#endif
121+
socket_thread_ = ProactorBase::me();
117122
}
118123

119124
ProtocolClient::~ProtocolClient() {
@@ -162,6 +167,7 @@ error_code ProtocolClient::ResolveHostDns() {
162167
error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
163168
ExecutionState* cntx) {
164169
ProactorBase* mythread = ProactorBase::me();
170+
DCHECK(mythread == socket_thread_);
165171
CHECK(mythread);
166172
{
167173
unique_lock lk(sock_mu_);
@@ -235,6 +241,9 @@ void ProtocolClient::CloseSocket() {
235241
auto ec = sock_->Shutdown(SHUT_RDWR);
236242
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
237243
}
244+
auto ec = sock_->Close(); // Quietly close.
245+
246+
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
238247
});
239248
}
240249
}

src/server/protocol_client.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ class ProtocolClient {
4545
uint64_t LastIoTime() const;
4646
void TouchIoTime();
4747

48+
// Used to set the socket_thread_ prior the initialization of sock_.
49+
// That way Proactor() returns the right thread even when the sock_ is
50+
// not yet initialized
51+
void SetSocketThread(util::fb2::ProactorBase* sock_thread) {
52+
socket_thread_ = sock_thread;
53+
}
54+
4855
protected:
4956
struct ServerContext {
5057
std::string host;
@@ -107,7 +114,7 @@ class ProtocolClient {
107114
}
108115

109116
auto* Proactor() const {
110-
return sock_->proactor();
117+
return socket_thread_;
111118
}
112119

113120
util::FiberSocketBase* Sock() const {
@@ -142,6 +149,7 @@ class ProtocolClient {
142149
#else
143150
void* ssl_ctx_{nullptr};
144151
#endif
152+
util::fb2::ProactorBase* socket_thread_;
145153
};
146154

147155
} // namespace dfly

src/server/replica.cc

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,12 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
170170
sync_fb_.JoinIfNeeded();
171171
DVLOG(1) << "MainReplicationFb stopped " << this;
172172
acks_fb_.JoinIfNeeded();
173-
for (auto& flow : shard_flows_) {
174-
flow.reset();
175-
}
173+
174+
proactor_->Await([this]() {
175+
for (auto& flow : shard_flows_) {
176+
flow.reset();
177+
}
178+
});
176179

177180
if (last_journal_LSNs_.has_value()) {
178181
return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
@@ -501,29 +504,45 @@ error_code Replica::InitiatePSync() {
501504
return error_code{};
502505
}
503506

504-
// Initialize and start sub-replica for each flow.
505-
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
506-
auto start_time = absl::Now();
507-
508-
// Initialize MultiShardExecution.
509-
multi_shard_exe_.reset(new MultiShardExecution());
507+
void Replica::InitializeShardFlows() {
508+
decltype(shard_flows_) shard_flows_copy;
509+
shard_flows_copy.resize(master_context_.num_flows);
510+
DCHECK(!shard_flows_copy.empty());
511+
thread_flow_map_ = Partition(shard_flows_copy.size());
512+
const size_t pool_sz = shard_set->pool()->size();
510513

511-
// Initialize shard flows.
512-
shard_flows_.resize(master_context_.num_flows);
513-
DCHECK(!shard_flows_.empty());
514-
for (unsigned i = 0; i < shard_flows_.size(); ++i) {
515-
// Transfer LSN state for partial sync
514+
for (size_t i = 0; i < shard_flows_copy.size(); ++i) {
516515
uint64_t partial_sync_lsn = 0;
517-
if (shard_flows_[i]) {
516+
if (!shard_flows_.empty() && shard_flows_[i]) {
518517
partial_sync_lsn = shard_flows_[i]->JournalExecutedCount();
519518
}
520-
shard_flows_[i].reset(
519+
shard_flows_copy[i].reset(
521520
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
522521
if (partial_sync_lsn > 0) {
523522
shard_flows_[i]->SetRecordsExecuted(partial_sync_lsn);
524523
}
525524
}
526-
thread_flow_map_ = Partition(shard_flows_.size());
525+
526+
shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
527+
for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
528+
shard_flows_copy[i]->SetSocketThread(ProactorBase::me());
529+
}
530+
});
531+
// now update shard_flows on proactor thread
532+
shard_flows_ = std::move(shard_flows_copy);
533+
}
534+
535+
// Initialize and start sub-replica for each flow.
536+
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
537+
auto start_time = absl::Now();
538+
539+
// Initialize MultiShardExecution.
540+
multi_shard_exe_.reset(new MultiShardExecution());
541+
542+
// Initialize shard flows. The update to the shard_flows_ should be done by this thread.
543+
// Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
544+
// below.
545+
InitializeShardFlows();
527546

528547
// Blocked on until all flows got full sync cut.
529548
BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -1215,6 +1234,7 @@ auto Replica::GetSummary() const -> Summary {
12151234
// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
12161235
// it's unlikely to cause a real bug.
12171236
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
1237+
DCHECK(Proactor() == ProactorBase::me());
12181238
last_io_time = std::max(last_io_time, flow->LastIoTime());
12191239
}
12201240

@@ -1246,25 +1266,14 @@ auto Replica::GetSummary() const -> Summary {
12461266
return res;
12471267
};
12481268

1249-
if (Sock())
1250-
return Proactor()->AwaitBrief(f);
1251-
1252-
/**
1253-
* when this branch happens: there is a very short grace period
1254-
* where Sock() is not initialized, yet the server can
1255-
* receive ROLE/INFO commands. That period happens when launching
1256-
* an instance with '--replicaof' and then immediately
1257-
* sending a command.
1258-
*
1259-
* In that instance, we have to run f() on the current fiber.
1260-
*/
1261-
return f();
1269+
return Proactor()->AwaitBrief(f);
12621270
}
12631271

12641272
std::vector<uint64_t> Replica::GetReplicaOffset() const {
12651273
std::vector<uint64_t> flow_rec_count;
12661274
flow_rec_count.resize(shard_flows_.size());
12671275
for (const auto& flow : shard_flows_) {
1276+
DCHECK(flow.get());
12681277
uint32_t flow_id = flow->FlowId();
12691278
uint64_t rec_count = flow->JournalExecutedCount();
12701279
DCHECK_LT(flow_id, shard_flows_.size());

src/server/replica.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class Replica : ProtocolClient {
154154
size_t GetRecCountExecutedPerShard(const std::vector<unsigned>& indexes) const;
155155

156156
private:
157+
void InitializeShardFlows();
158+
157159
util::fb2::ProactorBase* proactor_ = nullptr;
158160
Service& service_;
159161
MasterContext master_context_;

0 commit comments

Comments
 (0)