Commit 196e86f
fix: fix CS kill/crash when writing data
Recent tests show chunks unavailable when performing the following test:

- Start writing small files in ec(6,2) in the background.
- Kill two chunkservers.
- Wait for the writes of the files to finish.
- Bring the two chunkservers back.
- Wait for the data to be replicated.
- Stop some other two chunkservers.
- Validate that the data is available.

In the last step there are six chunkservers available and no chunk parts missing, so no chunks should be unavailable. The error observed was a CRC error in the killed-and-restarted chunkservers.

The issue found is the following:

- Some chunk gets its data parts successfully written to the drive.
- The client gets to know this (chunk write finished OK) and sends the WRITE_END packet to the CSs.
- The CS gets killed after receiving the WRITE_END but before running job_close (hddClose), which is the function responsible for syncing the metadata parts to the drive. The data parts of those chunks are therefore fine, but the CRC of the blocks is incorrect.
- The client unlocks the chunk on the master side (WRITE_END packet) without noticing any issue and without retrying the write (since it finished everything it had to write).
- There is no version increase in the other chunk parts, so after the CS is restarted its chunk parts are registered as good ones, despite the previously mentioned CRC error (which no component knows about).
- After stopping two other CSs and trying to read the data, the issue emerges.

The solution so far is to move the endChunkLock call to after job_close is processed, and to increase the priority of the close operations. This way we make sure that the master receives notice about the write end only after all the operations related to that chunk part are completed.

This solution does not cover the case where the USE_CHUNKSERVER_SIDE_CHUNK_LOCK option is disabled.

A test was added to check the previously mentioned scenario.

Signed-off-by: Dave <dave@leil.io>
1 parent 238a9c8 commit 196e86f
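
For illustration, a minimal sketch of the new cleanup ordering described above, assuming simplified names: only job_close, WRITE_END and endChunkLock appear in the real code; scheduleClose and releaseMasterChunkLock are invented for this sketch (the actual implementation is in WriteHighLevelOp::cleanup() below).

#include <cstdint>
#include <functional>

// Sketch of the fixed ordering: release the master-side chunk lock from the
// close callback, i.e. only after the close job has synced the CRC metadata
// to disk. Helper names are invented for this sketch.
void cleanupAfterWrite(uint64_t chunkId, bool isChunkOpen, bool isChunkLocked,
                       const std::function<void(uint64_t, std::function<void(uint8_t)>)> &scheduleClose,
                       const std::function<void(uint64_t, uint8_t)> &releaseMasterChunkLock) {
	constexpr uint8_t kStatusOk = 0;  // stand-in for SAUNAFS_STATUS_OK
	if (isChunkOpen) {
		if (isChunkLocked) {
			// Before the fix the lock ended here, so a kill between WRITE_END and
			// the close job left stale CRCs that no component knew about.
			scheduleClose(chunkId, [releaseMasterChunkLock, chunkId](uint8_t closeStatus) {
				releaseMasterChunkLock(chunkId, closeStatus);  // lock ends after the sync
			});
		} else {
			scheduleClose(chunkId, [](uint8_t) {});  // no lock to release
		}
	} else if (isChunkLocked) {
		releaseMasterChunkLock(chunkId, kStatusOk);  // nothing was opened
	}
}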

File tree: 8 files changed, +176 −18 lines changed

src/chunkserver/bgjobs.cc

Lines changed: 8 additions & 4 deletions
@@ -146,10 +146,14 @@ uint32_t JobPool::addJob(ChunkOperation operation, JobCallback callback, void *e
 	job->state = JobPool::State::Enabled;
 	job->listenerId = listenerId;
 	listenerInfo.jobHash[jobId] = std::move(job);
-	// Use higher priority (0) for Open and GetBlocks operations
-	jobsQueue->put(
-	    jobId, operation, reinterpret_cast<uint8_t *>(listenerInfo.jobHash[jobId].get()), 1,
-	    (operation == ChunkOperation::Open || operation == ChunkOperation::GetBlocks) ? 0 : 1);
+	// Use higher priority (0) for Open, Close and GetBlocks operations
+	uint8_t priority =
+	    (operation == ChunkOperation::Open || operation == ChunkOperation::GetBlocks ||
+	     operation == ChunkOperation::Close)
+	        ? 0
+	        : 1;
+	jobsQueue->put(jobId, operation, reinterpret_cast<uint8_t *>(listenerInfo.jobHash[jobId].get()),
+	               1, priority);
 	return jobId;
 }
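
For context, a minimal sketch of the two-level behavior the priority argument of jobsQueue->put() selects between: priority-0 jobs (Open, Close, GetBlocks) overtake queued priority-1 work. This is an illustration only; the real JobsQueue is thread-safe, keyed by job id, and has a different API.

#include <cstdint>
#include <deque>
#include <optional>
#include <utility>

// Toy two-level queue: take() always drains priority 0 before priority 1,
// which is why promoting Close to priority 0 lets it run ahead of writes.
template <typename Job>
class TwoLevelJobQueue {
public:
	void put(Job job, uint8_t priority) {
		(priority == 0 ? high_ : low_).push_back(std::move(job));
	}
	std::optional<Job> take() {
		std::deque<Job> &queue = high_.empty() ? low_ : high_;
		if (queue.empty()) { return std::nullopt; }
		Job job = std::move(queue.front());
		queue.pop_front();
		return job;
	}
private:
	std::deque<Job> high_;  // priority 0: Open, Close, GetBlocks
	std::deque<Job> low_;   // priority 1: everything else
};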

src/chunkserver/chunk_high_level_ops.cc

Lines changed: 22 additions & 4 deletions
@@ -549,16 +549,34 @@ void WriteHighLevelOp::cleanup() {
 	}

 	if (isChunkOpen_) {
-		job_close(*workerJobPool(), kEmptyCallback, chunkId_, chunkType_);
+		if (isChunkLocked_) {
+			// We need to wait for the metadata sync before releasing the lock, so we
+			// use a callback to release the lock afterwards
+			job_close(*workerJobPool(), jobCloseWriteCallback(chunkId_, chunkType_, SAUNAFS_STATUS_OK),
+			          chunkId_, chunkType_);
+		} else {
+			job_close(*workerJobPool(), kEmptyCallback, chunkId_, chunkType_);
+		}
 		isChunkOpen_ = false;
-	}
-
-	if (isChunkLocked_) {
+	} else if (isChunkLocked_) {
 		masterconn_get_job_pool()->endChunkLock(chunkId_, chunkType_, SAUNAFS_STATUS_OK);
 	}
+
 	isChunkLocked_ = false;
 	partiallyCompletedWrites_.clear();
 	chunkId_ = 0;
 	chunkVersion_ = 0;
 	chunkType_ = slice_traits::standard::ChunkPartType();
 }
+
+std::function<void(uint8_t status, void *packet)> jobCloseWriteCallback(uint64_t chunkId,
+                                                                        ChunkPartType chunkType,
+                                                                        uint8_t untoldStatus) {
+	return [chunkId, chunkType, untoldStatus](uint8_t status, void * /*entry*/) {
+		if (untoldStatus == SAUNAFS_STATUS_OK && status != SAUNAFS_STATUS_OK) {
+			masterconn_get_job_pool()->endChunkLock(chunkId, chunkType, status);
+		} else {
+			masterconn_get_job_pool()->endChunkLock(chunkId, chunkType, untoldStatus);
+		}
+	};
+}
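
To illustrate the status precedence the callback above implements, here is a self-contained sketch; kOk and the numeric error codes are stand-ins, not real SaunaFS status constants.

#include <cstdint>
#include <cstdio>
#include <functional>

// Mirror of the precedence in jobCloseWriteCallback: a failure from the close
// job overrides a previously-OK write status; an earlier write error wins
// otherwise, so the master never sees a clean write end after a failed sync.
constexpr uint8_t kOk = 0;  // stand-in for SAUNAFS_STATUS_OK

std::function<void(uint8_t)> makeCloseCallback(uint8_t untoldStatus,
                                               std::function<void(uint8_t)> endChunkLock) {
	return [untoldStatus, endChunkLock](uint8_t closeStatus) {
		if (untoldStatus == kOk && closeStatus != kOk) {
			endChunkLock(closeStatus);   // close failed: report the close error
		} else {
			endChunkLock(untoldStatus);  // otherwise report the write's own status
		}
	};
}

int main() {
	auto report = [](uint8_t s) { std::printf("endChunkLock(status=%u)\n", s); };
	makeCloseCallback(kOk, report)(5);  // metadata sync failed -> master sees 5
	makeCloseCallback(7, report)(kOk);  // write itself had failed -> master sees 7
}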

src/chunkserver/chunk_high_level_ops.h

Lines changed: 13 additions & 0 deletions
@@ -318,3 +318,16 @@ class WriteHighLevelOp : public HighLevelOp {
 	/// List of write data buffers waiting to be written to the chunk.
 	std::list<std::shared_ptr<InputBuffer>> writeDataBuffers_;
 };
+
+/// @brief Creates a callback function for closing a write operation.
+/// This is only used for write operations, in which the chunk is locked. The callback is called
+/// after the close operation is completed, to release the chunk lock. The close job syncs the
+/// metadata part of the chunk, so it is important for the master to know whether that operation
+/// also succeeded, and not only the write data jobs.
+/// @param chunkId ID of the chunk.
+/// @param chunkType Type of the chunk.
+/// @param untoldStatus Error status not yet reported to the client, to be sent to the master.
+/// @return A function to be used as a callback for closing a write operation.
+std::function<void(uint8_t status, void *packet)> jobCloseWriteCallback(uint64_t chunkId,
+                                                                        ChunkPartType chunkType,
+                                                                        uint8_t untoldStatus);

src/chunkserver/masterconn.cc

Lines changed: 3 additions & 6 deletions
@@ -38,6 +38,7 @@
 #include "chunkserver/bgjobs.h"
 #include "chunkserver/hddspacemgr.h"
 #include "chunkserver/master_connection.h"
+#include "chunkserver/network_main_thread.h"
 #include "common/event_loop.h"
 #include "common/massert.h"
 #include "common/network_address.h"
@@ -77,8 +78,6 @@ constexpr uint32_t kDefaultReplicationNumberOfWorkers = 5;
 constexpr uint32_t kMinReplicationNumberOfWorkers = 1;
 static uint32_t gReplicationNumberOfWorkers = kDefaultReplicationNumberOfWorkers;

-static std::atomic<bool> gDoTerminate = false;
-
 static void* gReconnectHook;

 // Stats
@@ -145,8 +144,6 @@ JobPool* masterconn_get_job_pool() {
 	return gJobPool.get();
 }

-void masterconn_wantexit(void) { gDoTerminate.store(true); }
-
 int masterconn_canexit(void) {
 	if (gJobPool->getJobCount() == 0 && gReplicationJobPool->getJobCount() == 0 &&
 	    gMasterConnSingleton->isOutputQueueEmpty()) {
@@ -187,7 +184,7 @@ void masterconn_desc(std::vector<pollfd> &pdesc) {
 		}
 	}

-	eptr->providePollDescriptors(pdesc, gDoTerminate.load());
+	eptr->providePollDescriptors(pdesc, doTerminate());
 }

 void masterconn_send_status() {
@@ -323,7 +320,7 @@ int masterconn_init(void) {
 	gReconnectHook =
 	    eventloop_timeregister(TIMEMODE_RUN_LATE, reconnectionDelay,
 	                           rnd_ranged<uint32_t>(reconnectionDelay), masterconn_reconnect);
-	eventloop_wantexitregister(masterconn_wantexit);
+
 	eventloop_canexitregister(masterconn_canexit);
 	eventloop_destructregister(masterconn_term);
 	eventloop_pollregister(masterconn_desc, masterconn_serve);
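
The net effect of these hunks is that the chunkserver keeps a single termination flag, owned by the network main thread, instead of a second private one in masterconn. A sketch of the pattern, where only doTerminate() matches the real code and everything else is illustrative:

#include <atomic>

namespace sketch {

// One flag, one owner: the network main thread flips it during want-exit,
// and every poll loop (masterconn_desc included) reads it via doTerminate().
std::atomic<bool> gDoTerminate{false};

bool doTerminate() { return gDoTerminate.load(); }

void wantExit() { gDoTerminate.store(true); }  // called once at shutdown start

}  // namespace sketch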

src/chunkserver/network_main_thread.cc

Lines changed: 16 additions & 0 deletions
@@ -64,6 +64,12 @@ static uint32_t gNrOfNetworkWorkers;
 static uint32_t gNrOfHddWorkersPerNetworkWorker;
 static uint32_t gBgjobsCountPerNetworkWorker;

+static std::atomic<bool> gDoTerminate = false;
+
+bool doTerminate() {
+	return gDoTerminate.load();
+}
+
 void chunkReplicatorReload() {
 	unsigned rep_total = cfg_get_minmaxvalue<unsigned>("REPLICATION_TOTAL_TIMEOUT_MS",
 	                                                   ChunkReplicator::kDefaultTotalTimeout_ms,
@@ -177,6 +183,10 @@ void mainNetworkThreadReload(void) {

 void mainNetworkThreadDesc(std::vector<pollfd> &pdesc) {
 	TRACETHIS();
+	if (doTerminate()) {
+		return;
+	}
+
 	pdesc.push_back({lsock, POLLIN, 0});
 	lsockpdescpos = pdesc.size() - 1;
 }
@@ -196,6 +206,8 @@ void mainNetworkThreadWantExit(void) {
 	for (auto& threadObject : networkThreadObjects) {
 		threadObject.askForTermination();
 	}
+
+	gDoTerminate.store(true);
 }

 int mainNetworkThreadCanExit(void) {
@@ -222,6 +234,10 @@ void mainNetworkThreadTerm(void) {

 void mainNetworkThreadServe(const std::vector<pollfd> &pdesc) {
 	TRACETHIS();
+	if (doTerminate()) {
+		return;
+	}
+
 	int newSocketFD;

 	if (lsockpdescpos >= 0 && (pdesc[lsockpdescpos].revents & POLLIN)) {

src/chunkserver/network_main_thread.h

Lines changed: 2 additions & 0 deletions
@@ -28,3 +28,5 @@ int mainNetworkThreadInitThreads(void);

 uint32_t mainNetworkThreadGetListenIp();
 uint16_t mainNetworkThreadGetListenPort();
+
+bool doTerminate();
src/chunkserver/network_worker_thread.cc

Lines changed: 12 additions & 4 deletions
@@ -85,20 +85,28 @@ void NetworkWorkerThread::operator()() {
 	static std::atomic_uint16_t threadCounter(0);
 	std::string threadName = "netWorker_" + std::to_string(threadCounter++);
 	pthread_setname_np(pthread_self(), threadName.c_str());
+	bool lastDoTerminateValue = false;

 	while (!canTerminate_.load()) {
+		if (doTerminate.load() && !lastDoTerminateValue) {
+			// We've just switched to terminating mode, start wrapping up.
+			lastDoTerminateValue = true;
+			std::lock_guard lock(csservheadLock);
+			for (auto &entry : csservEntries) { entry.closeJobs(); }
+		}
+
 		preparePollFds(doTerminate.load());
 		int fdWithEvents = poll(pdesc.data(), pdesc.size(), gPollTimeout);

 		if (fdWithEvents < 0) {
 			if (errno == EAGAIN) {
-				safs::log_warn("{}: poll returned EAGAIN", __func__);
+				safs::log_warn("{} loop: poll returned EAGAIN", threadName);
 				usleep(100000);
 				continue;
 			}

 			if (errno != EINTR) {
-				safs::log_warn("{}: poll error: {}", __func__, strerr(errno));
+				safs::log_warn("{} loop: poll error: {}", threadName, strerr(errno));
 				break;
 			}
 		} else {
@@ -116,7 +124,8 @@
 bool NetworkWorkerThread::updateAndCheckTerminationStatus() {
 	std::lock_guard lock(csservheadLock);
 	bool canTerminate =
-	    doTerminate.load() && (csservEntries.empty() ||
+	    doTerminate.load() && ((csservEntries.empty() &&
+	                            (bgJobPool_.get() == nullptr || bgJobPool_->getJobCount() == 0)) ||
 	                           terminationTimer_.elapsed_ms() > kNWForcefulTerminationTimeout_ms);
 	canTerminate_.store(canTerminate);
 	return canTerminate;
@@ -334,7 +343,6 @@ void NetworkWorkerThread::askForTermination() {
 	doTerminate = true;
 	std::unique_lock lock(csservheadLock);
 	terminationTimer_.reset();
-	for (auto &entry : csservEntries) { entry.closeJobs(); }
 }

 void NetworkWorkerThread::addConnection(int newSocketFD) {
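
A self-contained sketch of the edge detection added to the worker loop above: the wrap-up (closing per-connection jobs) now runs from the loop itself, exactly once, on the first iteration that observes the terminate flag, instead of from askForTermination(). Names other than doTerminate are made up.

#include <atomic>

std::atomic<bool> doTerminate{false};  // set by askForTermination()

// wrapUp runs once on the rising edge of doTerminate; iterate is one pass of
// the poll/serve loop. The loop keeps spinning so background jobs can drain
// before updateAndCheckTerminationStatus() finally sets canTerminate.
template <typename WrapUp, typename Iterate>
void workerLoop(std::atomic<bool> &canTerminate, WrapUp wrapUp, Iterate iterate) {
	bool lastDoTerminateValue = false;
	while (!canTerminate.load()) {
		if (doTerminate.load() && !lastDoTerminateValue) {
			lastDoTerminateValue = true;  // react to the edge only once
			wrapUp();                     // e.g. entry.closeJobs() on every connection
		}
		iterate();  // preparePollFds(), poll(), serve — as in the real loop
	}
}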
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+timeout_set 25 minutes
+
+CHUNKSERVERS=8 \
+	USE_RAMDISK=YES \
+	MOUNT_EXTRA_CONFIG="sfscachemode=NEVER,sfswriteworkers=30,sfsioretries=13" \
+	CHUNKSERVER_EXTRA_CONFIG="NR_OF_NETWORK_WORKERS = 1|NR_OF_HDD_WORKERS_PER_NETWORK_WORKER = 1" \
+	MASTER_EXTRA_CONFIG="CHUNKS_WRITE_REP_LIMIT = 10|CHUNKS_READ_REP_LIMIT = 10|`
+		`CHUNKS_LOOP_MIN_TIME = 1|CHUNKS_LOOP_MAX_CPU = 90|CHUNKS_LOOP_PERIOD = 50|`
+		`OPERATIONS_DELAY_INIT = 0" \
+	MASTER_CUSTOM_GOALS="8 ec62: \$ec(6,2)" \
+	setup_local_empty_saunafs info
+
+cd ${info[mount0]}
+
+number_of_files=1000
+
+for i in $(seq 1 ${number_of_files}); do
+	dd if=/dev/random of=${TEMP_DIR}/file_$i bs=64K count=6 conv=fsync &> /dev/null
+done
+
+mkdir dir
+saunafs setgoal ec62 dir
+saunafs settrashtime 0 dir
+
+for test_loop in {1..10}; do
+	for i in $(seq 1 ${number_of_files}); do
+		dd if="${TEMP_DIR}/file_${i}" of="dir/file_${i}" bs=384K count=1 status=none &> /dev/null &
+	done
+
+	# Write some files with all CSs available, then kill some CSs and write more files, to
+	# increase the probability of having chunks with parts being written at the time when the
+	# chunkservers are killed.
+	sleep 0.3
+
+	saunafs_chunkserver_daemon 0 kill
+	saunafs_chunkserver_daemon 1 kill
+	echo "Chunkservers killed, now starting them again"
+
+	wait
+	echo "All files written"
+
+	# Wait enough time for the chunks to have increased their version because of the lost copies,
+	# which will prevent the chunks from having copies with wrong data.
+	sleep 10
+	echo "Starting chunkservers"
+
+	saunafs_chunkserver_daemon 0 start
+	saunafs_chunkserver_daemon 1 start
+	saunafs_wait_for_all_ready_chunkservers
+
+	# Wait for the chunkservers to fix the chunks, which may take some time because of the number
+	# of missing parts and the fact that we have only 1 worker thread per chunkserver.
+	while true; do
+		current_chunk_ok=0
+		for i in $(seq 1 ${number_of_files}); do
+			if saunafs fileinfo dir/file_$i | tail -n1 | grep -E \
+				'^[[:space:]]*copy 8:' > /dev/null; then
+				current_chunk_ok=$((current_chunk_ok + 1))
+			fi
+		done
+
+		echo "Checked all files, ${current_chunk_ok} copies are OK"
+		if (( current_chunk_ok == number_of_files )); then
+			break
+		fi
+
+		sleep 5
+	done
+
+	saunafs_chunkserver_daemon 6 stop
+	saunafs_chunkserver_daemon 7 stop
+
+	# Test if we can read files with only 6 copies, especially when reading from chunkservers 0
+	# and 1, which were killed and restarted.
+	for i in $(seq 1 ${number_of_files}); do
+		assert_success dd if="dir/file_$i" of=/dev/null bs=384K count=1 status=none
+		cmp ${TEMP_DIR}/file_$i dir/file_$i || echo "File $i is different after reading back"
+	done
+
+	saunafs_chunkserver_daemon 6 start
+	saunafs_chunkserver_daemon 7 start
+	saunafs_wait_for_all_ready_chunkservers
+
+	rm dir/file_*
+
+	# Ensure all CSs report zero chunks before starting the next loop, to reuse the space
+	while true; do
+		cs_with_chunks=$(saunafs-admin list-chunkservers localhost "${info[matocl]}" | \
+			awk '/chunks:/ && $2!=0 {print $0}' | wc -l)
+
+		echo "${cs_with_chunks} CSs report non-zero chunk counts"
+		if (( cs_with_chunks == 0 )); then
+			break
+		fi
+
+		sleep 1
+	done
+
+	echo "Test loop ${test_loop} completed"
+done
