
Commit 5ae1b9c

add trigger gc api (#379)
* add trigger gc api
* support trigger gc for a specific pg
1 parent 0814e0b commit 5ae1b9c

File tree

7 files changed: +378 −29 lines changed

src/lib/homestore_backend/gc_manager.cpp

Lines changed: 26 additions & 5 deletions

@@ -123,7 +123,10 @@ void GCManager::start() {
         gc_actor->start();
         LOGINFOMOD(gcmgr, "start gc actor for pdev={}", pdev_id);
     }
+    start_gc_scan_timer();
+}
 
+void GCManager::start_gc_scan_timer() {
     const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec);
 
     // the initial idea here is that we want the gc timer to run in a reactor that is not shared with other fibers that
@@ -147,9 +150,7 @@ void GCManager::start() {
     LOGINFOMOD(gcmgr, "gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec);
 }
 
-bool GCManager::is_started() { return m_gc_timer_hdl != iomgr::null_timer_handle; }
-
-void GCManager::stop() {
+void GCManager::stop_gc_scan_timer() {
     if (m_gc_timer_hdl == iomgr::null_timer_handle) {
         LOGWARNMOD(gcmgr, "gc scheduler timer is not running, no need to stop it");
         return;
@@ -163,6 +164,10 @@ void GCManager::stop() {
         m_gc_timer_hdl = iomgr::null_timer_handle;
     });
     m_gc_timer_fiber = nullptr;
+}
+
+void GCManager::stop() {
+    stop_gc_scan_timer();
 
     for (const auto& [pdev_id, gc_actor] : m_pdev_gc_actors) {
         gc_actor->stop();
@@ -171,9 +176,20 @@ void GCManager::stop() {
 }
 
 folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
-    if (!is_started()) return folly::makeFuture< bool >(false);
+    auto ex_vchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
+    if (ex_vchunk == nullptr) {
+        LOGERRORMOD(gcmgr, "chunk {} not found when submit gc task!", chunk_id);
+        return folly::makeFuture< bool >(false);
+    }
+
+    // if the chunk has no garbage to be reclaimed, we don't need to gc it, return true directly
+    const auto defrag_blk_num = ex_vchunk->get_defrag_nblks();
+    if (!defrag_blk_num && task_priority::normal == priority) {
+        LOGERRORMOD(gcmgr, "chunk {} has no garbage to be reclaimed, skip gc for this chunk!", chunk_id);
+        return folly::makeFuture< bool >(true);
+    }
 
-    auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
+    auto pdev_id = ex_vchunk->get_pdev_id();
     auto it = m_pdev_gc_actors.find(pdev_id);
     if (it == m_pdev_gc_actors.end()) {
         LOGINFOMOD(gcmgr, "pdev gc actor not found for pdev_id={}, chunk={}", pdev_id, chunk_id);
@@ -337,6 +353,11 @@ void GCManager::pdev_gc_actor::add_reserved_chunk(
 }
 
 folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) {
+    if (m_is_stopped.load()) {
+        LOGWARNMOD(gcmgr, "pdev gc actor for pdev_id={} is not started yet or already stopped, cannot add gc task!",
+                   m_pdev_id);
+        return folly::makeSemiFuture< bool >(false);
+    }
     auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk);
     // it does not belong to any pg, so we don't need to gc it.
     if (!EXvchunk->m_pg_id.has_value()) {

src/lib/homestore_backend/gc_manager.hpp

Lines changed: 5 additions & 1 deletion

@@ -307,7 +307,11 @@ class GCManager {
 
     void start();
     void stop();
-    bool is_started();
+
+    // the following two functions should not be called concurrently. if we need to call them concurrently, we need
+    // to add a lock to protect them
+    void start_gc_scan_timer();
+    void stop_gc_scan_timer();
 
     void scan_chunks_for_gc();
     void drain_pg_pending_gc_task(const pg_id_t pg_id);

src/lib/homestore_backend/hs_http_manager.cpp

Lines changed: 295 additions & 1 deletion

@@ -50,7 +50,17 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
          {Pistache::Http::Method::Get, "/api/v1/chunk/dump",
           Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)},
          {Pistache::Http::Method::Get, "/api/v1/shard/dump",
-          Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}};
+          Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)},
+
+         // we support triggering gc for:
+         // 1. all the chunks in all the pgs: no input param
+         // 2. all the chunks in a specific pg: input param is pg_id
+         // 3. a specific chunk: input param is pchunk_id
+
+         {Pistache::Http::Method::Post, "/api/v1/trigger_gc",
+          Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)},
+         {Pistache::Http::Method::Get, "/api/v1/gc_job_status",
+          Pistache::Rest::Routes::bind(&HttpManager::get_gc_job_status, this)}};
 
     auto http_server = ioenvironment.get_http_server();
     if (!http_server) {
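
The new routes can be exercised directly over HTTP. A minimal usage sketch with curl, assuming the server listens on localhost:5000 (the host and port depend on the deployment) and using illustrative pg_id/chunk_id values:

    # trigger gc for all chunks in all pgs
    curl -X POST "http://localhost:5000/api/v1/trigger_gc"

    # trigger gc for all chunks in a specific pg
    curl -X POST "http://localhost:5000/api/v1/trigger_gc?pg_id=1"

    # trigger gc for a specific physical chunk (pchunk_id, not vchunk_id)
    curl -X POST "http://localhost:5000/api/v1/trigger_gc?chunk_id=42"

Each trigger responds with 202 Accepted and a job_id of the form trigger-gc-task-<counter> (see generate_job_id in the diff below), which can then be polled via /api/v1/gc_job_status.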
@@ -271,6 +281,290 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::H
     response.send(Pistache::Http::Code::Ok, j.dump());
 }
 
+void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
+    auto gc_mgr = ho_.gc_manager();
+    if (!gc_mgr) {
+        response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available");
+        return;
+    }
+
+    auto chunk_selector = ho_.chunk_selector();
+    if (!chunk_selector) {
+        response.send(Pistache::Http::Code::Internal_Server_Error, "Chunk selector not available");
+        return;
+    }
+
+    const auto chunk_id_param = request.query().get("chunk_id");
+    const auto pg_id_param = request.query().get("pg_id");
+
+    if (chunk_id_param && !chunk_id_param.value().empty()) {
+        // trigger gc for a specific chunk; the chunk_id is a pchunk_id, not a vchunk_id
+        uint32_t chunk_id = std::stoul(chunk_id_param.value());
+        LOGINFO("Received trigger_gc request for chunk_id {}", chunk_id);
+
+        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+        if (!chunk) {
+            nlohmann::json error;
+            error["chunk_id"] = chunk_id;
+            error["error"] = "Chunk not found";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        if (!chunk->m_pg_id.has_value()) {
+            nlohmann::json error;
+            error["chunk_id"] = chunk_id;
+            error["error"] = "Chunk belongs to no pg";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        const auto pg_id = chunk->m_pg_id.value();
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+
+        result["chunk_id"] = chunk_id;
+        result["pg_id"] = pg_id;
+        result["job_id"] = job_id;
+
+        if (chunk->m_state == ChunkState::GC) {
+            result["message"] = "chunk is already under GC now, this task will not be executed!";
+            response.send(Pistache::Http::Code::Ok, result.dump());
+            return;
+        }
+        result["message"] = "GC triggered for chunk, pls query job status using gc_job_status API";
+
+        // return the response before starting the GC so that we don't block the client.
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id, chunk_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        // submit gc task for this chunk
+
+        // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
+        auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+        RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+        auto repl_dev = hs_pg->repl_dev_;
+        repl_dev->quiesce_reqs();
+        repl_dev->clear_chunk_req(chunk_id);
+        const auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+        gc_mgr->submit_gc_task(priority, chunk_id)
+            .via(&folly::InlineExecutor::instance())
+            .thenValue([this, job_info, repl_dev](bool res) {
+                job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
+                // Resume accepting new requests for this pg
+                repl_dev->resume_accepting_reqs();
+            });
+    } else if (pg_id_param && !pg_id_param.value().empty()) {
+        // trigger gc for all chunks in a specific pg
+        const auto pg_id = std::stoul(pg_id_param.value());
+        LOGINFO("Received trigger_gc request for pg_id {}", pg_id);
+        auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+        if (!hs_pg) {
+            nlohmann::json error;
+            error["pg_id"] = pg_id;
+            error["error"] = "PG not found";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+        result["pg_id"] = pg_id;
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for a single pg, pls query job status using gc_job_status API";
+        // return the response before starting the GC so that we don't block the client.
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        LOGINFO("GC job {} stopping GC scan timer", job_id);
+        gc_mgr->stop_gc_scan_timer();
+
+        // we block here until all gc tasks for the pg are done
+        trigger_gc_for_pg(pg_id, job_id);
+
+        LOGINFO("GC job {} restarting GC scan timer", job_id);
+        gc_mgr->start_gc_scan_timer();
+    } else {
+        LOGINFO("Received trigger_gc request for all chunks");
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for all chunks, pls query job status using gc_job_status API";
+        // return the response before starting the GC so that we don't block the client.
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        std::vector< pg_id_t > pg_ids;
+        ho_.get_pg_ids(pg_ids);
+        LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());
+        LOGINFO("GC job {} stopping GC scan timer", job_id);
+        gc_mgr->stop_gc_scan_timer();
+
+        // we block here until all gc tasks for each pg are done
+        for (const auto& pg_id : pg_ids) {
+            trigger_gc_for_pg(pg_id, job_id);
+        }
+
+        LOGINFO("GC job {} restarting GC scan timer", job_id);
+        gc_mgr->start_gc_scan_timer();
+    }
+}
+
+std::string HttpManager::generate_job_id() {
+    auto counter = job_counter_.fetch_add(1, std::memory_order_relaxed);
+    return fmt::format("trigger-gc-task-{}", counter);
+}
+
+void HttpManager::get_job_status(const std::string& job_id, nlohmann::json& result) {
+    result["job_id"] = job_id;
+    std::shared_ptr< GCJobInfo > job_info;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        job_info = gc_jobs_map_.get(job_id);
+    }
+
+    if (!job_info) {
+        result["error"] = "job_id not found, or job has been evicted";
+        return;
+    }
+
+    switch (job_info->status) {
+    case GCJobStatus::RUNNING:
+        result["status"] = "running";
+        break;
+    case GCJobStatus::COMPLETED:
+        result["status"] = "completed";
+        break;
+    case GCJobStatus::FAILED:
+        result["status"] = "failed";
+        break;
+    }
+
+    if (job_info->chunk_id.has_value()) { result["chunk_id"] = job_info->chunk_id.value(); }
+    if (job_info->pg_id.has_value()) { result["pg_id"] = job_info->pg_id.value(); }
+
+    if (job_info->total_chunks > 0) {
+        nlohmann::json stats;
+        stats["total_chunks"] = job_info->total_chunks;
+        stats["success_count"] = job_info->success_count;
+        stats["failed_count"] = job_info->failed_count;
+        result["statistics"] = stats;
+    }
+}
+
+void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
+    auto job_id_param = request.query().get("job_id");
+    if (job_id_param && !job_id_param.value().empty()) {
+        const auto job_id = job_id_param.value();
+        LOGINFO("query job {} status!", job_id);
+        nlohmann::json result;
+        get_job_status(job_id, result);
+        response.send(Pistache::Http::Code::Ok, result.dump());
+        return;
+    }
+
+    LOGINFO("query all job status!");
+    nlohmann::json result;
+    std::vector< std::string > job_ids;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        for (const auto& [k, v] : gc_jobs_map_) {
+            job_ids.push_back(k);
+        }
+    }
+
+    for (const auto& job_id : job_ids) {
+        nlohmann::json job_json;
+        get_job_status(job_id, job_json);
+        result["jobs"].push_back(job_json);
+    }
+
+    response.send(Pistache::Http::Code::Ok, result.dump());
+}
+
+void HttpManager::trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id) {
+    auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+    RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+
+    LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
+    auto gc_mgr = ho_.gc_manager();
+    gc_mgr->drain_pg_pending_gc_task(pg_id);
+    auto pg_sb = hs_pg->pg_sb_.get();
+    std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks);
+
+    LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());
+    hs_pg->repl_dev_->quiesce_reqs();
+    std::vector< folly::SemiFuture< bool > > gc_task_futures;
+
+    std::shared_ptr< GCJobInfo > job_info;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        job_info = gc_jobs_map_.get(job_id);
+    }
+
+    auto chunk_selector = ho_.chunk_selector();
+
+    for (const auto& chunk_id : pg_chunks) {
+        job_info->total_chunks++;
+        // Determine priority based on chunk state (INUSE means the chunk has an open shard)
+        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+        RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
+        auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+        // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
+        if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }
+
+        // Submit a GC task for this chunk
+        auto future = gc_mgr->submit_gc_task(priority, chunk_id);
+        gc_task_futures.push_back(std::move(future));
+        LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id,
+                 (priority == task_priority::emergent) ? "emergent" : "normal");
+    }
+
+    folly::collectAllUnsafe(gc_task_futures)
+        .thenValue([job_info](auto&& results) {
+            for (auto const& ok : results) {
+                RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
+                if (ok.value()) {
+                    job_info->success_count++;
+                } else {
+                    job_info->failed_count++;
+                }
+            }
+        })
+        .thenValue([this, pg_id, job_info, gc_mgr](auto&& rets) {
+            LOGINFO("All GC tasks have been processed");
+            const auto& job_id = job_info->job_id;
+
+            auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+            RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+            // Resume accepting new requests for this pg
+            hs_pg->repl_dev_->resume_accepting_reqs();
+            LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);
+
+            job_info->status = job_info->failed_count ? GCJobStatus::FAILED : GCJobStatus::COMPLETED;
+            LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks,
+                    job_info->success_count, job_info->failed_count);
+        })
+        .get();
+}
+
 #ifdef _PRERELEASE
 void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
     std::string crash_type;
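
For reference, a status query and an illustrative response, again assuming localhost:5000; per the get_job_status code above, the result carries job_id and status, plus chunk_id/pg_id when the job targeted one, and a statistics object once total_chunks > 0 (all values below are made up):

    # poll a single job (job_id is illustrative)
    curl "http://localhost:5000/api/v1/gc_job_status?job_id=trigger-gc-task-0"

    # illustrative response for a completed per-pg job
    {"job_id":"trigger-gc-task-0","status":"completed","pg_id":1,
     "statistics":{"total_chunks":8,"success_count":8,"failed_count":0}}

Calling /api/v1/gc_job_status without a job_id returns every tracked job under a "jobs" array.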
