Skip to content

Commit 63bbc23

Browse files
committed
fix: Make a threadpool for trash-remover threads.
Add a ProducerConsumerQueue and ThreadPool with up to 5 runners, for executing the per disk deletion jobs. Signed-off-by: GigaCronos <jorge.cabrera@leil.io>
1 parent f69b3ee commit 63bbc23

File tree

6 files changed

+111
-10
lines changed

6 files changed

+111
-10
lines changed

src/chunkserver/chunkserver-common/chunk_trash_manager.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,24 @@ int ChunkTrashManager::init(const std::string &diskPath) {
8484
return impl->init(diskPath);
8585
}
8686

87+
void ChunkTrashManager::terminate(){
88+
if (!isEnabled) { return; }
89+
90+
// Protect against concurrent access
91+
ImplementationPtr impl;
92+
{
93+
std::lock_guard<std::mutex> lock(implMutex);
94+
impl = getImpl();
95+
}
96+
if (!impl) {
97+
safs::log_error_code(SAUNAFS_ERROR_EINVAL,
98+
"ChunkTrashManager implementation not initialized");
99+
return;
100+
}
101+
impl->terminate();
102+
103+
}
104+
87105
void ChunkTrashManager::collectGarbage() {
88106
if (!isEnabled) { return; }
89107

src/chunkserver/chunkserver-common/chunk_trash_manager.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class IChunkTrashManagerImpl {
3434

3535
virtual int init(const std::string &) = 0;
3636

37+
virtual void terminate() = 0;
38+
3739
virtual void collectGarbage() = 0;
3840

3941
virtual void reloadConfig() = 0;
@@ -73,6 +75,11 @@ class ChunkTrashManager {
7375
*/
7476
static int init(const std::string &diskPath);
7577

78+
/**
79+
* @brief Join the background threads.
80+
*/
81+
static void terminate();
82+
7683
/**
7784
* @brief Moves a file to the trash directory.
7885
*

src/chunkserver/chunkserver-common/chunk_trash_manager_impl.cc

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020

2121
#include <sys/statvfs.h>
2222
#include <algorithm>
23+
#include <atomic>
2324
#include <cstdint>
2425
#include <ctime>
26+
#include <utility>
2527

2628
#include "chunkserver-common/chunk_trash_manager_impl.h"
2729
#include "config/cfg.h"
2830
#include "errors/saunafs_error_codes.h"
2931
#include "global_shared_resources.h"
3032
#include "hdd_stats.h"
33+
#include "hdd_utils.h"
3134
#include "slogger/slogger.h"
3235

3336
namespace fs = std::filesystem;
@@ -47,6 +50,12 @@ uint64_t ChunkTrashManagerImpl::previousBytesWritePerDisk = 1024 * 1024; //1MiB
4750
const std::string ChunkTrashManagerImpl::kTrashGuardString =
4851
std::string("/") + ChunkTrashManager::kTrashDirname + "/";
4952

53+
std::vector<std::thread> ChunkTrashManagerImpl::removeFromTrashThreads{};
54+
std::unique_ptr<ProducerConsumerQueue> ChunkTrashManagerImpl::removeFromTrashJobQueue =
55+
std::make_unique<ProducerConsumerQueue>();
56+
57+
std::atomic<uint32_t> ChunkTrashManagerImpl::NotIdleThreadCount{0};
58+
5059
void ChunkTrashManagerImpl::reloadConfig() {
5160
availableThresholdGB =
5261
cfg_get("CHUNK_TRASH_FREE_SPACE_THRESHOLD_GB", kDefaultAvailableThresholdGB);
@@ -167,19 +176,50 @@ int ChunkTrashManagerImpl::moveToTrash(const fs::path &filePath, const fs::path
167176

168177
void ChunkTrashManagerImpl::removeTrashFiles(
169178
const ChunkTrashIndex::TrashIndexDiskEntries &filesToRemove) const {
170-
std::vector<std::jthread> parallelRemovers;
171-
172179
for (const auto &[diskPath, fileEntries] : filesToRemove) {
173-
parallelRemovers.emplace_back(removeTrashFilesFromDisk, fileEntries, diskPath);
180+
removeFromTrashJobQueue->put(
181+
0, 1,
182+
reinterpret_cast<uint8_t *>(
183+
new std::pair<ChunkTrashIndex::TrashIndexFileEntries, std::string>(fileEntries,
184+
diskPath)),
185+
1);
186+
}
187+
188+
while(NotIdleThreadCount && !removeFromTrashJobQueue->isEmpty()){
189+
174190
}
175191
}
176192

177-
void ChunkTrashManagerImpl::removeTrashFilesFromDisk(
178-
const ChunkTrashIndex::TrashIndexFileEntries &filesToRemove, const std::string &diskPath) {
179-
for (const auto &fileEntry : filesToRemove) {
180-
if (removeFileFromTrash(fileEntry.second) != SAUNAFS_STATUS_OK) { continue; }
193+
void ChunkTrashManagerImpl::removeTrashFilesFromDiskThread(uint8_t workerId) {
194+
std::string threadName ="removeTrashFilesFromDisk_worker_" + std::to_string(workerId);
195+
pthread_setname_np(pthread_self(), threadName.c_str());
196+
197+
uint32_t jobId;
198+
uint32_t operation;
199+
uint8_t *jobPtrArg;
200+
201+
while (true) {
202+
removeFromTrashJobQueue->get(&jobId, &operation, &jobPtrArg, nullptr);
203+
204+
if(operation == 0){
205+
break;
206+
}
207+
208+
NotIdleThreadCount ++;
209+
auto tempTuple = reinterpret_cast<std::pair<ChunkTrashIndex::TrashIndexFileEntries, std::string> *>(jobPtrArg);
210+
211+
ChunkTrashIndex::TrashIndexFileEntries filesToRemove = tempTuple->first;
212+
std::string diskPath = tempTuple->second;
213+
214+
for (const auto &fileEntry : filesToRemove) {
215+
if (removeFileFromTrash(fileEntry.second) != SAUNAFS_STATUS_OK) { continue; }
181216
HddStats::gStatsOperationsGCPurge++;
182-
getTrashIndex().remove(fileEntry.first, fileEntry.second, diskPath);
217+
getTrashIndex().remove(fileEntry.first, fileEntry.second, diskPath);
218+
}
219+
220+
delete tempTuple;
221+
222+
NotIdleThreadCount --;
183223
}
184224
}
185225

@@ -225,9 +265,30 @@ int ChunkTrashManagerImpl::init(const std::string &diskPath) {
225265
}
226266
}
227267

268+
if (ChunkTrashManagerImpl::removeFromTrashThreads.size() < 5) {
269+
ChunkTrashManagerImpl::removeFromTrashThreads.emplace_back(
270+
&ChunkTrashManagerImpl::removeTrashFilesFromDiskThread,
271+
uint8_t(ChunkTrashManagerImpl::removeFromTrashThreads.size()));
272+
}
273+
228274
return SAUNAFS_STATUS_OK;
229275
}
230276

277+
void ChunkTrashManagerImpl::terminate() {
278+
for(uint8_t i=0; i < ChunkTrashManagerImpl::removeFromTrashThreads.size(); i++){
279+
ChunkTrashManagerImpl::removeFromTrashJobQueue->put(0, 0, nullptr, 1);
280+
}
281+
282+
283+
for (auto &thread : ChunkTrashManagerImpl::removeFromTrashThreads) {
284+
if (thread.joinable()) { thread.join(); }
285+
}
286+
287+
ChunkTrashManagerImpl::removeFromTrashThreads.clear();
288+
289+
}
290+
291+
231292
bool ChunkTrashManagerImpl::isValidTimestampFormat(const std::string &timestamp) {
232293
return timestamp.size() == kTimeStampLength && std::ranges::all_of(timestamp, ::isdigit);
233294
}

src/chunkserver/chunkserver-common/chunk_trash_manager_impl.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@
2020

2121
#include "common/platform.h"
2222

23+
#include <cstdint>
2324
#include <string>
25+
#include <thread>
26+
2427

2528
#include "chunkserver-common/chunk_trash_index.h"
2629
#include "chunkserver-common/chunk_trash_manager.h"
30+
#include "common/pcqueue.h"
2731
#include "errors/saunafs_error_codes.h"
2832

2933
class ChunkTrashManagerImpl : public IChunkTrashManagerImpl {
@@ -49,6 +53,9 @@ class ChunkTrashManagerImpl : public IChunkTrashManagerImpl {
4953
*/
5054
int init(const std::string &diskPath) override;
5155

56+
57+
void terminate() override;
58+
5259
/**
5360
* @brief Moves a specified file to the trash directory.
5461
* @param filePath The path of the file to be moved.
@@ -85,8 +92,7 @@ class ChunkTrashManagerImpl : public IChunkTrashManagerImpl {
8592
* @param filesToRemove The list of files to be permanently deleted.
8693
* @param diskPath The path of the disk of the files.
8794
*/
88-
static void removeTrashFilesFromDisk(
89-
const ChunkTrashIndex::TrashIndexFileEntries &filesToRemove, const std::string &diskPath);
95+
static void removeTrashFilesFromDiskThread(uint8_t workerId);
9096

9197
/**
9298
* @brief Checks if a given timestamp string matches the expected format.
@@ -172,6 +178,10 @@ class ChunkTrashManagerImpl : public IChunkTrashManagerImpl {
172178
static size_t garbageCollectorSpaceRecoveryStep;
173179
static constexpr size_t kDefaultGarbageCollectorSpaceRecoveryStep = 100;
174180

181+
static std::vector<std::thread> removeFromTrashThreads; /// Vector of worker threads.
182+
static std::unique_ptr<ProducerConsumerQueue> removeFromTrashJobQueue; /// Queue for jobs.
183+
184+
static std::atomic<uint32_t> NotIdleThreadCount;
175185
// Use a function to safely get the ChunkTrashIndex instance
176186
// This prevents issues during program shutdown
177187
/**

src/chunkserver/chunkserver-common/chunk_trash_manager_unittest.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class MockChunkTrashManagerImpl : public IChunkTrashManagerImpl {
2929
(const std::filesystem::path &, const std::filesystem::path &, const std::time_t &),
3030
());
3131
MOCK_METHOD(int, init, (const std::string &), ());
32+
MOCK_METHOD(void, terminate, (), ());
3233
MOCK_METHOD(void, collectGarbage, (), ());
3334
MOCK_METHOD(void, reloadConfig, (), ());
3435
};

src/chunkserver/hddspacemgr.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2541,6 +2541,10 @@ void hddFreeResourcesThread() {
25412541
releaseOldIoBuffers(kOldIoBuffersExpirationTimeMs);
25422542
usleep(timeout.remaining_us());
25432543
}
2544+
2545+
2546+
ChunkTrashManager::terminate();
2547+
25442548
}
25452549

25462550
void hddTerminate(void) {

0 commit comments

Comments
 (0)