Skip to content

Commit 6d0fe4e

Browse files
authored
Merge branch 'dev' into feat-add-gc-stats-to-chart
2 parents 0e7e736 + d57c5d3 commit 6d0fe4e

File tree

7 files changed

+58
-13
lines changed

7 files changed

+58
-13
lines changed

src/chunkserver/bgjobs.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e
154154
: 1;
155155
jobsQueue->put(jobId, operation, reinterpret_cast<uint8_t *>(listenerInfo.jobHash[jobId].get()),
156156
1, priority);
157+
unprocessedJobs_.fetch_add(1, std::memory_order_relaxed);
157158
return jobId;
158159
}
159160

@@ -180,6 +181,20 @@ uint32_t JobPool::addLockJob(JobCallback callback, void *extra, uint32_t listene
180181
return jobId;
181182
}
182183

184+
bool JobPool::allJobsProcessed() const {
185+
return unprocessedJobs_.load(std::memory_order_relaxed) == 0;
186+
}
187+
188+
bool JobPool::isEmpty() {
189+
if (jobsQueue->elements() > 0) { return false; }
190+
191+
for (auto &listenerInfo : listenerInfos_) {
192+
std::lock_guard lock(listenerInfo.notifierMutex);
193+
if (!listenerInfo.statusQueue.empty()) { return false; }
194+
}
195+
return true;
196+
}
197+
183198
uint32_t JobPool::getJobCount() const {
184199
TRACETHIS();
185200
return jobsQueue->elements();
@@ -264,6 +279,7 @@ void JobPool::processCompletedJobs(uint32_t listenerId) {
264279
auto callback = jobIterator->second->callback;
265280
if (callback) { callback(status, jobIterator->second->extra); }
266281
listenerInfo.jobHash.erase(jobIterator);
282+
unprocessedJobs_.fetch_sub(1, std::memory_order_relaxed);
267283
}
268284
}
269285
}

