Skip to content

Commit b6ebcdf

Browse files
authored
fix(master): avoid rebalancing to disconnected CS (#763)
When looking for a server to replicate a part to during rebalancing, the current implementation uses the sortedServers_ or labeledSortedServers_ variables. These containers may hold stale chunkserver entries: entries that were recently switched to KILL mode, or entries that have already been completely released. Those entries must not be used as a destination for replicated parts; otherwise the master may crash. This commit fixes those issues. The test test_kill_cs_while_writing_small_files should stop being flaky after the merge, since the master crash was one of the causes of its failures. Another possible outcome of replications targeting disconnected CSs is that the replication read counter of the source CS (the remaining alive CS) remains incremented indefinitely, thus blocking future replications that need to retrieve data from those servers. This very dangerous behavior was also causing failures in the previously mentioned test. Signed-off-by: Dave <dave@leil.io>
1 parent 882ea90 commit b6ebcdf

File tree

3 files changed

+44
-2
lines changed

3 files changed

+44
-2
lines changed

src/master/chunks.cc

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2306,6 +2306,7 @@ class ChunkWorker : public coroutine {
23062306
ChunkCopiesCalculator& calc, const IpCounter &ip_counter);
23072307
bool rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator& calc, bool only_todel, const IpCounter &ip_counter);
23082308
bool rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc, const IpCounter &ip_counter);
2309+
void updateSortedServers();
23092310

23102311
loop_info inforec_;
23112312
uint32_t deleteNotDone_;
@@ -2364,14 +2365,18 @@ void ChunkWorker::doEveryLoopTasks() {
23642365
chunksinfo_loopend = eventloop_time();
23652366
}
23662367

