Skip to content

Commit 2023a64

Browse files
authored
chore: code redundancies in replication metrics (#5926)
Signed-off-by: kostas <[email protected]>
1 parent 54ea59c commit 2023a64

File tree

3 files changed

+52
-51
lines changed

3 files changed

+52
-51
lines changed

src/server/cluster/cluster_family.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
116116
.replicas = {},
117117
.migrations = {}};
118118

119-
optional<Replica::Summary> replication_info = server_family_->GetReplicaSummary();
119+
optional<Metrics::ReplicaInfo> repl_info = server_family_->GetReplicaSummary();
120120
ServerState& etl = *ServerState::tlocal();
121-
if (!replication_info.has_value()) {
121+
if (!repl_info) {
122122
DCHECK(etl.is_master);
123123
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
124124
std::string preferred_endpoint =
@@ -141,7 +141,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
141141
}
142142
} else {
143143
// TODO: We currently don't save the master's ID in the replica
144-
info.master = {{.id = "", .ip = replication_info->host, .port = replication_info->port},
144+
info.master = {{.id = "", .ip = repl_info->summary.host, .port = repl_info->summary.port},
145145
NodeHealth::ONLINE};
146146
info.replicas.push_back({{.id = id_,
147147
.ip = cntx->conn()->LocalBindAddress(),

src/server/server_family.cc

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,15 @@ GenericError RewriteConfigFile() {
888888
return {};
889889
}
890890

891+
bool IsMaster() {
892+
// We call this function on startup where tlocal() == nullptr. We handle
893+
// this case below.
894+
if (!ServerState::tlocal()) {
895+
return true;
896+
}
897+
return ServerState::tlocal()->is_master;
898+
}
899+
891900
} // namespace
892901

893902
void SlowLogGet(dfly::CmdArgList args, std::string_view sub_cmd, util::ProactorPool* pp,
@@ -1303,7 +1312,7 @@ std::optional<fb2::Future<GenericError>> ServerFamily::Load(const std::string& p
13031312
return future;
13041313
};
13051314

1306-
if (ServerState::tlocal() && !ServerState::tlocal()->is_master) {
1315+
if (!IsMaster()) {
13071316
return immediate(string("Replica cannot load data"));
13081317
}
13091318

@@ -1533,9 +1542,7 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
15331542
AppendMetricHeader("version", "", MetricType::GAUGE, &resp->body());
15341543
AppendMetricValue("version", 1, {"version"}, {GetVersion()}, &resp->body());
15351544

1536-
bool is_master = ServerState::tlocal()->is_master;
1537-
1538-
AppendMetricWithoutLabels("master", "1 if master 0 if replica", is_master ? 1 : 0,
1545+
AppendMetricWithoutLabels("master", "1 if master 0 if replica", IsMaster() ? 1 : 0,
15391546
MetricType::GAUGE, &resp->body());
15401547
AppendMetricWithoutLabels("uptime_in_seconds", "", uptime, MetricType::COUNTER, &resp->body());
15411548

@@ -1828,10 +1835,10 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
18281835
absl::StrAppend(&resp->body(), command_metrics);
18291836
}
18301837

1831-
if (m.replica_side_info) { // slave side
1832-
auto& replica_info = *m.replica_side_info;
1838+
if (m.replica_side_info) { // replica side
1839+
const auto reconnect_count = m.replica_side_info->summary.reconnect_count;
18331840
AppendMetricWithoutLabels("replica_reconnect_count", "Number of replica reconnects",
1834-
replica_info.reconnect_count, MetricType::COUNTER, &resp->body());
1841+
reconnect_count, MetricType::COUNTER, &resp->body());
18351842
} else { // Master side
18361843
string replication_lag_metrics;
18371844
vector<ReplicaRoleInfo> replicas_info = dfly_cmd->GetReplicasRoleInfo();
@@ -1961,8 +1968,8 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) {
19611968
StringResponse resp = util::http::MakeStringResponse(boost::beast::http::status::ok);
19621969
util::http::SetMime(util::http::kTextMime, &resp);
19631970
uint64_t uptime = time(NULL) - start_time_;
1964-
PrintPrometheusMetrics(uptime, this->GetMetrics(&namespaces->GetDefaultNamespace()),
1965-
this->dfly_cmd_.get(), &resp, legacy_format_metrics_);
1971+
PrintPrometheusMetrics(uptime, GetMetrics(&namespaces->GetDefaultNamespace()), dfly_cmd_.get(),
1972+
&resp, legacy_format_metrics_);
19661973
return send->Invoke(std::move(resp));
19671974
};
19681975

@@ -1973,7 +1980,7 @@ void ServerFamily::PauseReplication(bool pause) {
19731980
util::fb2::LockGuard lk(replicaof_mu_);
19741981

19751982
// Switch to primary mode.
1976-
if (!ServerState::tlocal()->is_master) {
1983+
if (!IsMaster()) {
19771984
auto repl_ptr = replica_;
19781985
CHECK(repl_ptr);
19791986
repl_ptr->Pause(pause);
@@ -1984,7 +1991,7 @@ std::optional<ReplicaOffsetInfo> ServerFamily::GetReplicaOffsetInfo() {
19841991
util::fb2::LockGuard lk(replicaof_mu_);
19851992

19861993
// Switch to primary mode.
1987-
if (!ServerState::tlocal()->is_master) {
1994+
if (!IsMaster()) {
19881995
auto repl_ptr = replica_;
19891996
CHECK(repl_ptr);
19901997
return ReplicaOffsetInfo{repl_ptr->GetSyncId(), repl_ptr->GetReplicaOffset()};
@@ -2003,13 +2010,19 @@ vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
20032010
return listeners;
20042011
}
20052012

2006-
optional<Replica::Summary> ServerFamily::GetReplicaSummary() const {
2013+
optional<Metrics::ReplicaInfo> ServerFamily::GetReplicaSummary() const {
20072014
util::fb2::LockGuard lk(replicaof_mu_);
20082015
if (replica_ == nullptr) {
20092016
return nullopt;
2010-
} else {
2011-
return replica_->GetSummary();
20122017
}
2018+
2019+
Metrics::ReplicaInfo info;
2020+
info.summary = replica_->GetSummary();
2021+
for (const auto& cl_repl : cluster_replicas_) {
2022+
info.cl_repl_summary.push_back(cl_repl->GetSummary());
2023+
}
2024+
2025+
return info;
20132026
}
20142027

20152028
void ServerFamily::OnClose(ConnectionContext* cntx) {
@@ -2766,15 +2779,8 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
27662779
result.traverse_ttl_per_sec /= 6;
27672780
result.delete_ttl_per_sec /= 6;
27682781

2769-
bool is_master = ServerState::tlocal() && ServerState::tlocal()->is_master;
2770-
2771-
if (!is_master) {
2772-
auto info = GetReplicaSummary();
2773-
if (info) {
2774-
result.replica_side_info = {
2775-
.reconnect_count = info->reconnect_count,
2776-
};
2777-
}
2782+
if (!IsMaster()) {
2783+
result.replica_side_info = GetReplicaSummary();
27782784
}
27792785

27802786
{
@@ -2950,7 +2956,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
29502956
append("maxmemory_policy", "noeviction");
29512957
}
29522958

2953-
if (!m.replica_side_info) { // master
2959+
// master
2960+
if (!m.replica_side_info) {
29542961
ReplicationMemoryStats repl_mem;
29552962
dfly_cmd_->GetReplicationMemoryStats(&repl_mem);
29562963
append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes);
@@ -3126,17 +3133,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
31263133
};
31273134

31283135
auto add_repl_info = [&] {
3129-
bool is_master = true;
3130-
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
3131-
// ensuring eventual consistency of is_master. When determining if the server is a replica and
3132-
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
3133-
// insufficient in this scenario.
3134-
// Please note that we we do not use Metrics object here.
3135-
{
3136-
fb2::LockGuard lk(replicaof_mu_);
3137-
is_master = !replica_;
3138-
}
3139-
if (is_master) {
3136+
if (!m.replica_side_info) {
31403137
vector<ReplicaRoleInfo> replicas_info = dfly_cmd_->GetReplicasRoleInfo();
31413138
append("role", "master");
31423139
append("connected_slaves", replicas_info.size());
@@ -3169,13 +3166,13 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
31693166
append("psync_attempts", rinfo.psync_attempts);
31703167
append("psync_successes", rinfo.psync_successes);
31713168
};
3172-
fb2::LockGuard lk(replicaof_mu_);
31733169

3174-
replication_info_cb(replica_->GetSummary());
3170+
const auto& info = *m.replica_side_info;
31753171

3172+
replication_info_cb(info.summary);
31763173
// Special case, when multiple masters replicate to a single replica.
3177-
for (const auto& replica : cluster_replicas_) {
3178-
replication_info_cb(replica->GetSummary());
3174+
for (const auto& summary : info.cl_repl_summary) {
3175+
replication_info_cb(summary);
31793176
}
31803177
}
31813178
};
@@ -3343,6 +3340,8 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
33433340
// Save time by not calculating metrics if we don't need them.
33443341
if (!(section == "SERVER" || section == "REPLICATION")) {
33453342
metrics = GetMetrics(cmd_cntx.conn_cntx->ns);
3343+
} else if (!IsMaster()) {
3344+
metrics.replica_side_info = GetReplicaSummary();
33463345
}
33473346

33483347
string info = FormatInfoMetrics(metrics, section, cmd_cntx.conn_cntx->conn()->IsPrivileged());
@@ -3435,7 +3434,7 @@ void ServerFamily::Hello(CmdArgList args, const CommandContext& cmd_cntx) {
34353434
rb->SendBulkString("mode");
34363435
rb->SendBulkString(GetRedisMode());
34373436
rb->SendBulkString("role");
3438-
rb->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
3437+
rb->SendBulkString(IsMaster() ? "master" : "slave");
34393438

34403439
// Add availability_zone to the response if flag is explicitly set and not empty
34413440
if (!az.empty()) {
@@ -3446,7 +3445,7 @@ void ServerFamily::Hello(CmdArgList args, const CommandContext& cmd_cntx) {
34463445

34473446
void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx) {
34483447
util::fb2::LockGuard lk(replicaof_mu_);
3449-
if (ServerState::tlocal()->is_master) {
3448+
if (IsMaster()) {
34503449
cmd_cntx.rb->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
34513450
return;
34523451
}
@@ -3617,9 +3616,8 @@ void ServerFamily::Replicate(string_view host, string_view port) {
36173616

36183617
void ServerFamily::ReplicaOfNoOne(SinkReplyBuilder* builder) {
36193618
util::fb2::LockGuard lk(replicaof_mu_);
3620-
ServerState* ss = ServerState::tlocal();
36213619

3622-
if (!ss->is_master) {
3620+
if (!IsMaster()) {
36233621
CHECK(replica_);
36243622

36253623
// flip flag before clearing replica_
@@ -3658,7 +3656,7 @@ void ServerFamily::ReplicaOfInternalV2(CmdArgList args, Transaction* tx, SinkRep
36583656
// well defined semantics.
36593657
ServerState* ss = ServerState::tlocal();
36603658

3661-
if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
3659+
if (IsMaster() && ss->gstate() == GlobalState::LOADING) {
36623660
builder->SendError(kLoadingErr);
36633661
return;
36643662
}
@@ -3733,7 +3731,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
37333731
}
37343732

37353733
// We return OK, to support idempotency semantics.
3736-
if (ServerState::tlocal()->is_master)
3734+
if (IsMaster())
37373735
return builder->SendOk();
37383736

37393737
util::fb2::LockGuard lk(replicaof_mu_);
@@ -3775,7 +3773,7 @@ void ServerFamily::ReplConf(CmdArgList args, const CommandContext& cmd_cntx) {
37753773
auto* builder = cmd_cntx.rb;
37763774
{
37773775
util::fb2::LockGuard lk(replicaof_mu_);
3778-
if (!ServerState::tlocal()->is_master) {
3776+
if (!IsMaster()) {
37793777
return builder->SendError("Replicating a replica is unsupported");
37803778
}
37813779
}

src/server/server_family.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,10 @@ struct Metrics {
128128
absl::flat_hash_map<std::string, uint64_t> connections_lib_name_ver_map;
129129

130130
struct ReplicaInfo {
131-
uint32_t reconnect_count;
131+
Replica::Summary summary;
132+
133+
// cluster
134+
std::vector<Replica::Summary> cl_repl_summary;
132135
};
133136

134137
// Replica-side replication info (summary plus cluster replica summaries). Undefined for master
@@ -294,7 +297,7 @@ class ServerFamily {
294297

295298
// Replica-side method. Returns replication summary if this server is a replica,
296299
// nullopt otherwise.
297-
std::optional<Replica::Summary> GetReplicaSummary() const;
300+
std::optional<Metrics::ReplicaInfo> GetReplicaSummary() const;
298301

299302
// Master-side access method to replication info of that connection.
300303
std::shared_ptr<DflyCmd::ReplicaInfo> GetReplicaInfoFromConnection(ConnectionState* state) const {

0 commit comments

Comments
 (0)