Commit f1d478b

support triggering gc for a specific pg

1 parent 596f40e commit f1d478b

3 files changed: +177 −120 lines changed


src/lib/homestore_backend/gc_manager.cpp

Lines changed: 14 additions & 1 deletion
@@ -176,7 +176,20 @@ void GCManager::stop() {
 }
 
 folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
-    auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
+    auto ex_vchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
+    if (ex_vchunk == nullptr) {
+        LOGERRORMOD(gcmgr, "chunk {} not found when submitting gc task!", chunk_id);
+        return folly::makeFuture< bool >(false);
+    }
+
+    // if the chunk has no garbage to be reclaimed, we don't need to gc it; return true directly
+    const auto defrag_blk_num = ex_vchunk->get_defrag_nblks();
+    if (!defrag_blk_num) {
+        LOGERRORMOD(gcmgr, "chunk {} has no garbage to be reclaimed, skip gc for this chunk!", chunk_id);
+        return folly::makeFuture< bool >(true);
+    }
+
+    auto pdev_id = ex_vchunk->get_pdev_id();
     auto it = m_pdev_gc_actors.find(pdev_id);
     if (it == m_pdev_gc_actors.end()) {
         LOGINFOMOD(gcmgr, "pdev gc actor not found for pdev_id={}, chunk={}", pdev_id, chunk_id);
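
The guards above give submit_gc_task() three outcomes: false when the chunk (or, below, its pdev gc actor) is unknown, true immediately when the chunk has no garbage, and otherwise the result of the actual GC run. A minimal caller sketch (hypothetical helper; GCManager, chunk_id_t, and the logging macro come from the surrounding codebase, and the executor choice mirrors the handler in hs_http_manager.cpp below):

    #include <folly/executors/InlineExecutor.h>
    #include <folly/futures/Future.h>

    void submit_and_log(GCManager* gc_mgr, chunk_id_t chunk_id) {
        gc_mgr->submit_gc_task(task_priority::normal, chunk_id)
            .via(&folly::InlineExecutor::instance())
            .thenValue([chunk_id](bool ok) {
                // ok == true also covers the "nothing to reclaim" early return.
                LOGINFOMOD(gcmgr, "gc task for chunk {} finished, ok={}", chunk_id, ok);
            });
    }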

src/lib/homestore_backend/hs_http_manager.cpp

Lines changed: 154 additions & 112 deletions
@@ -49,6 +49,12 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
          Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)},
      {Pistache::Http::Method::Get, "/api/v1/shard/dump",
          Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)},
+
+     // we support triggering gc for:
+     // 1. all the chunks in all the pgs: no input param
+     // 2. all the chunks in a specific pg: input param is pg_id
+     // 3. a specific chunk: input param is pchunk_id
+
      {Pistache::Http::Method::Post, "/api/v1/trigger_gc",
          Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)},
      {Pistache::Http::Method::Get, "/api/v1/gc_job_status",
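
For reference, the three modes map onto requests like the following (the pg and chunk ids are illustrative; the job_id query parameter is the one consumed by get_gc_job_status below):

    POST /api/v1/trigger_gc                  -> gc every chunk in every pg
    POST /api/v1/trigger_gc?pg_id=1          -> gc all chunks of pg 1
    POST /api/v1/trigger_gc?chunk_id=42      -> gc a single chunk
    GET  /api/v1/gc_job_status?job_id=...    -> poll the returned job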
@@ -244,8 +250,6 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::H
 }
 
 void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
-    const auto chunk_id_param = request.query().get("chunk_id");
-
     auto gc_mgr = ho_.gc_manager();
     if (!gc_mgr) {
         response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available");
@@ -258,101 +262,13 @@ void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::H
         return;
     }
 
-    std::string job_id = generate_job_id();
-    nlohmann::json result;
-
-    // trigger gc for all chunks
-    if (!chunk_id_param || chunk_id_param.value().empty()) {
-        LOGINFO("Received trigger_gc request for all chunks, job_id={}", job_id);
-
-        auto job_info = std::make_shared< GCJobInfo >(job_id);
-        {
-            std::lock_guard< std::mutex > lock(gc_job_mutex_);
-            gc_jobs_map_.set(job_id, job_info);
-        }
-
-        result["job_id"] = job_id;
-        result["message"] = "GC triggered for all eligible chunks, pls query job status using gc_job_status API";
-        response.send(Pistache::Http::Code::Accepted, result.dump());
-
-        LOGINFO("GC job {} stopping GC scan timer", job_id);
-        gc_mgr->stop_gc_scan_timer();
-
-        std::vector< pg_id_t > pg_ids;
-        ho_.get_pg_ids(pg_ids);
-        LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());
-
-        std::vector< folly::SemiFuture< bool > > gc_task_futures;
-
-        for (const auto& pg_id : pg_ids) {
-            auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
-            RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
-
-            LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
-            gc_mgr->drain_pg_pending_gc_task(pg_id);
-
-            auto pg_sb = hs_pg->pg_sb_.get();
-            std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(),
-                                                            pg_sb->get_chunk_ids() + pg_sb->num_chunks);
-
-            LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());
-
-            // Resume accepting new requests for this pg
-            hs_pg->repl_dev_->quiesce_reqs();
-
-            for (const auto& chunk_id : pg_chunks) {
-                job_info->total_chunks++;
-                // Determine priority based on chunk state (INUSE means has open shard)
-                auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
-                RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
-                auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
-
-                // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
-                if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }
-
-                // Submit GC task for this chunk
-                auto future = gc_mgr->submit_gc_task(priority, chunk_id);
-                gc_task_futures.push_back(std::move(future));
-                LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id,
-                         (priority == task_priority::emergent) ? "emergent" : "normal");
-            }
-        }
-
-        folly::collectAllUnsafe(gc_task_futures)
-            .thenValue([job_info](auto&& results) {
-                for (auto const& ok : results) {
-                    RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
-                    if (ok.value()) {
-                        job_info->success_count++;
-                    } else {
-                        job_info->failed_count++;
-                    }
-                }
-            })
-            .thenValue([this, pg_ids, job_info, gc_mgr](auto&& rets) {
-                LOGINFO("All GC tasks have been processed");
-                const auto& job_id = job_info->job_id;
-                for (const auto& pg_id : pg_ids) {
-                    auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
-                    RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
-                    // Resume accepting new requests for this pg
-                    hs_pg->repl_dev_->resume_accepting_reqs();
-                    LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);
-                }
-
-                job_info->result = (job_info->failed_count == 0);
-                job_info->status = job_info->result.value() ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
-                LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks,
-                        job_info->success_count, job_info->failed_count);
+    const auto chunk_id_param = request.query().get("chunk_id");
+    const auto pg_id_param = request.query().get("pg_id");
 
-                // Restart the GC scan timer
-                LOGINFO("GC job {} restarting GC scan timer", job_id);
-                gc_mgr->start_gc_scan_timer();
-            });
-    } else {
-        // trigger gc for specific chunk
+    if (chunk_id_param && !chunk_id_param.value().empty()) {
+        // trigger gc for a specific chunk
         uint32_t chunk_id = std::stoul(chunk_id_param.value());
-        LOGINFO("Received trigger_gc request for chunk_id={}, job_id={}", chunk_id, job_id);
+        LOGINFO("Received trigger_gc request for chunk_id {}", chunk_id);
 
         auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
         if (!chunk) {
@@ -372,28 +288,30 @@ void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::H
         }
 
         const auto pg_id = chunk->m_pg_id.value();
-        auto pdev_id = chunk->get_pdev_id();
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
 
         result["chunk_id"] = chunk_id;
-        result["pdev_id"] = pdev_id;
         result["pg_id"] = pg_id;
         result["job_id"] = job_id;
 
         if (chunk->m_state == ChunkState::GC) {
-            result["message"] = "chunk is already under GC now";
-            response.send(Pistache::Http::Code::Accepted, result.dump());
+            result["message"] = "chunk is already under GC now, this task will not be executed!";
+            response.send(Pistache::Http::Code::Ok, result.dump());
             return;
         }
+        result["message"] = "GC triggered for chunk, please query job status using gc_job_status API";
 
-        // Check for active job and create new job atomically under the same lock
-        auto job_info = std::make_shared< GCJobInfo >(job_id, chunk_id, pdev_id);
+        // return response before starting the GC so that we don't block the client.
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id, chunk_id);
         {
-            std::lock_guard< std::mutex > lock(gc_job_mutex_);
+            std::lock_guard lock(gc_job_mutex_);
             gc_jobs_map_.set(job_id, job_info);
         }
 
-        result["message"] = "GC triggered for chunk, pls query job status using gc_job_status API";
-        response.send(Pistache::Http::Code::Accepted, result.dump());
+        // submit gc task for this chunk
 
         // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
         auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
@@ -406,11 +324,73 @@ void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::H
         gc_mgr->submit_gc_task(priority, chunk_id)
             .via(&folly::InlineExecutor::instance())
             .thenValue([this, job_info, repl_dev](bool res) {
-                job_info->result = res;
                 job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
                 // Resume accepting new requests for this pg
                 repl_dev->resume_accepting_reqs();
             });
+    } else if (pg_id_param && !pg_id_param.value().empty()) {
+        // trigger gc for all chunks in a specific pg
+        const auto pg_id = std::stoul(pg_id_param.value());
+        LOGINFO("Received trigger_gc request for pg_id {}", pg_id);
+        auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+        if (!hs_pg) {
+            nlohmann::json error;
+            error["pg_id"] = pg_id;
+            error["error"] = "PG not found";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+        result["pg_id"] = pg_id;
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for a single pg, please query job status using gc_job_status API";
+        // return response before starting the GC so that we don't block the client.
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        LOGINFO("GC job {} stopping GC scan timer", job_id);
+        gc_mgr->stop_gc_scan_timer();
+
+        // we block here until all gc tasks for the pg are done
+        trigger_gc_for_pg(pg_id, job_id);
+
+        LOGINFO("GC job {} restarting GC scan timer", job_id);
+        gc_mgr->start_gc_scan_timer();
+    } else {
+        LOGINFO("Received trigger_gc request for all chunks");
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for all chunks, please query job status using gc_job_status API";
+        // return response before starting the GC so that we don't block the client.
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        std::vector< pg_id_t > pg_ids;
+        ho_.get_pg_ids(pg_ids);
+        LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());
+        LOGINFO("GC job {} stopping GC scan timer", job_id);
+        gc_mgr->stop_gc_scan_timer();
+
+        // we block here until all gc tasks for all pgs are done
+        for (const auto& pg_id : pg_ids) {
+            trigger_gc_for_pg(pg_id, job_id);
+        }
+
+        LOGINFO("GC job {} restarting GC scan timer", job_id);
+        gc_mgr->start_gc_scan_timer();
     }
 }
 
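One caveat in the branches above: std::stoul() throws std::invalid_argument or std::out_of_range when the query value is not a valid number, so a request like trigger_gc?pg_id=abc would raise an exception inside the handler rather than produce a 4xx response. A non-throwing parse is sketched below (hypothetical helper, not part of this commit):

    #include <charconv>
    #include <optional>
    #include <string>

    // Parse a numeric query parameter without throwing, so malformed input
    // can be answered with Bad_Request instead of an uncaught exception.
    static std::optional< uint32_t > parse_u32(const std::string& s) {
        uint32_t v{};
        auto [ptr, ec] = std::from_chars(s.data(), s.data() + s.size(), v);
        if (ec != std::errc{} || ptr != s.data() + s.size()) return std::nullopt;
        return v;
    }
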
@@ -429,7 +409,7 @@ void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pist
     std::string job_id = job_id_param.value();
     std::shared_ptr< GCJobInfo > job_info;
     {
-        std::lock_guard< std::mutex > lock(gc_job_mutex_);
+        std::shared_lock lock(gc_job_mutex_);
         job_info = gc_jobs_map_.get(job_id);
     }
 
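The switch from std::lock_guard< std::mutex > to std::shared_lock implies that gc_job_mutex_ becomes a reader/writer mutex in the header (the third changed file, not shown in this diff). The resulting pattern, as a standalone sketch assuming std::shared_mutex:

    #include <mutex>
    #include <shared_mutex>

    std::shared_mutex gc_job_mutex_;

    void writer_path() {
        std::lock_guard lock(gc_job_mutex_);   // exclusive: registering a new job
        // gc_jobs_map_.set(job_id, job_info);
    }

    void reader_path() {
        std::shared_lock lock(gc_job_mutex_);  // shared: concurrent status lookups
        // job_info = gc_jobs_map_.get(job_id);
    }
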
@@ -454,10 +434,7 @@ void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pist
             break;
     }
 
-    if (job_info->chunk_id.has_value()) {
-        result["chunk_id"] = job_info->chunk_id.value();
-        if (job_info->pdev_id.has_value()) { result["pdev_id"] = job_info->pdev_id.value(); }
-    }
+    if (job_info->chunk_id.has_value()) { result["chunk_id"] = job_info->chunk_id.value(); }
 
     if (job_info->total_chunks > 0) {
         nlohmann::json stats;
@@ -467,11 +444,76 @@ void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pist
         result["statistics"] = stats;
     }
 
-    if (job_info->result.has_value()) { result["result"] = job_info->result.value(); }
-
     response.send(Pistache::Http::Code::Ok, result.dump());
 }
 
+void HttpManager::trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id) {
+    auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+    RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+
+    LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
+    auto gc_mgr = ho_.gc_manager();
+    gc_mgr->drain_pg_pending_gc_task(pg_id);
+    auto pg_sb = hs_pg->pg_sb_.get();
+    std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks);
+
+    LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());
+    hs_pg->repl_dev_->quiesce_reqs();
+    std::vector< folly::SemiFuture< bool > > gc_task_futures;
+
+    std::shared_ptr< GCJobInfo > job_info;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        job_info = gc_jobs_map_.get(job_id);
+    }
+
+    auto chunk_selector = ho_.chunk_selector();
+
+    for (const auto& chunk_id : pg_chunks) {
+        job_info->total_chunks++;
+        // Determine priority based on chunk state (INUSE means has open shard)
+        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+        RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
+        auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+        // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
+        if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }
+
+        // Submit GC task for this chunk
+        auto future = gc_mgr->submit_gc_task(priority, chunk_id);
+        gc_task_futures.push_back(std::move(future));
+        LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id,
+                 (priority == task_priority::emergent) ? "emergent" : "normal");
+    }
+
+    folly::collectAllUnsafe(gc_task_futures)
+        .thenValue([job_info](auto&& results) {
+            for (auto const& ok : results) {
+                RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
+                if (ok.value()) {
+                    job_info->success_count++;
+                } else {
+                    job_info->failed_count++;
+                }
+            }
+        })
+        .thenValue([this, pg_id, job_info, gc_mgr](auto&& rets) {
+            LOGINFO("All GC tasks have been processed");
+            const auto& job_id = job_info->job_id;
+
+            auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+            RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+            // Resume accepting new requests for this pg
+            hs_pg->repl_dev_->resume_accepting_reqs();
+            LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);
+
+            job_info->status = job_info->failed_count ? GCJobStatus::FAILED : GCJobStatus::COMPLETED;
+            LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks,
+                    job_info->success_count, job_info->failed_count);
+        })
+        .get();
+}
+
 #ifdef _PRERELEASE
 void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
     std::string crash_type;

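Note that trigger_gc_for_pg() ends with .get(), so the HTTP worker thread that accepted the request blocks until every chunk in the PG has been collected; the 202 Accepted response is sent before this wait begins. The blocking wait boils down to the following folly pattern (standalone sketch; the helper name is illustrative):

    #include <folly/futures/Future.h>
    #include <vector>

    bool wait_for_all(std::vector< folly::SemiFuture< bool > > futs) {
        // collectAll fulfills once every future is done; get() blocks the
        // calling thread, which is why the handler only returns to the
        // Pistache worker pool after the whole job has finished.
        auto results = folly::collectAll(std::move(futs)).get();
        bool all_ok = true;
        for (auto& r : results) {
            all_ok = all_ok && r.hasValue() && r.value();
        }
        return all_ok;
    }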