Skip to content

Commit 59b3350

Browse files
committed
chore: lock free info command with replicaof v2
Signed-off-by: kostas <[email protected]>
1 parent 7abc6a9 commit 59b3350

File tree

7 files changed

+193
-51
lines changed

7 files changed

+193
-51
lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
166166
switch (state_) {
167167
case MigrationState::C_FATAL:
168168
case MigrationState::C_FINISHED:
169+
CloseSocket();
169170
return; // Already finished, nothing else to do
170171

171172
case MigrationState::C_CONNECTING:
@@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
192193
});
193194
exec_st_.JoinErrorHandler();
194195
}
196+
197+
// Close socket for clean disconnect.
198+
CloseSocket();
195199
}
196200

197201
MigrationState OutgoingMigration::GetState() const {

src/server/protocol_client.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,16 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
109109
#ifdef DFLY_USE_SSL
110110
MaybeInitSslCtx();
111111
#endif
112+
// We initialize the proactor thread here such that it never races with Sock().
113+
// ProtocolClient is never migrated to a different thread, so this is safe.
114+
socket_thread_ = ProactorBase::me();
112115
}
116+
113117
ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
114118
#ifdef DFLY_USE_SSL
115119
MaybeInitSslCtx();
116120
#endif
121+
socket_thread_ = ProactorBase::me();
117122
}
118123

119124
ProtocolClient::~ProtocolClient() {
@@ -162,6 +167,7 @@ error_code ProtocolClient::ResolveHostDns() {
162167
error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
163168
ExecutionState* cntx) {
164169
ProactorBase* mythread = ProactorBase::me();
170+
DCHECK(mythread == socket_thread_);
165171
CHECK(mythread);
166172
{
167173
unique_lock lk(sock_mu_);
@@ -235,6 +241,9 @@ void ProtocolClient::CloseSocket() {
235241
auto ec = sock_->Shutdown(SHUT_RDWR);
236242
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
237243
}
244+
auto ec = sock_->Close(); // Quietly close.
245+
246+
LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
238247
});
239248
}
240249
}
@@ -385,11 +394,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
385394
}
386395

387396
uint64_t ProtocolClient::LastIoTime() const {
388-
return last_io_time_;
397+
return last_io_time_.load(std::memory_order_relaxed);
389398
}
390399

391400
void ProtocolClient::TouchIoTime() {
392-
last_io_time_ = Proactor()->GetMonotonicTimeNs();
401+
last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
393402
}
394403

395404
} // namespace dfly

