Skip to content

Commit 8d6a184

Browse files
authored
feat: add node health status into config (#4758)
1 parent 93bd52c commit 8d6a184

File tree

4 files changed

+114
-51
lines changed

4 files changed

+114
-51
lines changed

src/server/cluster/cluster_config.cc

Lines changed: 23 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
#include "cluster_config.h"
66

77
#include <absl/container/flat_hash_set.h>
8+
#include <absl/strings/match.h>
89

910
#include <optional>
1011
#include <string_view>
@@ -161,13 +162,13 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
161162
return SlotRanges(ranges);
162163
}
163164

164-
optional<ClusterNodeInfo> ParseClusterNode(const JsonType& json) {
165+
optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
165166
if (!json.is_object()) {
166167
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
167168
return nullopt;
168169
}
169170

170-
ClusterNodeInfo node;
171+
ClusterExtendedNodeInfo node;
171172

172173
{
173174
auto id = json.at_or_null("id");
@@ -195,6 +196,26 @@ optional<ClusterNodeInfo> ParseClusterNode(const JsonType& json) {
195196
node.port = port.value();
196197
}
197198

199+
{
200+
auto health = json.at_or_null("health");
201+
if (!health.is_null()) {
202+
if (!health.is_string()) {
203+
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node " << json;
204+
} else {
205+
auto health_str = std::move(health).as_string();
206+
if (absl::EqualsIgnoreCase(health_str, "FAIL")) {
207+
node.health = NodeHealth::FAIL;
208+
} else if (absl::EqualsIgnoreCase(health_str, "LOADING")) {
209+
node.health = NodeHealth::LOADING;
210+
} else if (absl::EqualsIgnoreCase(health_str, "ONLINE")) {
211+
node.health = NodeHealth::ONLINE;
212+
} else {
213+
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
214+
}
215+
}
216+
}
217+
}
218+
198219
return node;
199220
}
200221

src/server/cluster/cluster_config_test.cc

Lines changed: 72 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -95,41 +95,47 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
9595
}
9696

9797
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
98-
EXPECT_EQ(ClusterConfig::CreateFromConfig(
99-
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 16000}}),
100-
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
101-
.replicas = {},
102-
.migrations = {}}}),
103-
nullptr);
98+
EXPECT_EQ(
99+
ClusterConfig::CreateFromConfig(
100+
kMyId,
101+
{{.slot_ranges = SlotRanges({{.start = 0, .end = 16000}}),
102+
.master = {{.id = "other", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
103+
.replicas = {},
104+
.migrations = {}}}),
105+
nullptr);
104106
}
105107

106108
TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
107109
EXPECT_EQ(ClusterConfig::CreateFromConfig(
108-
kMyId,
109-
ClusterShardInfos({{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
110-
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
111-
.replicas = {},
112-
.migrations = {}},
113-
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
114-
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
115-
.replicas = {},
116-
.migrations = {}}})),
110+
kMyId, ClusterShardInfos(
111+
{{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
112+
.master = {{.id = "other", .ip = "192.168.0.100", .port = 7000},
113+
NodeHealth::ONLINE},
114+
.replicas = {},
115+
.migrations = {}},
116+
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
117+
.master = {{.id = "other2", .ip = "192.168.0.101", .port = 7001},
118+
NodeHealth::ONLINE},
119+
.replicas = {},
120+
.migrations = {}}})),
117121
nullptr);
118122
}
119123

120124
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotId) {
121-
EXPECT_EQ(ClusterConfig::CreateFromConfig(
122-
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF + 1}}),
123-
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
124-
.replicas = {},
125-
.migrations = {}}}),
126-
nullptr);
125+
EXPECT_EQ(
126+
ClusterConfig::CreateFromConfig(
127+
kMyId,
128+
{{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF + 1}}),
129+
.master = {{.id = "other", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
130+
.replicas = {},
131+
.migrations = {}}}),
132+
nullptr);
127133
}
128134

129135
TEST_F(ClusterConfigTest, ConfigSetOk) {
130136
auto config = ClusterConfig::CreateFromConfig(
131137
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
132-
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
138+
.master = {{.id = "other", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
133139
.replicas = {},
134140
.migrations = {}}});
135141
EXPECT_NE(config, nullptr);
@@ -140,30 +146,38 @@ TEST_F(ClusterConfigTest, ConfigSetOk) {
140146

141147
TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {
142148
auto config = ClusterConfig::CreateFromConfig(
143-
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
144-
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
145-
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
146-
.migrations = {}}});
149+
kMyId,
150+
{{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
151+
.master = {{.id = "other-master", .ip = "192.168.0.100", .port = 7000}, NodeHealth::ONLINE},
152+
.replicas = {{{.id = "other-replica", .ip = "192.168.0.101", .port = 7001},
153+
NodeHealth::ONLINE}},
154+
.migrations = {}}});
147155
EXPECT_NE(config, nullptr);
148156
EXPECT_THAT(config->GetMasterNodeForSlot(0),
149157
NodeMatches(Node{.id = "other-master", .ip = "192.168.0.100", .port = 7000}));
150158
}
151159