2367-
void ChunkWorker::doEverySecondTasks() {
2368+
void ChunkWorker::updateSortedServers() {
23682369
sortedServers_ = matocsserv_getservers_sorted();
23692370
labeledSortedServers_.clear();
23702371
for (const ServerWithUsage& sw : sortedServers_) {
23712372
labeledSortedServers_[sw.label].push_back(sw);
23722373
}
23732374
}
23742375

2376+
void ChunkWorker::doEverySecondTasks() {
2377+
updateSortedServers();
2378+
}
2379+
23752380
static bool chunkPresentOnServer(Chunk *c, matocsserventry *server) {
23762381
auto server_csid = matocsserv_get_csdb(server)->csid;
23772382
return std::any_of(c->parts.begin(), c->parts.end(), [server_csid](const ChunkPart &part) {
@@ -2712,6 +2717,8 @@ bool ChunkWorker::rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, boo
27122717
: labeledSortedServers_[current_copy_label];
27132718

27142719
for (const auto &empty_server : sorted_servers) {
2720+
if (matocsserv_is_killed(empty_server.server)) { continue; }
2721+
27152722
if (!only_todel && gAvoidSameIpChunkservers) {
27162723
auto empty_server_ip = matocsserv_get_servip(empty_server.server);
27172724
auto it = ip_counter.find(empty_server_ip);
@@ -2784,6 +2791,8 @@ bool ChunkWorker::rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator
27842791
});
27852792

27862793
for (const auto &empty_server : sorted_by_ip_count) {
2794+
if (matocsserv_is_killed(empty_server.server)) { continue; }
2795+
27872796
auto empty_server_ip = matocsserv_get_servip(empty_server.server);
27882797
auto it = ip_counter.find(empty_server_ip);
27892798
auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0;
@@ -2942,7 +2951,6 @@ void ChunkWorker::doChunkJobs(Chunk *c, uint16_t serverCount) {
29422951
if (rebalanceChunkParts(c, calc, false, ip_occurrence)) {
29432952
return;
29442953
}
2945-
29462954
}
29472955

29482956
bool ChunkWorker::deleteUnusedChunks() {
@@ -2989,6 +2997,13 @@ bool ChunkWorker::deleteUnusedChunks() {
29892997
void ChunkWorker::mainLoop() {
29902998
Chunk *c;
29912999

3000+
auto updateSortedServersIfNeeded = [&]() {
3001+
if (matocsserv_sorted_servers_need_refresh()) {
3002+
updateSortedServers();
3003+
matocsserv_sorted_servers_refresh_done();
3004+
}
3005+
};
3006+
29923007
reenter(this) {
29933008
stack_.work_limit.setMaxDuration(std::chrono::milliseconds(ChunksLoopTimeout));
29943009
stack_.work_limit.start();
@@ -3026,6 +3041,7 @@ void ChunkWorker::mainLoop() {
30263041
if (stack_.watchdog.expired()) {
30273042
yield;
30283043
stack_.watchdog.start();
3044+
updateSortedServersIfNeeded();
30293045
}
30303046
}
30313047
}
@@ -3052,6 +3068,7 @@ void ChunkWorker::mainLoop() {
30523068
// regenerate usable_server_count
30533069
matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count,
30543070
nullptr);
3071+
updateSortedServersIfNeeded();
30553072

30563073
stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket];
30573074
while (stack_.node) {
@@ -3065,6 +3082,7 @@ void ChunkWorker::mainLoop() {
30653082
matocsserv_usagedifference(nullptr, nullptr,
30663083
&stack_.usable_server_count,
30673084
nullptr);
3085+
updateSortedServersIfNeeded();
30683086
}
30693087
}
30703088

src/master/matocsserv.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ enum class ChunkserverConnectionMode : std::uint8_t {
7676

7777
double gLoadFactorPenalty = 0.;
7878
bool gPrioritizeDataParts = true;
79+
bool gSortedServersNeedsRefresh = false;
7980

8081
// A safe threshold of 15 seconds to determine whether chunks registration is in progress and
8182
// prevent dumping metadata.
@@ -169,6 +170,14 @@ csdbentry *matocsserv_get_csdb(matocsserventry *eptr) {
169170
return eptr->csdb;
170171
}
171172

173+
bool matocsserv_sorted_servers_need_refresh() {
174+
return gSortedServersNeedsRefresh;
175+
}
176+
177+
/// Acknowledges a completed refresh of the sorted-server lists by lowering
/// gSortedServersNeedsRefresh. Calling it when the flag is already lowered
/// has no further effect.
void matocsserv_sorted_servers_refresh_done() {
	gSortedServersNeedsRefresh = false;
}
180+
172181
/* replications DB */
173182

174183
// Replication hash parameters
@@ -1586,6 +1595,8 @@ void matocsserv_serve(const std::vector<pollfd> &pdesc) {
15861595
tcpclose(eptr->sock);
15871596

15881597
entriesIterator = matocsservList.erase(entriesIterator);
1598+
1599+
gSortedServersNeedsRefresh = true;
15891600
} else {
15901601
++entriesIterator;
15911602
}
@@ -1655,6 +1666,10 @@ void matocsserv_reload() {
16551666
lsock = newlsock;
16561667
}
16571668

1669+
/// Tells whether a chunkserver entry has been switched to KILL mode.
///
/// \param eptr chunkserver entry to inspect; may be nullptr.
/// \return true only when eptr is non-null and its connection mode equals
///         ChunkserverConnectionMode::KILL; a null entry yields false.
bool matocsserv_is_killed(matocsserventry* eptr) {
	if (eptr == nullptr) {
		return false;
	}
	return eptr->mode == ChunkserverConnectionMode::KILL;
}
1672+
16581673
/// Returns the version stored in the given chunkserver entry.
///
/// \param eptr chunkserver entry; dereferenced unconditionally, so it must
///             not be nullptr.
/// \return the entry's version field.
uint32_t matocsserv_get_version(matocsserventry *eptr) {
	const uint32_t version = eptr->version;
	return version;
}

src/master/matocsserv.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,19 @@ double matocsserv_get_usage(matocsserventry* eptr);
8686
/*! \brief Get chunkservers ordered by disk usage. */
8787
std::vector<ServerWithUsage> matocsserv_getservers_sorted();
8888

89+
/*! \brief Check if chunkserver is killed. */
90+
bool matocsserv_is_killed(matocsserventry* eptr);
91+
8992
uint32_t matocsserv_get_version(matocsserventry* eptr);
9093
void matocsserv_usagedifference(double *minusage, double *maxusage, uint16_t *usablescount,
9194
uint16_t *totalscount);
9295

96+
/*! \brief Check if sorted servers need refresh. */
97+
bool matocsserv_sorted_servers_need_refresh();
98+
99+
/*! \brief Acknowledge that sorted servers have been refreshed. */
100+
void matocsserv_sorted_servers_refresh_done();
101+
93102
/*! \brief Get chunkservers for a new chunk.
94103
*
95104
* This function returns a list of chunkservers that can be used for a new chunk creation. The

0 commit comments

Comments
 (0)