Skip to content

Commit a651659

Browse files
committed
fix: avoid stopping CS with pending replies
It was noticed that the CS could stop gracefully without sending all the expected responses. Those missing responses could cause the master to invalidate some chunk parts when everything was expected to be OK. The cause of this behavior is the span of time between the instant a worker thread (from a jobPool) gets a new job and the instant the job is processed. During that period the job is not counted by getJobCount, so the jobPool may appear empty while there are still jobs that need processing. This was happening in the jobPools attending requests from clients, and a similar issue affected the jobPools attending requests from the master. The solution improves the accuracy of the stop conditions to account for this issue. Signed-off-by: Dave <dave@leil.io>
1 parent 642aaf6 commit a651659

File tree

6 files changed

+46
-14
lines changed

6 files changed

+46
-14
lines changed

src/chunkserver/bgjobs.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e
154154
: 1;
155155
jobsQueue->put(jobId, operation, reinterpret_cast<uint8_t *>(listenerInfo.jobHash[jobId].get()),
156156
1, priority);
157+
unprocessedJobs_.fetch_add(1, std::memory_order_relaxed);
157158
return jobId;
158159
}
159160

@@ -180,6 +181,20 @@ uint32_t JobPool::addLockJob(JobCallback callback, void *extra, uint32_t listene
180181
return jobId;
181182
}
182183

184+
bool JobPool::allJobsProcessed() const {
185+
return unprocessedJobs_.load(std::memory_order_relaxed) == 0;
186+
}
187+
188+
bool JobPool::isEmpty() {
189+
if (jobsQueue->elements() > 0) { return false; }
190+
191+
for (auto &listenerInfo : listenerInfos_) {
192+
std::lock_guard lock(listenerInfo.notifierMutex);
193+
if (!listenerInfo.statusQueue.empty()) { return false; }
194+
}
195+
return true;
196+
}
197+
183198
uint32_t JobPool::getJobCount() const {
184199
TRACETHIS();
185200
return jobsQueue->elements();
@@ -264,6 +279,7 @@ void JobPool::processCompletedJobs(uint32_t listenerId) {
264279
auto callback = jobIterator->second->callback;
265280
if (callback) { callback(status, jobIterator->second->extra); }
266281
listenerInfo.jobHash.erase(jobIterator);
282+
unprocessedJobs_.fetch_sub(1, std::memory_order_relaxed);
267283
}
268284
}
269285
}