152160
TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
153161
auto config = ClusterConfig::CreateFromConfig(
154-
kMyId, ClusterShardInfos(
155-
{{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
156-
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
157-
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
158-
.migrations = {}},
159-
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
160-
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
161-
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
162-
.migrations = {}},
163-
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
164-
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
165-
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
166-
.migrations = {}}}));
162+
kMyId,
163+
ClusterShardInfos(
164+
{{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
165+
.master = {{.id = "other-master", .ip = "192.168.0.100", .port = 7000},
166+
NodeHealth::ONLINE},
167+
.replicas = {{{.id = "other-replica", .ip = "192.168.0.101", .port = 7001},
168+
NodeHealth::ONLINE}},
169+
.migrations = {}},
170+
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
171+
.master = {{.id = kMyId, .ip = "192.168.0.102", .port = 7002}, NodeHealth::ONLINE},
172+
.replicas = {{{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003},
173+
NodeHealth::ONLINE}},
174+
.migrations = {}},
175+
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
176+
.master = {{.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
177+
NodeHealth::ONLINE},
178+
.replicas = {{{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005},
179+
NodeHealth::ONLINE}},
180+
.migrations = {}}}));
167181
EXPECT_NE(config, nullptr);
168182
SlotSet owned_slots = config->GetOwnedSlots();
169183
EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1);
@@ -705,4 +719,23 @@ TEST_F(ClusterConfigTest, ConfigComparison) {
705719
EXPECT_EQ(config5->GetConfig(), config5->GetConfig());
706720
}
707721

722+
// Checks that the per-node health field of a JSON cluster config is carried
// into the parsed shard info: the master and the two replicas each declare a
// different status. The JSON spells the statuses in lowercase, while the
// expectations compare against the NodeHealth enumerators.
// NOTE(review): config1 is dereferenced below without a nullptr check. Other
// tests in this file compare the result of CreateFromConfig against nullptr
// first; consider ASSERT_NE(config1, nullptr) so that a rejected config fails
// the test instead of crashing it.
TEST_F(ClusterConfigTest, NodesHealth) {
723+
auto config1 = ClusterConfig::CreateFromConfig("id0", R"json(
724+
[
725+
{
726+
"slot_ranges": [ { "start": 0, "end": 16383 } ],
727+
"master": { "id": "id0", "ip": "localhost", "port": 3000, "health" : "online" },
728+
"replicas": [{ "id": "id1", "ip": "localhost", "port": 3001, "health" : "loading" },
729+
{ "id": "id2", "ip": "localhost", "port": 3002, "health" : "fail" }],
730+
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
731+
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
732+
}
733+
734+
])json");
735+
736+
// The config above declares a single shard, so begin() is that shard.
// replicas.front() and replicas.back() are expected to be id1 and id2.
EXPECT_EQ(config1->GetConfig().begin()->master.health, NodeHealth::ONLINE);
737+
EXPECT_EQ(config1->GetConfig().begin()->replicas.front().health, NodeHealth::LOADING);
738+
EXPECT_EQ(config1->GetConfig().begin()->replicas.back().health, NodeHealth::FAIL);
739+
}
740+
708741
} // namespace dfly::cluster

src/server/cluster/cluster_defs.h

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,12 @@ struct ClusterNodeInfo {
9494
}
9595
};
9696

97+
// Health status of a cluster node. ParseClusterNode in cluster_config.cc maps
// the config strings FAIL, LOADING and ONLINE (compared case-insensitively)
// onto the matching enumerators. NONE is the value a node keeps when the
// config gives no valid status.
enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE };
98+
99+
// ClusterNodeInfo extended with a health status. health defaults to
// NodeHealth::NONE, so a node built without an explicit status reports NONE.
struct ClusterExtendedNodeInfo : ClusterNodeInfo {
100+
NodeHealth health = NodeHealth::NONE;
101+
};
102+
97103
struct MigrationInfo {
98104
SlotRanges slot_ranges;
99105
ClusterNodeInfo node_info;
@@ -111,8 +117,8 @@ struct MigrationInfo {
111117

112118
struct ClusterShardInfo {
113119
SlotRanges slot_ranges;
114-
ClusterNodeInfo master;
115-
std::vector<ClusterNodeInfo> replicas;
120+
ClusterExtendedNodeInfo master;
121+
std::vector<ClusterExtendedNodeInfo> replicas;
116122
std::vector<MigrationInfo> migrations;
117123

118124
bool operator==(const ClusterShardInfo& r) const;

src/server/cluster/cluster_family.cc

Lines changed: 11 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -149,21 +149,24 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
149149
? static_cast<uint16_t>(absl::GetFlag(FLAGS_port))
150150
: cluster_announce_port;
151151

152-
info.master = {.id = id_, .ip = preferred_endpoint, .port = preferred_port};
152+
info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE};
153153

154154
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
155155
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
156-
info.replicas.push_back({.id = replica.id,
157-
.ip = replica.address,
158-
.port = static_cast<uint16_t>(replica.listening_port)});
156+
info.replicas.push_back({{.id = replica.id,
157+
.ip = replica.address,
158+
.port = static_cast<uint16_t>(replica.listening_port)},
159+
NodeHealth::NONE});
159160
}
160161
}
161162
} else {
162163
// TODO: We currently don't save the master's ID in the replica
163-
info.master = {.id = "", .ip = replication_info->host, .port = replication_info->port};
164-
info.replicas.push_back({.id = id_,
165-
.ip = cntx->conn()->LocalBindAddress(),
166-
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
164+
info.master = {{.id = "", .ip = replication_info->host, .port = replication_info->port},
165+
NodeHealth::ONLINE};
166+
info.replicas.push_back({{.id = id_,
167+
.ip = cntx->conn()->LocalBindAddress(),
168+
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))},
169+
NodeHealth::NONE});
167170
}
168171

169172
return info;

0 commit comments

Comments
 (0)