Skip to content

Commit b932b26

Browse files
committed
one to rule them all
1 parent 6ed0f0c commit b932b26

File tree

2 files changed

+35
-29
lines changed

2 files changed

+35
-29
lines changed

src/server/replica.cc

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -505,14 +505,23 @@ error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_mast
505505
// Initialize MultiShardExecution.
506506
multi_shard_exe_.reset(new MultiShardExecution());
507507

508-
// Initialize shard flows.
509-
shard_flows_.resize(master_context_.num_flows);
510-
DCHECK(!shard_flows_.empty());
511-
for (unsigned i = 0; i < shard_flows_.size(); ++i) {
512-
shard_flows_[i].reset(
513-
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
514-
}
515-
thread_flow_map_ = Partition(shard_flows_.size());
508+
// Initialize shard flows. The update to the shard_flows_ should be done by this thread.
509+
// Otherwise, there is a race condition between GetSummary() and the shard_flows_[i].reset()
510+
// below.
511+
decltype(shard_flows_) shard_flows_copy;
512+
shard_flows_copy.resize(master_context_.num_flows);
513+
DCHECK(!shard_flows_copy.empty());
514+
thread_flow_map_ = Partition(shard_flows_copy.size());
515+
const size_t pool_sz = shard_set->pool()->size();
516+
517+
shard_set->pool()->AwaitFiberOnAll([pool_sz, this, &shard_flows_copy](auto index, auto* ctx) {
518+
for (unsigned i = index; i < shard_flows_copy.size(); i += pool_sz) {
519+
shard_flows_copy[i].reset(
520+
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
521+
}
522+
});
523+
// now update shard_flows on proactor thread
524+
shard_flows_ = std::move(shard_flows_copy);
516525

517526
// Blocked on until all flows got full sync cut.
518527
BlockingCounter sync_block{unsigned(shard_flows_.size())};
@@ -1171,6 +1180,7 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
11711180

11721181
auto Replica::GetSummary() const -> Summary {
11731182
auto f = [this]() {
1183+
DCHECK(Proactor() == ProactorBase::me());
11741184
auto last_io_time = LastIoTime();
11751185

11761186
// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here

src/server/server_family.cc

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,11 +1970,11 @@ vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
19701970
}
19711971

19721972
optional<Replica::Summary> ServerFamily::GetReplicaSummary() const {
1973-
util::fb2::LockGuard lk(replicaof_mu_);
1974-
if (replica_ == nullptr) {
1973+
if (tl_replica == nullptr) {
19751974
return nullopt;
19761975
} else {
1977-
return replica_->GetSummary();
1976+
auto replica = tl_replica;
1977+
return replica->GetSummary();
19781978
}
19791979
}
19801980

@@ -3095,15 +3095,12 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
30953095

30963096
auto add_repl_info = [&] {
30973097
bool is_master = true;
3098-
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
3099-
// ensuring eventual consistency of is_master. When determining if the server is a replica and
3100-
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
3101-
// insufficient in this scenario.
3102-
// Please note that we we do not use Metrics object here.
3103-
{
3104-
fb2::LockGuard lk(replicaof_mu_);
3105-
is_master = !replica_;
3106-
}
3098+
3099+
// Deep copy because tl_replica might be overwritten inbetween
3100+
auto replica = tl_replica;
3101+
auto cluster_replicas = tl_cluster_replicas;
3102+
is_master = !replica;
3103+
31073104
if (is_master) {
31083105
vector<ReplicaRoleInfo> replicas_info = dfly_cmd_->GetReplicasRoleInfo();
31093106
append("role", "master");
@@ -3137,13 +3134,9 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
31373134
append("psync_attempts", rinfo.psync_attempts);
31383135
append("psync_successes", rinfo.psync_successes);
31393136
};
3140-
// Deep copy because tl_replica might be overwritten inbetween
3141-
auto replica = tl_replica;
31423137

31433138
replication_info_cb(replica->GetSummary());
31443139

3145-
// Deep copy because tl_cluster_replicas might be overwritten inbetween
3146-
auto cluster_replicas = tl_cluster_replicas;
31473140
// Special case, when multiple masters replicate to a single replica.
31483141
for (const auto& replica : cluster_replicas) {
31493142
replication_info_cb(replica->GetSummary());
@@ -3721,12 +3714,11 @@ void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) {
37213714

37223715
void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
37233716
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
3724-
util::fb2::LockGuard lk(replicaof_mu_);
37253717
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
37263718
// ensuring eventual consistency of is_master. When determining if the server is a replica and
37273719
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
37283720
// insufficient in this scenario.
3729-
if (!replica_) {
3721+
if (!tl_replica) {
37303722
rb->StartArray(2);
37313723
rb->SendBulkString("master");
37323724
auto vec = dfly_cmd_->GetReplicasRoleInfo();
@@ -3739,6 +3731,10 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
37393731
}
37403732

37413733
} else {
3734+
// Deep copy because tl_replica might be overwritten inbetween
3735+
auto replica = tl_replica;
3736+
auto cluster_replicas = tl_cluster_replicas;
3737+
37423738
rb->StartArray(4 + cluster_replicas_.size() * 3);
37433739
rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
37443740

@@ -3756,9 +3752,9 @@ void ServerFamily::Role(CmdArgList args, const CommandContext& cmd_cntx) {
37563752
rb->SendBulkString("connecting");
37573753
}
37583754
};
3759-
send_replica_info(replica_->GetSummary());
3760-
for (const auto& replica : cluster_replicas_) {
3761-
send_replica_info(replica->GetSummary());
3755+
send_replica_info(replica->GetSummary());
3756+
for (const auto& repl : cluster_replicas) {
3757+
send_replica_info(repl->GetSummary());
37623758
}
37633759
}
37643760
}

0 commit comments

Comments
 (0)