
Commit 45cbcf1

chore: lock free info command with replicaof v2

Signed-off-by: kostas <[email protected]>
1 parent 7abc6a9 commit 45cbcf1

7 files changed: +193 -59 lines changed

src/server/cluster/outgoing_slot_migration.cc

Lines changed: 4 additions & 0 deletions

@@ -166,6 +166,7 @@ void OutgoingMigration::Finish(const GenericError& error) {
   switch (state_) {
     case MigrationState::C_FATAL:
     case MigrationState::C_FINISHED:
+      CloseSocket();
       return;  // Already finished, nothing else to do
 
     case MigrationState::C_CONNECTING:
@@ -192,6 +193,9 @@ void OutgoingMigration::Finish(const GenericError& error) {
     });
     exec_st_.JoinErrorHandler();
   }
+
+  // Close socket for clean disconnect.
+  CloseSocket();
 }
 
 MigrationState OutgoingMigration::GetState() const {
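Note on the change above: Finish() now reaches CloseSocket() both on the already-finished early return and at the end of normal teardown, so the close must tolerate being invoked more than once (the ProtocolClient version below appears to guard on the socket internally). A minimal sketch of such a guard, using a hypothetical Connection type rather than Dragonfly's actual implementation:

#include <mutex>

// Hypothetical sketch: a close that is safe to call from several shutdown
// paths. The flag turns repeated calls into no-ops.
class Connection {
 public:
  void Close() {
    std::lock_guard<std::mutex> lk(mu_);
    if (closed_)
      return;  // Second and later calls do nothing.
    closed_ = true;
    // ... shutdown(SHUT_RDWR) and close the underlying descriptor ...
  }

 private:
  std::mutex mu_;
  bool closed_ = false;
};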

src/server/protocol_client.cc

Lines changed: 11 additions & 9 deletions

@@ -109,23 +109,21 @@ ProtocolClient::ProtocolClient(string host, uint16_t port) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+  // We initialize the proactor thread here such that it never races with Sock().
+  // ProtocolClient is never migrated to a different thread, so this is safe.
+  socket_thread_ = ProactorBase::me();
 }
+
 ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
 #ifdef DFLY_USE_SSL
   MaybeInitSslCtx();
 #endif
+  socket_thread_ = ProactorBase::me();
 }
 
 ProtocolClient::~ProtocolClient() {
   exec_st_.JoinErrorHandler();
 
-  // FIXME: We should close the socket explictly outside of the destructor. This currently
-  // breaks test_cancel_replication_immediately.
-  if (sock_) {
-    std::error_code ec;
-    sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
-    LOG_IF(ERROR, ec) << "Error closing socket " << ec;
-  }
 #ifdef DFLY_USE_SSL
   if (ssl_ctx_) {
     SSL_CTX_free(ssl_ctx_);
@@ -162,6 +160,7 @@ error_code ProtocolClient::ResolveHostDns() {
 error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
                                           ExecutionState* cntx) {
   ProactorBase* mythread = ProactorBase::me();
+  DCHECK(mythread == socket_thread_);
   CHECK(mythread);
   {
     unique_lock lk(sock_mu_);
@@ -235,6 +234,9 @@ void ProtocolClient::CloseSocket() {
         auto ec = sock_->Shutdown(SHUT_RDWR);
         LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
       }
+      auto ec = sock_->Close();  // Quietly close.
+
+      LOG_IF(WARNING, ec) << "Error closing socket " << ec << "/" << ec.message();
     });
   }
 }
@@ -385,11 +387,11 @@ void ProtocolClient::ResetParser(RedisParser::Mode mode) {
 }
 
 uint64_t ProtocolClient::LastIoTime() const {
-  return last_io_time_;
+  return last_io_time_.load(std::memory_order_relaxed);
 }
 
 void ProtocolClient::TouchIoTime() {
-  last_io_time_ = Proactor()->GetMonotonicTimeNs();
+  last_io_time_.store(Proactor()->GetMonotonicTimeNs(), std::memory_order_relaxed);
 }
 
 }  // namespace dfly
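The last_io_time_ change is the core of the lock-free INFO path: a timestamp written on the I/O thread and read from whichever thread serves INFO needs atomicity but no ordering, since no other memory is published through it. A standalone sketch of the same pattern, with illustrative names and a steady-clock source in place of the proactor's monotonic clock:

#include <atomic>
#include <chrono>
#include <cstdint>

// Writer thread stamps the time of the last I/O; reader threads poll it.
// memory_order_relaxed suffices because the value is self-contained: no
// other state is synchronized through this variable.
class IoClock {
 public:
  void Touch() { last_ns_.store(NowNs(), std::memory_order_relaxed); }

  uint64_t LastNs() const { return last_ns_.load(std::memory_order_relaxed); }

 private:
  static uint64_t NowNs() {
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
               std::chrono::steady_clock::now().time_since_epoch())
        .count();
  }

  std::atomic<uint64_t> last_ns_{0};
};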

src/server/protocol_client.h

Lines changed: 10 additions & 2 deletions

@@ -45,6 +45,13 @@ class ProtocolClient {
   uint64_t LastIoTime() const;
   void TouchIoTime();
 
+  // Used to set the socket_thread_ prior to the initialization of sock_.
+  // That way Proactor() returns the right thread even when the sock_ is
+  // not yet initialized.
+  void SetSocketThread(util::fb2::ProactorBase* sock_thread) {
+    socket_thread_ = sock_thread;
+  }
+
  protected:
   struct ServerContext {
     std::string host;
@@ -107,7 +114,7 @@ class ProtocolClient {
   }
 
   auto* Proactor() const {
-    return sock_->proactor();
+    return socket_thread_;
   }
 
   util::FiberSocketBase* Sock() const {
@@ -132,7 +139,7 @@ class ProtocolClient {
   std::string last_cmd_;
   std::string last_resp_;
 
-  uint64_t last_io_time_ = 0;  // in ns, monotonic clock.
+  std::atomic<uint64_t> last_io_time_ = 0;  // in ns, monotonic clock.
 
 #ifdef DFLY_USE_SSL
 
@@ -142,6 +149,7 @@ class ProtocolClient {
 #else
   void* ssl_ctx_{nullptr};
 #endif
+  util::fb2::ProactorBase* socket_thread_;
 };
 
 }  // namespace dfly
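Capturing socket_thread_ at construction is what lets Proactor() be valid before the socket exists. The shape of the idea, sketched with a hypothetical thread-local Worker handle standing in for util::fb2::ProactorBase::me():

#include <cassert>

// Hypothetical event-loop handle standing in for util::fb2::ProactorBase.
struct Worker {
  static thread_local Worker* current;  // Set by each worker thread at startup.
  static Worker* Me() { return current; }
};
thread_local Worker* Worker::current = nullptr;

class Client {
 public:
  // Capture the owning worker at construction, before any socket exists,
  // so Owner() never needs to dereference an uninitialized socket.
  Client() : owner_(Worker::Me()) { assert(owner_ != nullptr); }

  // Valid from construction onward; the client never migrates threads.
  Worker* Owner() const { return owner_; }

 private:
  Worker* owner_;
};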

src/server/replica.cc

Lines changed: 48 additions & 34 deletions

@@ -170,9 +170,17 @@ std::optional<Replica::LastMasterSyncData> Replica::Stop() {
   sync_fb_.JoinIfNeeded();
   DVLOG(1) << "MainReplicationFb stopped " << this;
   acks_fb_.JoinIfNeeded();
-  for (auto& flow : shard_flows_) {
-    flow.reset();
-  }
+
+  proactor_->Await([this]() {
+    // Destructor is blocking, so other fibers can observe partial state
+    // of flows during clean up. To avoid this, we move them and clear the
+    // member before the preemption point
+    auto shard_flows = std::move(shard_flows_);
+    shard_flows_.clear();
+    for (auto& flow : shard_flows) {
+      flow.reset();
+    }
+  });
 
   if (last_journal_LSNs_.has_value()) {
     return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
@@ -501,29 +509,45 @@ error_code Replica::InitiatePSync() {
   return error_code{};
 }
 
-// Initialize and start sub-replica for each flow.
-error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
-  auto start_time = absl::Now();
-
-  // Initialize MultiShardExecution.
-  multi_shard_exe_.reset(new MultiShardExecution());
+void Replica::InitializeShardFlows() {
+  decltype(shard_flows_) shard_flows_copy;
+  shard_flows_copy.resize(master_context_.num_flows);
+  DCHECK(!shard_flows_copy.empty());
+  thread_flow_map_ = Partition(shard_flows_copy.size());
+  const size_t pool_sz = shard_set->pool()->size();
 
-  // Initialize shard flows.
-  shard_flows_.resize(master_context_.num_flows);
-  DCHECK(!shard_flows_.empty());
-  for (unsigned i = 0; i < shard_flows_.size(); ++i) {
-    // Transfer LSN state for partial sync
+  for (size_t i = 0; i < shard_flows_copy.size(); ++i) {
     uint64_t partial_sync_lsn = 0;
-    if (shard_flows_[i]) {
+    if (!shard_flows_.empty() && shard_flows_[i]) {
       partial_sync_lsn = shard_flows_[i]->JournalExecutedCount();
     }
-    shard_flows_[i].reset(
+    shard_flows_copy[i].reset(
         new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
     if (partial_sync_lsn > 0) {
-      shard_flows_[i]->SetRecordsExecuted(partial_sync_lsn);
+      shard_flows_copy[i]->SetRecordsExecuted(partial_sync_lsn);
     }
   }
-  thread_flow_map_ = Partition(shard_flows_.size());
+
+  shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
+    for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
+      shard_flows_copy[i]->SetSocketThread(ProactorBase::me());
+    }
+  });
+  // now update shard_flows on proactor thread
+  shard_flows_ = std::move(shard_flows_copy);
+}
+
+// Initialize and start sub-replica for each flow.
+error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
+  auto start_time = absl::Now();
+
+  // Initialize MultiShardExecution.
+  multi_shard_exe_.reset(new MultiShardExecution());
+
+  // Initialize shard flows. The update to the shard_flows_ should be done by this thread.
+  // Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
+  // below.
+  InitializeShardFlows();
 
   // Blocked on until all flows got full sync cut.
   BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -1210,11 +1234,12 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
 
 auto Replica::GetSummary() const -> Summary {
   auto f = [this]() {
+    DCHECK(this);
     auto last_io_time = LastIoTime();
 
-    // Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
-    // it's unlikely to cause a real bug.
-    for (const auto& flow : shard_flows_) {  // Get last io time from all sub flows.
+    for (const auto& flow : shard_flows_) {
+      DCHECK(Proactor() == ProactorBase::me());
+      DCHECK(flow);
       last_io_time = std::max(last_io_time, flow->LastIoTime());
     }
 
@@ -1246,25 +1271,14 @@ auto Replica::GetSummary() const -> Summary {
     return res;
   };
 
-  if (Sock())
-    return Proactor()->AwaitBrief(f);
-
-  /**
-   * when this branch happens: there is a very short grace period
-   * where Sock() is not initialized, yet the server can
-   * receive ROLE/INFO commands. That period happens when launching
-   * an instance with '--replicaof' and then immediately
-   * sending a command.
-   *
-   * In that instance, we have to run f() on the current fiber.
-   */
-  return f();
+  return Proactor()->AwaitBrief(f);
 }
 
 std::vector<uint64_t> Replica::GetReplicaOffset() const {
   std::vector<uint64_t> flow_rec_count;
   flow_rec_count.resize(shard_flows_.size());
   for (const auto& flow : shard_flows_) {
+    DCHECK(flow.get());
     uint32_t flow_id = flow->FlowId();
     uint64_t rec_count = flow->JournalExecutedCount();
     DCHECK_LT(flow_id, shard_flows_.size());
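The Stop() rewrite above illustrates a general fiber-safety pattern: when element destructors can suspend the calling fiber (they join fibers), a container must not be left half-torn-down where other fibers can observe it. Moving it out and clearing the member first means any concurrent reader sees either the complete set or an empty one. A condensed sketch with hypothetical Flow/Owner types:

#include <memory>
#include <utility>
#include <vector>

struct Flow {
  ~Flow() { /* joins fibers; may suspend the calling fiber */ }
};

class Owner {
 public:
  void Stop() {
    // Flow destructors can suspend this fiber, letting other fibers run and
    // read `flows_`. Move the container out and clear the member before the
    // first preemption point, so readers never see a partially destroyed set.
    auto doomed = std::move(flows_);
    flows_.clear();  // Moved-from state is unspecified; make it definitely empty.
    for (auto& f : doomed) {
      f.reset();  // Potential preemption point per flow.
    }
  }

 private:
  std::vector<std::unique_ptr<Flow>> flows_;
};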

src/server/replica.h

Lines changed: 2 additions & 0 deletions

@@ -154,6 +154,8 @@ class Replica : ProtocolClient {
   size_t GetRecCountExecutedPerShard(const std::vector<unsigned>& indexes) const;
 
  private:
+  void InitializeShardFlows();
+
   util::fb2::ProactorBase* proactor_ = nullptr;
   Service& service_;
   MasterContext master_context_;
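With socket_thread_ assigned in the constructor, GetSummary() in replica.cc no longer needs the Sock() fallback: Proactor()->AwaitBrief(f) is valid from the moment the Replica exists, so INFO/ROLE always read flow state on the owning thread without locks. The round trip is essentially a promise/future hop; a sketch using std::promise in place of the fiber-aware AwaitBrief(), where `post` is a hypothetical "run this closure on the owner thread" primitive:

#include <cstdint>
#include <functional>
#include <future>

struct Summary {
  uint64_t last_io_ns;
};

// Package the read, run it where the data lives, block the caller until it
// completes. AwaitBrief() plays this role on the proactor event loop.
Summary ReadOnOwner(const std::function<Summary()>& read,
                    const std::function<void(std::function<void()>)>& post) {
  std::promise<Summary> p;
  post([&] { p.set_value(read()); });  // Runs on the owner thread.
  return p.get_future().get();         // Caller waits for the result.
}

Because the caller blocks on the future, capturing the promise by reference in the posted closure is safe; the fiber version avoids blocking an OS thread by suspending only the calling fiber.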