src/chunkserver/bgjobs.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,17 @@ class JobPool {
139139
/// @return The ID of the added lock job.
140140
uint32_t addLockJob(JobCallback callback, void *extra, uint32_t listenerId = 0);
141141

142+
/// @brief Returns whether all jobs in the JobPool have been processed by the worker threads.
143+
/// This is a very accurate way to check if there are pending jobs in the JobPool, as it counts
144+
/// the number of jobs that have been added but not yet passed by processCompletedJobs. Must
145+
/// not be used for the masterConn's jobPool due to the special behavior of the lock jobs.
146+
bool allJobsProcessed() const;
147+
148+
/// @brief Checks if the JobPool has no jobs and no status to be sent.
149+
/// This function is a lighter version of allJobsProcessed that can be used for the masterConn's
150+
/// jobPool to check if it is idle.
151+
bool isEmpty();
152+
142153
/// @brief Gets the number of jobs in the JobPool.
143154
uint32_t getJobCount() const;
144155

@@ -303,6 +314,10 @@ class JobPool {
303314
uint8_t workers; /// Number of worker threads in the pool.
304315
std::vector<std::thread> workerThreads; /// Vector of worker threads.
305316
std::unique_ptr<ProducerConsumerQueue> jobsQueue; /// Queue for jobs.
317+
/// Counter for unprocessed jobs, i.e jobs that have been added to the JobPool but have not yet
318+
/// been passed by processCompletedJobs and had their callbacks called. This is used to make
319+
/// sure the JobPool is truly empty when stopping the chunkserver.
320+
std::atomic<uint32_t> unprocessedJobs_{0};
306321
};
307322

308323
/// @brief Adds an open job to the JobPool.

src/chunkserver/masterconn.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,9 @@ JobPool* masterconn_get_job_pool() {
144144
return gJobPool.get();
145145
}
146146

147-
int masterconn_canexit(void) {
148-
if (gJobPool->getJobCount() == 0 && gReplicationJobPool->getJobCount() == 0 &&
149-
gMasterConnSingleton->isOutputQueueEmpty()) {
150-
return 1;
151-
}
152-
return 0;
147+
bool masterconn_canexit() {
148+
return gJobPool->isEmpty() && gReplicationJobPool->isEmpty() &&
149+
gMasterConnSingleton->isOutputQueueEmpty();
153150
}
154151

155152
void masterconn_term(void) {
@@ -321,7 +318,6 @@ int masterconn_init(void) {
321318
eventloop_timeregister(TIMEMODE_RUN_LATE, reconnectionDelay,
322319
rnd_ranged<uint32_t>(reconnectionDelay), masterconn_reconnect);
323320

324-
eventloop_canexitregister(masterconn_canexit);
325321
eventloop_destructregister(masterconn_term);
326322
eventloop_pollregister(masterconn_desc, masterconn_serve);
327323
eventloop_reloadregister(masterconn_reload);

src/chunkserver/masterconn.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,4 @@ void masterconn_stats(uint64_t *bin, uint64_t *bout, uint32_t *maxjobscnt);
2929
int masterconn_init(void);
3030
int masterconn_init_threads(void);
3131
JobPool* masterconn_get_job_pool();
32+
bool masterconn_canexit();

src/chunkserver/network_main_thread.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "chunkserver/chunk_replicator.h"
3636
#include "chunkserver/g_limiters.h"
3737
#include "chunkserver/hdd_readahead.h"
38+
#include "chunkserver/masterconn.h"
3839
#include "chunkserver/network_main_thread.h"
3940
#include "chunkserver/network_worker_thread.h"
4041
#include "common/cwrap.h"
@@ -210,15 +211,18 @@ void mainNetworkThreadWantExit(void) {
210211
gDoTerminate.store(true);
211212
}
212213

213-
int mainNetworkThreadCanExit(void) {
214+
bool mainNetworkThreadCanExit() {
214215
TRACETHIS();
215216
bool allTerminated = true;
216217
for (auto &threadObject : networkThreadObjects) {
217-
if (!threadObject.updateAndCheckTerminationStatus()) {
218-
allTerminated = false;
219-
}
218+
if (!threadObject.updateAndCheckTerminationStatus()) { allTerminated = false; }
220219
}
221-
return allTerminated ? 1 : 0;
220+
return allTerminated;
221+
}
222+
223+
int canExit() {
224+
// This order must be preserved
225+
return mainNetworkThreadCanExit() && masterconn_canexit();
222226
}
223227

224228
void mainNetworkThreadTerm(void) {
@@ -313,7 +317,7 @@ int mainNetworkThreadInit(void) {
313317

314318
eventloop_reloadregister(mainNetworkThreadReload);
315319
eventloop_wantexitregister(mainNetworkThreadWantExit);
316-
eventloop_canexitregister(mainNetworkThreadCanExit);
320+
eventloop_canexitregister(canExit);
317321
eventloop_destructregister(mainNetworkThreadTerm);
318322
eventloop_pollregister(mainNetworkThreadDesc, mainNetworkThreadServe);
319323

src/chunkserver/network_worker_thread.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ bool NetworkWorkerThread::updateAndCheckTerminationStatus() {
125125
std::lock_guard lock(csservheadLock);
126126
bool canTerminate =
127127
doTerminate.load() && ((csservEntries.empty() &&
128-
(bgJobPool_.get() == nullptr || bgJobPool_->getJobCount() == 0)) ||
128+
(bgJobPool_.get() == nullptr || bgJobPool_->allJobsProcessed())) ||
129129
terminationTimer_.elapsed_ms() > kNWForcefulTerminationTimeout_ms);
130130
canTerminate_.store(canTerminate);
131131
return canTerminate;

0 commit comments

Comments
 (0)