Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/master/chunks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2306,6 +2306,7 @@ class ChunkWorker : public coroutine {
ChunkCopiesCalculator& calc, const IpCounter &ip_counter);
bool rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator& calc, bool only_todel, const IpCounter &ip_counter);
bool rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator &calc, const IpCounter &ip_counter);
void updateSortedServers();

loop_info inforec_;
uint32_t deleteNotDone_;
Expand Down Expand Up @@ -2364,14 +2365,18 @@ void ChunkWorker::doEveryLoopTasks() {
chunksinfo_loopend = eventloop_time();
}

void ChunkWorker::doEverySecondTasks() {
/*! \brief Rebuild the cached chunkserver lists kept by the worker.
 *
 * Replaces sortedServers_ with the result of matocsserv_getservers_sorted() and
 * regroups the very same entries by their label into labeledSortedServers_.
 */
void ChunkWorker::updateSortedServers() {
	sortedServers_ = matocsserv_getservers_sorted();

	// Drop every previous bucket first, so a label that no longer has any
	// server does not keep entries from the previous snapshot.
	labeledSortedServers_.clear();
	for (const ServerWithUsage &entry : sortedServers_) {
		auto &sameLabelServers = labeledSortedServers_[entry.label];
		sameLabelServers.push_back(entry);
	}
}

/*! \brief Periodic hook of the chunk worker.
 *
 * Its only job is to rebuild the cached sorted chunkserver lists by calling
 * updateSortedServers().
 */
void ChunkWorker::doEverySecondTasks() {
updateSortedServers();
}

static bool chunkPresentOnServer(Chunk *c, matocsserventry *server) {
auto server_csid = matocsserv_get_csdb(server)->csid;
return std::any_of(c->parts.begin(), c->parts.end(), [server_csid](const ChunkPart &part) {
Expand Down Expand Up @@ -2712,6 +2717,8 @@ bool ChunkWorker::rebalanceChunkParts(Chunk *c, ChunkCopiesCalculator &calc, boo
: labeledSortedServers_[current_copy_label];

for (const auto &empty_server : sorted_servers) {
if (matocsserv_is_killed(empty_server.server)) { continue; }

if (!only_todel && gAvoidSameIpChunkservers) {
auto empty_server_ip = matocsserv_get_servip(empty_server.server);
auto it = ip_counter.find(empty_server_ip);
Expand Down Expand Up @@ -2784,6 +2791,8 @@ bool ChunkWorker::rebalanceChunkPartsWithSameIp(Chunk *c, ChunkCopiesCalculator
});

for (const auto &empty_server : sorted_by_ip_count) {
if (matocsserv_is_killed(empty_server.server)) { continue; }

auto empty_server_ip = matocsserv_get_servip(empty_server.server);
auto it = ip_counter.find(empty_server_ip);
auto empty_server_ip_count = it != ip_counter.end() ? it->second : 0;
Expand Down Expand Up @@ -2942,7 +2951,6 @@ void ChunkWorker::doChunkJobs(Chunk *c, uint16_t serverCount) {
if (rebalanceChunkParts(c, calc, false, ip_occurrence)) {
return;
}

}

bool ChunkWorker::deleteUnusedChunks() {
Expand Down Expand Up @@ -2989,6 +2997,13 @@ bool ChunkWorker::deleteUnusedChunks() {
void ChunkWorker::mainLoop() {
Chunk *c;

// Rebuilds the cached sorted chunkserver lists only when matocsserv reports a
// pending refresh, then acknowledges it so the request is not handled twice.
auto updateSortedServersIfNeeded = [&]() {
if (matocsserv_sorted_servers_need_refresh()) {
updateSortedServers();
// Acknowledge after the rebuild: this clears the pending-refresh flag.
matocsserv_sorted_servers_refresh_done();
}
};

reenter(this) {
stack_.work_limit.setMaxDuration(std::chrono::milliseconds(ChunksLoopTimeout));
stack_.work_limit.start();
Expand Down Expand Up @@ -3026,6 +3041,7 @@ void ChunkWorker::mainLoop() {
if (stack_.watchdog.expired()) {
yield;
stack_.watchdog.start();
updateSortedServersIfNeeded();
}
}
}
Expand All @@ -3052,6 +3068,7 @@ void ChunkWorker::mainLoop() {
// regenerate usable_server_count
matocsserv_usagedifference(nullptr, nullptr, &stack_.usable_server_count,
nullptr);
updateSortedServersIfNeeded();

stack_.node = gChunksMetadata->chunkhash[stack_.current_bucket];
while (stack_.node) {
Expand All @@ -3065,6 +3082,7 @@ void ChunkWorker::mainLoop() {
matocsserv_usagedifference(nullptr, nullptr,
&stack_.usable_server_count,
nullptr);
updateSortedServersIfNeeded();
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/master/matocsserv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ enum class ChunkserverConnectionMode : std::uint8_t {

double gLoadFactorPenalty = 0.;
bool gPrioritizeDataParts = true;
// True when cached sorted chunkserver lists should be rebuilt. Set after a
// chunkserver entry is erased from matocsservList; read through
// matocsserv_sorted_servers_need_refresh() and cleared through
// matocsserv_sorted_servers_refresh_done().
bool gSortedServersNeedsRefresh = false;

// A safe threshold of 15 seconds to determine whether chunks registration is in progress and
// prevent dumping metadata.
Expand Down Expand Up @@ -169,6 +170,14 @@ csdbentry *matocsserv_get_csdb(matocsserventry *eptr) {
return eptr->csdb;
}

/*! \brief Tell whether the sorted chunkserver lists are marked as outdated.
 *
 * \return current value of gSortedServersNeedsRefresh; calling this does not
 *         modify the flag.
 */
bool matocsserv_sorted_servers_need_refresh() {
return gSortedServersNeedsRefresh;
}

/*! \brief Acknowledge that the sorted chunkserver lists have been rebuilt.
 *
 * Resets gSortedServersNeedsRefresh to false unconditionally.
 */
void matocsserv_sorted_servers_refresh_done() {
gSortedServersNeedsRefresh = false;
}

/* replications DB */

// Replication hash parameters
Expand Down Expand Up @@ -1586,6 +1595,8 @@ void matocsserv_serve(const std::vector<pollfd> &pdesc) {
tcpclose(eptr->sock);

entriesIterator = matocsservList.erase(entriesIterator);

gSortedServersNeedsRefresh = true;
} else {
++entriesIterator;
}
Expand Down Expand Up @@ -1655,6 +1666,10 @@ void matocsserv_reload() {
lsock = newlsock;
}

/*! \brief Tell whether a chunkserver entry is in the KILL connection mode.
 *
 * \param eptr chunkserver entry to inspect; a null pointer is accepted.
 * \return true only when eptr is non-null and its mode equals
 *         ChunkserverConnectionMode::KILL, false otherwise.
 */
bool matocsserv_is_killed(matocsserventry* eptr) {
	// A missing entry is reported as "not killed" instead of being dereferenced.
	if (eptr == nullptr) {
		return false;
	}
	return eptr->mode == ChunkserverConnectionMode::KILL;
}

/*! \brief Return the version stored in a chunkserver entry.
 *
 * \param eptr chunkserver entry; it is dereferenced without a null check, so
 *        the caller must pass a valid pointer.
 * \return value of eptr->version.
 */
uint32_t matocsserv_get_version(matocsserventry *eptr) {
return eptr->version;
}
Expand Down
9 changes: 9 additions & 0 deletions src/master/matocsserv.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,19 @@ double matocsserv_get_usage(matocsserventry* eptr);
/*! \brief Get chunkservers ordered by disk usage. */
std::vector<ServerWithUsage> matocsserv_getservers_sorted();

/*! \brief Check if chunkserver is killed.
 *
 * \param eptr chunkserver entry to inspect; may be null.
 * \return true if eptr is non-null and its connection mode is KILL,
 *         false otherwise (including for a null eptr).
 */
bool matocsserv_is_killed(matocsserventry* eptr);

uint32_t matocsserv_get_version(matocsserventry* eptr);
void matocsserv_usagedifference(double *minusage, double *maxusage, uint16_t *usablescount,
uint16_t *totalscount);

/*! \brief Check if sorted servers need refresh.
 *
 * \return true when a refresh has been requested and not yet acknowledged with
 *         matocsserv_sorted_servers_refresh_done(). The call itself does not
 *         clear the request.
 */
bool matocsserv_sorted_servers_need_refresh();

/*! \brief Acknowledge that sorted servers have been refreshed.
 *
 * Clears the pending-refresh request, so that
 * matocsserv_sorted_servers_need_refresh() returns false until a new request
 * is raised.
 */
void matocsserv_sorted_servers_refresh_done();

/*! \brief Get chunkservers for a new chunk.
*
* This function returns a list of chunkservers that can be used for a new chunk creation. The
Expand Down