src/chunkserver/bgjobs.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,17 @@ class JobPool {
139139
/// @return The ID of the added lock job.
140140
uint32_t addLockJob(JobCallback callback, void *extra, uint32_t listenerId = 0);
141141

142+
/// @brief Returns whether all jobs in the JobPool have been processed by the worker threads.
143+
/// This is a very accurate way to check if there are pending jobs in the JobPool, as it counts
144+
/// the number of jobs that have been added but not yet passed by processCompletedJobs. Must
145+
/// not be used for the masterConn's jobPool due to the special behavior of the lock jobs.
146+
bool allJobsProcessed() const;
147+
148+
/// @brief Checks if the JobPool has no jobs and no status to be sent.
149+
/// This function is a lighter version of allJobsProcessed that can be used for the masterConn's
150+
/// jobPool to check if it is idle.
151+
bool isEmpty();
152+
142153
/// @brief Gets the number of jobs in the JobPool.
143154
uint32_t getJobCount() const;
144155

@@ -303,6 +314,10 @@ class JobPool {
303314
uint8_t workers; /// Number of worker threads in the pool.
304315
std::vector<std::thread> workerThreads; /// Vector of worker threads.
305316
std::unique_ptr<ProducerConsumerQueue> jobsQueue; /// Queue for jobs.
317+
/// Counter for unprocessed jobs, i.e., jobs that have been added to the JobPool but have not yet
318+
/// been passed by processCompletedJobs and had their callbacks called. This is used to make
319+
/// sure the JobPool is truly empty when stopping the chunkserver.
320+
std::atomic<uint32_t> unprocessedJobs_{0};
306321
};
307322

308323
/// @brief Adds an open job to the JobPool.

src/chunkserver/masterconn.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,10 @@ JobPool* masterconn_get_job_pool() {
144144
return gJobPool.get();
145145
}
146146

147-
int masterconn_canexit(void) {
148-
if (gJobPool->getJobCount() == 0 && gReplicationJobPool->getJobCount() == 0 &&
149-
gMasterConnSingleton->isOutputQueueEmpty()) {
150-
return 1;
151-
}
152-
return 0;
147+
bool masterconn_canexit() {
148+
return gMasterConnSingleton->mode() != ConnectionMode::CONNECTED ||
149+
(gJobPool->isEmpty() && gReplicationJobPool->isEmpty() &&
150+
gMasterConnSingleton->isOutputQueueEmpty());
153151
}
154152

155153
void masterconn_term(void) {
@@ -321,7 +319,6 @@ int masterconn_init(void) {
321319
eventloop_timeregister(TIMEMODE_RUN_LATE, reconnectionDelay,
322320
rnd_ranged<uint32_t>(reconnectionDelay), masterconn_reconnect);
323321

324-
eventloop_canexitregister(masterconn_canexit);
325322
eventloop_destructregister(masterconn_term);
326323
eventloop_pollregister(masterconn_desc, masterconn_serve);
327324
eventloop_reloadregister(masterconn_reload);

src/chunkserver/masterconn.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,4 @@ void masterconn_stats(uint64_t *bin, uint64_t *bout, uint32_t *maxjobscnt);
2929
int masterconn_init(void);
3030
int masterconn_init_threads(void);
3131
JobPool* masterconn_get_job_pool();
32+
bool masterconn_canexit();

src/chunkserver/network_main_thread.cc

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "chunkserver/chunk_replicator.h"
3636
#include "chunkserver/g_limiters.h"
3737
#include "chunkserver/hdd_readahead.h"
38+
#include "chunkserver/masterconn.h"
3839
#include "chunkserver/network_main_thread.h"
3940
#include "chunkserver/network_worker_thread.h"
4041
#include "common/cwrap.h"
@@ -210,15 +211,23 @@ void mainNetworkThreadWantExit(void) {
210211
gDoTerminate.store(true);
211212
}
212213

213-
int mainNetworkThreadCanExit(void) {
214+
bool networkThreadsCanExit() {
214215
TRACETHIS();
215216
bool allTerminated = true;
216217
for (auto &threadObject : networkThreadObjects) {
217-
if (!threadObject.updateAndCheckTerminationStatus()) {
218-
allTerminated = false;
219-
}
218+
if (!threadObject.updateAndCheckTerminationStatus()) { allTerminated = false; }
220219
}
221-
return allTerminated ? 1 : 0;
220+
return allTerminated;
221+
}
222+
223+
int mainNetworkThreadCanExit() {
224+
// Preserve this order:
225+
// networkThreadsCanExit() must be checked before masterconn_canexit().
226+
// If masterconn_canexit() is checked first, a network worker may still be processing an
227+
// endChunkLock, which could add statuses to the masterconn job pool after masterconn_canexit()
228+
// returns true. This could lead to the chunkserver exiting prematurely while holding chunk
229+
// locks for which no reply has been sent to the master yet.
230+
return networkThreadsCanExit() && masterconn_canexit();
222231
}
223232

224233
void mainNetworkThreadTerm(void) {

src/chunkserver/network_worker_thread.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,15 @@ void NetworkWorkerThread::operator()() {
122122
}
123123

124124
bool NetworkWorkerThread::updateAndCheckTerminationStatus() {
125+
// Don't even check the rest if we already know we can terminate.
126+
if (canTerminate_.load()) {
127+
return true;
128+
}
129+
125130
std::lock_guard lock(csservheadLock);
126131
bool canTerminate =
127132
doTerminate.load() && ((csservEntries.empty() &&
128-
(bgJobPool_.get() == nullptr || bgJobPool_->getJobCount() == 0)) ||
133+
(bgJobPool_.get() == nullptr || bgJobPool_->allJobsProcessed())) ||
129134
terminationTimer_.elapsed_ms() > kNWForcefulTerminationTimeout_ms);
130135
canTerminate_.store(canTerminate);
131136
return canTerminate;

tests/test_suites/SingleMachineTests/test_helgrind_basic.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,5 @@ generateFiles
5353
# finishes validating files and sometimes it gets stuck
5454
# drop_caches
5555
# validateFiles
56+
57+
saunafs_chunkserver_daemon 0 stop

0 commit comments

Comments
 (0)