Skip to content

Commit b1d4b20

Browse files
committed
refactor(FQDN): feather refactor on meta server side
1 parent 022854b commit b1d4b20

15 files changed

+336
-206
lines changed

src/client/partition_resolver_simple.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,14 +413,18 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
413413
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
414414
{
415415
if (_app_is_stateful) {
416-
return pc.hp_primary;
416+
host_port primary;
417+
GET_HOST_PORT(pc, primary, primary);
418+
return primary;
417419
}
418420

419-
if (pc.hp_last_drops.empty()) {
421+
std::vector<host_port> last_drops;
422+
GET_HOST_PORTS(pc, last_drops, last_drops);
423+
if (last_drops.empty()) {
420424
return host_port();
421425
}
422426

423-
return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
427+
return last_drops[rand::next_u32(0, last_drops.size() - 1)];
424428
}
425429

426430
error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)

src/client/replication_ddl_client.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,19 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
168168
int ready_count = 0;
169169
for (int i = 0; i < partition_count; i++) {
170170
const auto &pc = query_resp.partitions[i];
171-
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
172-
ready_count++;
171+
host_port primary;
172+
GET_HOST_PORT(pc, primary, primary);
173+
if (!primary) {
174+
continue;
175+
}
176+
177+
std::vector<host_port> secondaries;
178+
GET_HOST_PORTS(pc, secondaries, secondaries);
179+
if (secondaries.size() + 1 < max_replica_count) {
180+
continue;
173181
}
182+
183+
ready_count++;
174184
}
175185
if (ready_count == partition_count) {
176186
std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count
@@ -429,11 +439,16 @@ error_s replication_ddl_client::list_apps(bool detailed,
429439
int read_unhealthy = 0;
430440
for (const auto &pc : pcs) {
431441
int replica_count = 0;
432-
if (pc.hp_primary) {
442+
host_port primary;
443+
GET_HOST_PORT(pc, primary, primary);
444+
if (primary) {
433445
replica_count++;
434446
}
435-
replica_count += pc.hp_secondaries.size();
436-
if (pc.hp_primary) {
447+
448+
std::vector<host_port> secondaries;
449+
GET_HOST_PORTS(pc, secondaries, secondaries);
450+
replica_count += static_cast<int>(secondaries.size());
451+
if (primary) {
437452
if (replica_count >= pc.max_replica_count) {
438453
fully_healthy++;
439454
} else if (replica_count < 2) {

src/common/replication_common.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,16 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
180180
rc.learner_signature = invalid_signature;
181181
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);
182182

183-
if (node == pc.hp_primary) {
183+
host_port primary;
184+
GET_HOST_PORT(pc, primary, primary);
185+
if (node == primary) {
184186
rc.status = partition_status::PS_PRIMARY;
185187
return true;
186188
}
187189

188-
if (utils::contains(pc.hp_secondaries, node)) {
190+
std::vector<host_port> secondaries;
191+
GET_HOST_PORTS(pc, secondaries, secondaries);
192+
if (utils::contains(secondaries, node)) {
189193
rc.status = partition_status::PS_SECONDARY;
190194
return true;
191195
}
@@ -383,15 +387,19 @@ void add_app_info(const std::string &app_name,
383387
int read_unhealthy = 0;
384388
for (const auto &pc : pcs) {
385389
int replica_count = 0;
386-
if (pc.hp_primary) {
390+
host_port primary;
391+
GET_HOST_PORT(pc, primary, primary);
392+
if (primary) {
387393
++replica_count;
388-
++node_stats[pc.hp_primary].primary_count;
394+
++node_stats[primary].primary_count;
389395
++total_prim_count;
390396
}
391-
replica_count += static_cast<int>(pc.hp_secondaries.size());
392-
total_sec_count += static_cast<int>(pc.hp_secondaries.size());
397+
std::vector<host_port> secondaries;
398+
GET_HOST_PORTS(pc, secondaries, secondaries);
399+
replica_count += static_cast<int>(secondaries.size());
400+
total_sec_count += static_cast<int>(secondaries.size());
393401

394-
if (pc.hp_primary) {
402+
if (primary) {
395403
if (replica_count >= pc.max_replica_count) {
396404
++fully_healthy;
397405
} else if (replica_count < 2) {
@@ -405,10 +413,10 @@ void add_app_info(const std::string &app_name,
405413
partitions_printer.add_row(pc.pid.get_partition_index());
406414
partitions_printer.append_data(pc.ballot);
407415
partitions_printer.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
408-
partitions_printer.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
409-
partitions_printer.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
416+
partitions_printer.append_data(primary ? primary.to_string() : "-");
417+
partitions_printer.append_data(fmt::format("[{}]", fmt::join(secondaries, ",")));
410418

411-
for (const auto &secondary : pc.hp_secondaries) {
419+
for (const auto &secondary : secondaries) {
412420
++node_stats[secondary].secondary_count;
413421
}
414422
}

src/common/replication_other_types.h

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "consensus_types.h"
3737
#include "replica_admin_types.h"
3838
#include "common/replication_enums.h"
39+
#include "rpc/dns_resolver.h"
3940
#include "rpc/rpc_address.h"
4041
#include "rpc/rpc_host_port.h"
4142

@@ -78,18 +79,33 @@ inline bool is_member(const partition_configuration &pc, const rpc_address &node
7879
inline bool is_partition_config_equal(const partition_configuration &pc1,
7980
const partition_configuration &pc2)
8081
{
81-
// secondaries no need to be same order
82-
for (const auto &pc1_secondary : pc1.hp_secondaries) {
82+
if (pc1.ballot != pc2.ballot || pc1.pid != pc2.pid ||
83+
pc1.max_replica_count != pc2.max_replica_count ||
84+
pc1.last_committed_decree != pc2.last_committed_decree) {
85+
return false;
86+
}
87+
88+
host_port pc1_primary;
89+
GET_HOST_PORT(pc1, primary, pc1_primary);
90+
host_port pc2_primary;
91+
GET_HOST_PORT(pc2, primary, pc2_primary);
92+
if (pc1_primary != pc2_primary) {
93+
return false;
94+
}
95+
96+
// secondaries no need to be in the same order.
97+
std::vector<host_port> pc1_secondaries;
98+
GET_HOST_PORTS(pc1, secondaries, pc1_secondaries);
99+
for (const auto &pc1_secondary : pc1_secondaries) {
83100
if (!is_secondary(pc2, pc1_secondary)) {
84101
return false;
85102
}
86103
}
104+
105+
std::vector<host_port> pc2_secondaries;
106+
GET_HOST_PORTS(pc2, secondaries, pc2_secondaries);
87107
// last_drops is not considered into equality check
88-
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
89-
pc1.max_replica_count == pc2.max_replica_count && pc1.primary == pc2.primary &&
90-
pc1.hp_primary == pc2.hp_primary && pc1.secondaries.size() == pc2.secondaries.size() &&
91-
pc1.hp_secondaries.size() == pc2.hp_secondaries.size() &&
92-
pc1.last_committed_decree == pc2.last_committed_decree;
108+
return pc1_secondaries.size() == pc2_secondaries.size();
93109
}
94110

95111
class replica_helper

src/meta/cluster_balance_policy.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,17 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> a
229229
info.partitions.reserve(app->pcs.size());
230230
for (const auto &pc : app->pcs) {
231231
std::map<host_port, partition_status::type> pstatus_map;
232-
pstatus_map[pc.hp_primary] = partition_status::PS_PRIMARY;
233-
if (pc.hp_secondaries.size() != pc.max_replica_count - 1) {
232+
host_port primary;
233+
GET_HOST_PORT(pc, primary, primary);
234+
pstatus_map[primary] = partition_status::PS_PRIMARY;
235+
236+
std::vector<host_port> secondaries;
237+
GET_HOST_PORTS(pc, secondaries, secondaries);
238+
if (secondaries.size() != pc.max_replica_count - 1) {
234239
// partition is unhealthy
235240
return false;
236241
}
237-
for (const auto &secondary : pc.hp_secondaries) {
242+
for (const auto &secondary : secondaries) {
238243
pstatus_map[secondary] = partition_status::PS_SECONDARY;
239244
}
240245
info.partitions.push_back(std::move(pstatus_map));

src/meta/load_balance_policy.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "meta_admin_types.h"
3636
#include "rpc/dns_resolver.h" // IWYU pragma: keep
3737
#include "rpc/rpc_address.h"
38+
#include "rpc/rpc_host_port.h"
3839
#include "utils/command_manager.h"
3940
#include "utils/fail_point.h"
4041
#include "utils/flags.h"
@@ -172,14 +173,16 @@ generate_balancer_request(const app_mapper &apps,
172173
new_proposal_action(to, to, config_type::CT_UPGRADE_TO_PRIMARY));
173174
result.action_list.emplace_back(new_proposal_action(to, from, config_type::CT_REMOVE));
174175
break;
175-
case balance_type::COPY_SECONDARY:
176+
case balance_type::COPY_SECONDARY: {
176177
ans = "copy_secondary";
177178
result.balance_type = balancer_request_type::copy_secondary;
179+
host_port primary;
180+
GET_HOST_PORT(pc, primary, primary);
178181
result.action_list.emplace_back(
179-
new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
180-
result.action_list.emplace_back(
181-
new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE));
182+
new_proposal_action(primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
183+
result.action_list.emplace_back(new_proposal_action(primary, from, config_type::CT_REMOVE));
182184
break;
185+
}
183186
default:
184187
CHECK(false, "");
185188
}
@@ -566,7 +569,9 @@ void ford_fulkerson::update_decree(int node_id, const node_state &ns)
566569
{
567570
ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) {
568571
const auto &pc = _app->pcs[pid.get_partition_index()];
569-
for (const auto &secondary : pc.hp_secondaries) {
572+
std::vector<host_port> secondaries;
573+
GET_HOST_PORTS(pc, secondaries, secondaries);
574+
for (const auto &secondary : secondaries) {
570575
auto i = _host_port_id.find(secondary);
571576
CHECK(i != _host_port_id.end(), "invalid secondary: {}", secondary);
572577
_network[node_id][i->second]++;

src/meta/meta_data.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
* THE SOFTWARE.
2525
*/
2626

27-
#include <boost/lexical_cast.hpp>
2827
#include <algorithm>
28+
#include <boost/lexical_cast.hpp>
2929
#include <cstdint>
3030
#include <ostream>
3131

@@ -134,13 +134,16 @@ bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
134134
// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
135135
// only primary dead
136136
// when add node to pc.last_drops, we don't remove it from our cc.drop_list
137-
CHECK(pc.hp_last_drops.empty(), "last_drops of partition({}) must be empty", pid);
137+
std::vector<host_port> last_drops;
138+
GET_HOST_PORTS(pc, last_drops, last_drops);
139+
CHECK(last_drops.empty(), "last_drops of partition({}) must be empty", pid);
138140
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
139-
if (pc.hp_last_drops.size() + 1 >= max_replica_count) {
141+
// hp_last_drops is added in the steps bellow.
142+
if (last_drops.size() + 1 >= max_replica_count) {
140143
break;
141144
}
142145
// similar to cc.drop_list, pc.last_drop is also a stack structure
143-
HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS(pc, last_drops, iter->node);
146+
HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS(pc, last_drops, iter->node, last_drops);
144147
LOG_INFO("construct for ({}), select {} into last_drops, ballot({}), "
145148
"committed_decree({}), prepare_decree({})",
146149
pid,
@@ -538,6 +541,11 @@ app_state::app_state(const app_info &info) : app_info(info), helpers(new app_sta
538541
CLEAR_IP_AND_HOST_PORT(pc, secondaries);
539542
CLEAR_IP_AND_HOST_PORT(pc, last_drops);
540543

544+
// TODO(yujingwei): use marco simplify the code, and the logical may should
545+
// change
546+
pc.__set_hp_secondaries({});
547+
pc.__set_hp_last_drops({});
548+
541549
pcs.assign(app_info::partition_count, pc);
542550
for (int i = 0; i != app_info::partition_count; ++i) {
543551
pcs[i].pid.set_partition_index(i);

src/meta/meta_data.h

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626

2727
#pragma once
2828

29-
#include <stdint.h>
3029
#include <algorithm>
3130
#include <atomic>
31+
#include <cstdint>
3232
#include <functional>
3333
#include <map>
3434
#include <memory>
@@ -265,12 +265,31 @@ class config_context
265265
struct partition_configuration_stateless
266266
{
267267
partition_configuration &pc;
268-
partition_configuration_stateless(partition_configuration &_pc) : pc(_pc) {}
269-
std::vector<dsn::host_port> &workers() { return pc.hp_last_drops; }
270-
std::vector<dsn::host_port> &hosts() { return pc.hp_secondaries; }
271-
bool is_host(const host_port &node) const { return utils::contains(pc.hp_secondaries, node); }
272-
bool is_worker(const host_port &node) const { return utils::contains(pc.hp_last_drops, node); }
273-
bool is_member(const host_port &node) const { return is_host(node) || is_worker(node); }
268+
explicit partition_configuration_stateless(partition_configuration &_pc) : pc(_pc) {}
269+
std::vector<dsn::host_port> &workers()
270+
{
271+
DCHECK(pc.__isset.hp_last_drops, "");
272+
return pc.hp_last_drops;
273+
}
274+
std::vector<dsn::host_port> &hosts()
275+
{
276+
DCHECK(pc.__isset.hp_secondaries, "");
277+
return pc.hp_secondaries;
278+
}
279+
[[nodiscard]] bool is_host(const host_port &node) const
280+
{
281+
DCHECK(pc.__isset.hp_secondaries, "");
282+
return utils::contains(pc.hp_secondaries, node);
283+
}
284+
[[nodiscard]] bool is_worker(const host_port &node) const
285+
{
286+
DCHECK(pc.__isset.hp_last_drops, "");
287+
return utils::contains(pc.hp_last_drops, node);
288+
}
289+
[[nodiscard]] bool is_member(const host_port &node) const
290+
{
291+
return is_host(node) || is_worker(node);
292+
}
274293
};
275294

276295
struct restore_state
@@ -294,30 +313,25 @@ struct restore_state
294313
// in `status`.
295314
struct split_state
296315
{
297-
int32_t splitting_count;
316+
int32_t splitting_count{0};
298317
// partition_index -> split_status
299318
std::map<int32_t, split_status::type> status;
300-
split_state() : splitting_count(0) {}
319+
split_state() = default;
301320
};
302321

303322
class app_state;
304323

305324
class app_state_helper
306325
{
307326
public:
308-
app_state *owner;
309-
std::atomic_int partitions_in_progress;
327+
app_state *owner{nullptr};
328+
std::atomic_int partitions_in_progress{0};
310329
std::vector<config_context> contexts;
311-
dsn::message_ex *pending_response;
330+
dsn::message_ex *pending_response{nullptr};
312331
std::vector<restore_state> restore_states;
313332
split_state split_states;
314333

315-
public:
316-
app_state_helper() : owner(nullptr), partitions_in_progress(0)
317-
{
318-
contexts.clear();
319-
pending_response = nullptr;
320-
}
334+
app_state_helper() = default;
321335
void on_init_partitions();
322336
void clear_proposals()
323337
{

src/meta/meta_http_service.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include <fmt/core.h>
19-
#include <rapidjson/ostreamwrapper.h>
2018
#include <algorithm>
2119
#include <cstddef>
20+
#include <fmt/core.h>
2221
#include <map>
2322
#include <memory>
23+
#include <rapidjson/ostreamwrapper.h>
2424
#include <set>
2525
#include <sstream>
2626
#include <string>
@@ -251,11 +251,15 @@ void meta_http_service::list_app_handler(const http_request &req, http_response
251251
int read_unhealthy = 0;
252252
for (const auto &pc : response.partitions) {
253253
int replica_count = 0;
254-
if (pc.hp_primary) {
254+
host_port primary;
255+
GET_HOST_PORT(pc, primary, primary);
256+
if (primary) {
255257
replica_count++;
256258
}
257-
replica_count += pc.hp_secondaries.size();
258-
if (pc.hp_primary) {
259+
std::vector<host_port> secondaries;
260+
GET_HOST_PORTS(pc, secondaries, secondaries);
261+
replica_count += static_cast<int>(secondaries.size());
262+
if (primary) {
259263
if (replica_count >= pc.max_replica_count) {
260264
fully_healthy++;
261265
} else if (replica_count < 2) {

0 commit comments

Comments
 (0)