src/server/protocol_client.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ class ProtocolClient {
4545
uint64_t LastIoTime() const;
4646
void TouchIoTime();
4747

48+
// Used to set the socket_thread_ prior the initialization of sock_.
49+
// That way Proactor() returns the right thread even when the sock_ is
50+
// not yet initialized
51+
void SetSocketThread(util::fb2::ProactorBase* sock_thread) {
52+
socket_thread_ = sock_thread;
53+
}
54+
4855
protected:
4956
struct ServerContext {
5057
std::string host;
@@ -107,7 +114,7 @@ class ProtocolClient {
107114
}
108115

109116
auto* Proactor() const {
110-
return sock_->proactor();
117+
return socket_thread_;
111118
}
112119

113120
util::FiberSocketBase* Sock() const {
@@ -132,7 +139,7 @@ class ProtocolClient {
132139
std::string last_cmd_;
133140
std::string last_resp_;
134141

135-
uint64_t last_io_time_ = 0; // in ns, monotonic clock.
142+
std::atomic<uint64_t> last_io_time_ = 0; // in ns, monotonic clock.
136143

137144
#ifdef DFLY_USE_SSL
138145

@@ -142,6 +149,7 @@ class ProtocolClient {
142149
#else
143150
void* ssl_ctx_{nullptr};
144151
#endif
152+
util::fb2::ProactorBase* socket_thread_;
145153
};
146154

147155
} // namespace dfly

src/server/replica.cc

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,18 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
170170
sync_fb_.JoinIfNeeded();
171171
DVLOG(1) << "MainReplicationFb stopped " << this;
172172
acks_fb_.JoinIfNeeded();
173-
for (auto& flow : shard_flows_) {
174-
flow.reset();
175-
}
173+
174+
proactor_->Await([this]() {
175+
// Destructor is blocking, so other fibers can observe partial state
176+
// of flows during clean up. To avoid this, we move them and clear the
177+
// member before the preemption point
178+
auto shard_flows = std::move(shard_flows_);
179+
shard_flows_.clear();
180+
for (auto& flow : shard_flows) {
181+
flow.reset();
182+
}
183+
shard_flows_.clear();
184+
});
176185

177186
if (last_journal_LSNs_.has_value()) {
178187
return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
@@ -501,29 +510,45 @@ error_code Replica::InitiatePSync() {
501510
return error_code{};
502511
}
503512

504-
// Initialize and start sub-replica for each flow.
505-
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
506-
auto start_time = absl::Now();
507-
508-
// Initialize MultiShardExecution.
509-
multi_shard_exe_.reset(new MultiShardExecution());
513+
void Replica::InitializeShardFlows() {
514+
decltype(shard_flows_) shard_flows_copy;
515+
shard_flows_copy.resize(master_context_.num_flows);
516+
DCHECK(!shard_flows_copy.empty());
517+
thread_flow_map_ = Partition(shard_flows_copy.size());
518+
const size_t pool_sz = shard_set->pool()->size();
510519

511-
// Initialize shard flows.
512-
shard_flows_.resize(master_context_.num_flows);
513-
DCHECK(!shard_flows_.empty());
514-
for (unsigned i = 0; i < shard_flows_.size(); ++i) {
515-
// Transfer LSN state for partial sync
520+
for (size_t i = 0; i < shard_flows_copy.size(); ++i) {
516521
uint64_t partial_sync_lsn = 0;
517-
if (shard_flows_[i]) {
522+
if (!shard_flows_.empty() && shard_flows_[i]) {
518523
partial_sync_lsn = shard_flows_[i]->JournalExecutedCount();
519524
}
520-
shard_flows_[i].reset(
525+
shard_flows_copy[i].reset(
521526
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
522527
if (partial_sync_lsn > 0) {
523-
shard_flows_[i]->SetRecordsExecuted(partial_sync_lsn);
528+
shard_flows_copy[i]->SetRecordsExecuted(partial_sync_lsn);
524529
}
525530
}
526-
thread_flow_map_ = Partition(shard_flows_.size());
531+
532+
shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
533+
for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
534+
shard_flows_copy[i]->SetSocketThread(ProactorBase::me());
535+
}
536+
});
537+
// now update shard_flows on proactor thread
538+
shard_flows_ = std::move(shard_flows_copy);
539+
}
540+
541+
// Initialize and start sub-replica for each flow.
542+
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
543+
auto start_time = absl::Now();
544+
545+
// Initialize MultiShardExecution.
546+
multi_shard_exe_.reset(new MultiShardExecution());
547+
548+
// Initialize shard flows. The update to the shard_flows_ should be done by this thread.
549+
// Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
550+
// below.
551+
InitializeShardFlows();
527552

528553
// Blocked on until all flows got full sync cut.
529554
BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -1210,11 +1235,12 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
12101235

12111236
auto Replica::GetSummary() const -> Summary {
12121237
auto f = [this]() {
1238+
DCHECK(this);
12131239
auto last_io_time = LastIoTime();
12141240

1215-
// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
1216-
// it's unlikely to cause a real bug.
1217-
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
1241+
for (const auto& flow : shard_flows_) {
1242+
DCHECK(Proactor() == ProactorBase::me());
1243+
DCHECK(flow);
12181244
last_io_time = std::max(last_io_time, flow->LastIoTime());
12191245
}
12201246

@@ -1246,25 +1272,14 @@ auto Replica::GetSummary() const -> Summary {
12461272
return res;
12471273
};
12481274

1249-
if (Sock())
1250-
return Proactor()->AwaitBrief(f);
1251-
1252-
/**
1253-
* when this branch happens: there is a very short grace period
1254-
* where Sock() is not initialized, yet the server can
1255-
* receive ROLE/INFO commands. That period happens when launching
1256-
* an instance with '--replicaof' and then immediately
1257-
* sending a command.
1258-
*
1259-
* In that instance, we have to run f() on the current fiber.
1260-
*/
1261-
return f();
1275+
return Proactor()->AwaitBrief(f);
12621276
}
12631277

12641278
std::vector<uint64_t> Replica::GetReplicaOffset() const {
12651279
std::vector<uint64_t> flow_rec_count;
12661280
flow_rec_count.resize(shard_flows_.size());
12671281
for (const auto& flow : shard_flows_) {
1282+
DCHECK(flow.get());
12681283
uint32_t flow_id = flow->FlowId();
12691284
uint64_t rec_count = flow->JournalExecutedCount();
12701285
DCHECK_LT(flow_id, shard_flows_.size());

src/server/replica.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ class Replica : ProtocolClient {
154154
size_t GetRecCountExecutedPerShard(const std::vector<unsigned>& indexes) const;
155155

156156
private:
157+
void InitializeShardFlows();
158+
157159
util::fb2::ProactorBase* proactor_ = nullptr;
158160
Service& service_;
159161
MasterContext master_context_;

0 commit comments

Comments
 (0)