Skip to content

Commit 593c5e5

Browse files
committed
fixes
1 parent 18e3a9d commit 593c5e5

File tree

4 files changed

+21
-7
lines changed

4 files changed

+21
-7
lines changed

src/chunkserver/masterconn.cc

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "chunkserver/bgjobs.h"
3939
#include "chunkserver/hddspacemgr.h"
4040
#include "chunkserver/master_connection.h"
41+
#include "chunkserver/network_main_thread.h"
4142
#include "common/event_loop.h"
4243
#include "common/massert.h"
4344
#include "common/network_address.h"
@@ -77,8 +78,6 @@ constexpr uint32_t kDefaultReplicationNumberOfWorkers = 5;
7778
constexpr uint32_t kMinReplicationNumberOfWorkers = 1;
7879
static uint32_t gReplicationNumberOfWorkers = kDefaultReplicationNumberOfWorkers;
7980

80-
static std::atomic<bool> gDoTerminate = false;
81-
8281
static void* gReconnectHook;
8382

8483
// Stats
@@ -145,8 +144,6 @@ JobPool* masterconn_get_job_pool() {
145144
return gJobPool.get();
146145
}
147146

148-
void masterconn_wantexit(void) { gDoTerminate.store(true); }
149-
150147
int masterconn_canexit(void) {
151148
if (gJobPool->getJobCount() == 0 && gReplicationJobPool->getJobCount() == 0 &&
152149
gMasterConnSingleton->isOutputQueueEmpty()) {
@@ -187,7 +184,7 @@ void masterconn_desc(std::vector<pollfd> &pdesc) {
187184
}
188185
}
189186

190-
eptr->providePollDescriptors(pdesc, gDoTerminate.load());
187+
eptr->providePollDescriptors(pdesc, doTerminate());
191188
}
192189

193190
void masterconn_send_status() {
@@ -323,7 +320,7 @@ int masterconn_init(void) {
323320
gReconnectHook =
324321
eventloop_timeregister(TIMEMODE_RUN_LATE, reconnectionDelay,
325322
rnd_ranged<uint32_t>(reconnectionDelay), masterconn_reconnect);
326-
eventloop_wantexitregister(masterconn_wantexit);
323+
327324
eventloop_canexitregister(masterconn_canexit);
328325
eventloop_destructregister(masterconn_term);
329326
eventloop_pollregister(masterconn_desc, masterconn_serve);

src/chunkserver/network_main_thread.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ static uint32_t gNrOfNetworkWorkers;
6464
static uint32_t gNrOfHddWorkersPerNetworkWorker;
6565
static uint32_t gBgjobsCountPerNetworkWorker;
6666

67+
static std::atomic<bool> gDoTerminate = false;
68+
69+
bool doTerminate() {
70+
return gDoTerminate.load();
71+
}
72+
6773
void chunkReplicatorReload() {
6874
unsigned rep_total = cfg_get_minmaxvalue<unsigned>("REPLICATION_TOTAL_TIMEOUT_MS",
6975
ChunkReplicator::kDefaultTotalTimeout_ms,
@@ -177,6 +183,10 @@ void mainNetworkThreadReload(void) {
177183

178184
void mainNetworkThreadDesc(std::vector<pollfd> &pdesc) {
179185
TRACETHIS();
186+
if (doTerminate()) {
187+
return;
188+
}
189+
180190
pdesc.push_back({lsock, POLLIN, 0});
181191
lsockpdescpos = pdesc.size() - 1;
182192
}
@@ -196,6 +206,8 @@ void mainNetworkThreadWantExit(void) {
196206
for (auto& threadObject : networkThreadObjects) {
197207
threadObject.askForTermination();
198208
}
209+
210+
gDoTerminate.store(true);
199211
}
200212

201213
int mainNetworkThreadCanExit(void) {
@@ -222,6 +234,10 @@ void mainNetworkThreadTerm(void) {
222234

223235
void mainNetworkThreadServe(const std::vector<pollfd> &pdesc) {
224236
TRACETHIS();
237+
if (doTerminate()) {
238+
return;
239+
}
240+
225241
int newSocketFD;
226242

227243
if (lsockpdescpos >= 0 && (pdesc[lsockpdescpos].revents & POLLIN)) {

src/chunkserver/network_main_thread.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,5 @@ int mainNetworkThreadInitThreads(void);
2828

2929
uint32_t mainNetworkThreadGetListenIp();
3030
uint16_t mainNetworkThreadGetListenPort();
31+
32+
bool doTerminate();

src/chunkserver/network_worker_thread.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ void NetworkWorkerThread::askForTermination() {
335335
doTerminate = true;
336336
std::unique_lock lock(csservheadLock);
337337
terminationTimer_.reset();
338-
for (auto &entry : csservEntries) { entry.closeJobs(); }
339338
}
340339

341340
void NetworkWorkerThread::addConnection(int newSocketFD) {

0 commit comments

Comments
 (0)