From 79c796d365c95f805175d274470ff9276db711d1 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 24 Feb 2026 16:42:13 +0100 Subject: [PATCH] fix(master): avoid rebalancing to disconnected CS At the instant of looking for a server to replicate a part when rebalancing, the current implementation uses the sortedServers_ or labeledSortedServers_ variables. These containers may have stale chunkserver entries that have been recently switched to KILL mode or entries completely released. Those entries must not be used as a destination for the replicate parts, if the master does not crashes. These commit fixes those issues. The test test_kill_cs_while_writing_small_files should stop being flaky after the merge. The master crash was one of the causes for the falure of the test. Another possible outcome of those replication targetting disconnected CSs is that the replication read counter of the source CS (the remaining alive CS) remain increased indefinately, thus blocking future replications that needed to retrieve data from those servers. This very dangerous behavior was also causing failures in the previously mentioned test. Signed-off-by: Dave --- src/master/chunks.cc | 22 ++++++++++++++++++++-- src/master/matocsserv.cc | 15 +++++++++++++++ src/master/matocsserv.h | 9 +++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/master/chunks.cc b/src/master/chunks.cc index 4002d0f82..d86e5a8f5 100644 --- a/src/master/chunks.cc +++ b/src/master/chunks.cc @@ -2306,6 +2306,7 @@ class ChunkWorker : public coroutine { ChunkCopiesCalculator& calc, const IpCounter &ip_counter); bool rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator& calc, bool only_todel, const IpCounter &ip_counter); bool rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc, const IpCounter &ip_counter); + void updateSortedServers(); loop_info inforec_; uint32_t deleteNotDone_; @@ -2364,7 +2365,7 @@ void ChunkWorker::doEveryLoopTasks() { chunksinfo_loopend = eventloop_time(); } -void ChunkWorker::doEverySecondTasks() { +void ChunkWorker::updateSortedServers() { sortedServers_ = matocsserv_getservers_sorted(); labeledSortedServers_.clear(); for (const ServerWithUsage& sw : sortedServers_) { @@ -2372,6 +2373,10 @@ void ChunkWorker::doEverySecondTasks() { } } +void ChunkWorker::doEverySecondTasks() { + updateSortedServers(); +} + static bool chunkPresentOnServer(Chunk *c, matocsserventry *server) { auto server_csid = matocsserv_get_csdb(server)->csid; return std::any_of(c->parts.begin(), c->parts.end(), [server_csid](const ChunkPart &part) { @@ -2712,6 +2717,8 @@ bool ChunkWorker::rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, boo : labeledSortedServers_[current_copy_label]; for (const auto &empty_server : sorted_servers) { + if (matocsserv_is_killed(empty_server.server)) { continue; } + if (!only_todel && gAvoidSameIpChunkservers) { auto empty_server_ip = matocsserv_get_servip(empty_server.server); auto it = ip_counter.find(empty_server_ip); @@ -2784,6 +2791,8 @@ bool ChunkWorker::rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator }); for (const auto &empty_server : sorted_by_ip_count) { + if (matocsserv_is_killed(empty_server.server)) { continue; } + auto empty_server_ip = matocsserv_get_servip(empty_server.server); auto it = ip_counter.find(empty_server_ip); auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0; @@ -2942,7 +2951,6 @@ void ChunkWorker::doChunkJobs(Chunk *c, uint16_t serverCount) { if (rebalanceChunkParts(c, calc, false, ip_occurrence)) { return; } - } bool ChunkWorker::deleteUnusedChunks() { @@ -2989,6 +2997,13 @@ bool ChunkWorker::deleteUnusedChunks() { void ChunkWorker::mainLoop() { Chunk *c; + auto updateSortedServersIfNeeded = [&]() { + if (matocsserv_sorted_servers_need_refresh()) { + updateSortedServers(); + matocsserv_sorted_servers_refresh_done(); + } + }; + reenter(this) { stack_.work_limit.setMaxDuration(std::chrono::milliseconds(ChunksLoopTimeout)); stack_.work_limit.start(); @@ -3026,6 +3041,7 @@ void ChunkWorker::mainLoop() { if (stack_.watchdog.expired()) { yield; stack_.watchdog.start(); + updateSortedServersIfNeeded(); } } } @@ -3052,6 +3068,7 @@ void ChunkWorker::mainLoop() { // regenerate usable_server_count matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count, nullptr); + updateSortedServersIfNeeded(); stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket]; while (stack_.node) { @@ -3065,6 +3082,7 @@ void ChunkWorker::mainLoop() { matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count, nullptr); + updateSortedServersIfNeeded(); } } diff --git a/src/master/matocsserv.cc b/src/master/matocsserv.cc index f1999b95b..0c632646e 100644 --- a/src/master/matocsserv.cc +++ b/src/master/matocsserv.cc @@ -76,6 +76,7 @@ enum class ChunkserverConnectionMode : std::uint8_t { double gLoadFactorPenalty = 0.; bool gPrioritizeDataParts = true; +bool gSortedServersNeedsRefresh = false; // A safe threshold of 15 seconds to determine whether chunks registration is in progress and // prevent dumping metadata. @@ -169,6 +170,14 @@ csdbentry *matocsserv_get_csdb(matocsserventry *eptr) { return eptr->csdb; } +bool matocsserv_sorted_servers_need_refresh() { + return gSortedServersNeedsRefresh; +} + +void matocsserv_sorted_servers_refresh_done() { + gSortedServersNeedsRefresh = false; +} + /* replications DB */ // Replication hash parameters @@ -1586,6 +1595,8 @@ void matocsserv_serve(const std::vector &pdesc) { tcpclose(eptr->sock); entriesIterator = matocsservList.erase(entriesIterator); + + gSortedServersNeedsRefresh = true; } else { ++entriesIterator; } @@ -1655,6 +1666,10 @@ void matocsserv_reload() { lsock = newlsock; } +bool matocsserv_is_killed(matocsserventry* eptr) { + return eptr != nullptr && eptr->mode == ChunkserverConnectionMode::KILL; +} + uint32_t matocsserv_get_version(matocsserventry *eptr) { return eptr->version; } diff --git a/src/master/matocsserv.h b/src/master/matocsserv.h index a5f5df5cd..da64d1565 100644 --- a/src/master/matocsserv.h +++ b/src/master/matocsserv.h @@ -86,10 +86,19 @@ double matocsserv_get_usage(matocsserventry* eptr); /*! \brief Get chunkservers ordered by disk usage. */ std::vector matocsserv_getservers_sorted(); +/*! \brief Check if chunkserver is killed. */ +bool matocsserv_is_killed(matocsserventry* eptr); + uint32_t matocsserv_get_version(matocsserventry* eptr); void matocsserv_usagedifference(double *minusage, double *maxusage, uint16_t *usablescount, uint16_t *totalscount); +/*! \brief Check if sorted servers need refresh. */ +bool matocsserv_sorted_servers_need_refresh(); + +/*! \brief Acknowledge that sorted servers have been refreshed. */ +void matocsserv_sorted_servers_refresh_done(); + /*! \brief Get chunkservers for a new chunk. * * This function returns a list of chunkservers that can be used for a new chunk creation. The