diff --git a/src/master/chunks.cc b/src/master/chunks.cc index 4002d0f82..d86e5a8f5 100644 --- a/src/master/chunks.cc +++ b/src/master/chunks.cc @@ -2306,6 +2306,7 @@ class ChunkWorker : public coroutine { ChunkCopiesCalculator& calc, const IpCounter &ip_counter); bool rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator& calc, bool only_todel, const IpCounter &ip_counter); bool rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc, const IpCounter &ip_counter); + void updateSortedServers(); loop_info inforec_; uint32_t deleteNotDone_; @@ -2364,7 +2365,7 @@ void ChunkWorker::doEveryLoopTasks() { chunksinfo_loopend = eventloop_time(); } -void ChunkWorker::doEverySecondTasks() { +void ChunkWorker::updateSortedServers() { sortedServers_ = matocsserv_getservers_sorted(); labeledSortedServers_.clear(); for (const ServerWithUsage& sw : sortedServers_) { @@ -2372,6 +2373,10 @@ void ChunkWorker::doEverySecondTasks() { } } +void ChunkWorker::doEverySecondTasks() { + updateSortedServers(); +} + static bool chunkPresentOnServer(Chunk *c, matocsserventry *server) { auto server_csid = matocsserv_get_csdb(server)->csid; return std::any_of(c->parts.begin(), c->parts.end(), [server_csid](const ChunkPart &part) { @@ -2712,6 +2717,8 @@ bool ChunkWorker::rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, boo : labeledSortedServers_[current_copy_label]; for (const auto &empty_server : sorted_servers) { + if (matocsserv_is_killed(empty_server.server)) { continue; } + if (!only_todel && gAvoidSameIpChunkservers) { auto empty_server_ip = matocsserv_get_servip(empty_server.server); auto it = ip_counter.find(empty_server_ip); @@ -2784,6 +2791,8 @@ bool ChunkWorker::rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator }); for (const auto &empty_server : sorted_by_ip_count) { + if (matocsserv_is_killed(empty_server.server)) { continue; } + auto empty_server_ip = matocsserv_get_servip(empty_server.server); auto it = ip_counter.find(empty_server_ip); auto 
empty_server_ip_count = it != ip_counter.end() ? it->second : 0; @@ -2942,7 +2951,6 @@ void ChunkWorker::doChunkJobs(Chunk *c, uint16_t serverCount) { if (rebalanceChunkParts(c, calc, false, ip_occurrence)) { return; } - } bool ChunkWorker::deleteUnusedChunks() { @@ -2989,6 +2997,13 @@ bool ChunkWorker::deleteUnusedChunks() { void ChunkWorker::mainLoop() { Chunk *c; + auto updateSortedServersIfNeeded = [&]() { + if (matocsserv_sorted_servers_need_refresh()) { + updateSortedServers(); + matocsserv_sorted_servers_refresh_done(); + } + }; + reenter(this) { stack_.work_limit.setMaxDuration(std::chrono::milliseconds(ChunksLoopTimeout)); stack_.work_limit.start(); @@ -3026,6 +3041,7 @@ void ChunkWorker::mainLoop() { if (stack_.watchdog.expired()) { yield; stack_.watchdog.start(); + updateSortedServersIfNeeded(); } } } @@ -3052,6 +3068,7 @@ void ChunkWorker::mainLoop() { // regenerate usable_server_count matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count, nullptr); + updateSortedServersIfNeeded(); stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket]; while (stack_.node) { @@ -3065,6 +3082,7 @@ void ChunkWorker::mainLoop() { matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count, nullptr); + updateSortedServersIfNeeded(); } } diff --git a/src/master/matocsserv.cc b/src/master/matocsserv.cc index f1999b95b..0c632646e 100644 --- a/src/master/matocsserv.cc +++ b/src/master/matocsserv.cc @@ -76,6 +76,7 @@ enum class ChunkserverConnectionMode : std::uint8_t { double gLoadFactorPenalty = 0.; bool gPrioritizeDataParts = true; +bool gSortedServersNeedsRefresh = false; // A safe threshold of 15 seconds to determine whether chunks registration is in progress and // prevent dumping metadata. 
@@ -169,6 +170,14 @@ csdbentry *matocsserv_get_csdb(matocsserventry *eptr) { return eptr->csdb; } +bool matocsserv_sorted_servers_need_refresh() { + return gSortedServersNeedsRefresh; +} + +void matocsserv_sorted_servers_refresh_done() { + gSortedServersNeedsRefresh = false; +} + /* replications DB */ // Replication hash parameters @@ -1586,6 +1595,8 @@ void matocsserv_serve(const std::vector<pollfd> &pdesc) { tcpclose(eptr->sock); entriesIterator = matocsservList.erase(entriesIterator); + + gSortedServersNeedsRefresh = true; } else { ++entriesIterator; } @@ -1655,6 +1666,10 @@ void matocsserv_reload() { lsock = newlsock; } +bool matocsserv_is_killed(matocsserventry* eptr) { + return eptr != nullptr && eptr->mode == ChunkserverConnectionMode::KILL; +} + uint32_t matocsserv_get_version(matocsserventry *eptr) { return eptr->version; } diff --git a/src/master/matocsserv.h b/src/master/matocsserv.h index a5f5df5cd..da64d1565 100644 --- a/src/master/matocsserv.h +++ b/src/master/matocsserv.h @@ -86,10 +86,19 @@ double matocsserv_get_usage(matocsserventry* eptr); /*! \brief Get chunkservers ordered by disk usage. */ std::vector<ServerWithUsage> matocsserv_getservers_sorted(); +/*! \brief Check if chunkserver is killed. */ +bool matocsserv_is_killed(matocsserventry* eptr); + uint32_t matocsserv_get_version(matocsserventry* eptr); void matocsserv_usagedifference(double *minusage, double *maxusage, uint16_t *usablescount, uint16_t *totalscount); +/*! \brief Check if sorted servers need refresh. */ +bool matocsserv_sorted_servers_need_refresh(); + +/*! \brief Acknowledge that sorted servers have been refreshed. */ +void matocsserv_sorted_servers_refresh_done(); + /*! \brief Get chunkservers for a new chunk. * * This function returns a list of chunkservers that can be used for a new chunk creation. The