@@ -64,7 +64,17 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
6464 {Pistache::Http::Method::Get, " /api/v1/chunk/dump" ,
6565 Pistache::Rest::Routes::bind (&HttpManager::dump_chunk, this )},
6666 {Pistache::Http::Method::Get, " /api/v1/shard/dump" ,
67- Pistache::Rest::Routes::bind (&HttpManager::dump_shard, this )}};
67+ Pistache::Rest::Routes::bind (&HttpManager::dump_shard, this )},
68+
69+      // we support triggering gc for:
70+      // 1. all chunks in all pgs: no input param
71+      // 2. all chunks in a specific pg: input param is pg_id
72+      // 3. a specific chunk: input param is chunk_id (a physical chunk id)
73+
74+ {Pistache::Http::Method::Post, " /api/v1/trigger_gc" ,
75+ Pistache::Rest::Routes::bind (&HttpManager::trigger_gc, this )},
76+ {Pistache::Http::Method::Get, " /api/v1/gc_job_status" ,
77+ Pistache::Rest::Routes::bind (&HttpManager::get_gc_job_status, this )}};
6878
6979 auto http_server = ioenvironment.get_http_server ();
7080 if (!http_server) {
@@ -476,6 +486,290 @@ void HttpManager::exit_pg(const Pistache::Rest::Request& request, Pistache::Http
476486 response.send (Pistache::Http::Code::Ok, " Exit pg request submitted" );
477487}
478488
489+ void HttpManager::trigger_gc (const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
490+ auto gc_mgr = ho_.gc_manager ();
491+ if (!gc_mgr) {
492+ response.send (Pistache::Http::Code::Internal_Server_Error, " GC manager not available" );
493+ return ;
494+ }
495+
496+ auto chunk_selector = ho_.chunk_selector ();
497+ if (!chunk_selector) {
498+ response.send (Pistache::Http::Code::Internal_Server_Error, " Chunk selector not available" );
499+ return ;
500+ }
501+
502+ const auto chunk_id_param = request.query ().get (" chunk_id" );
503+ const auto pg_id_param = request.query ().get (" pg_id" );
504+
505+ if (chunk_id_param && !chunk_id_param.value ().empty ()) {
506+ // trigger gc for a specific chunk, the chunk_id is pchunk_id, not vchunk_id
507+ uint32_t chunk_id = std::stoul (chunk_id_param.value ());
508+ LOGINFO (" Received trigger_gc request for chunk_id {}" , chunk_id);
509+
510+ auto chunk = chunk_selector->get_extend_vchunk (chunk_id);
511+ if (!chunk) {
512+ nlohmann::json error;
513+ error[" chunk_id" ] = chunk_id;
514+ error[" error" ] = " Chunk not found" ;
515+ response.send (Pistache::Http::Code::Not_Found, error.dump ());
516+ return ;
517+ }
518+
519+ if (!chunk->m_pg_id .has_value ()) {
520+ nlohmann::json error;
521+ error[" chunk_id" ] = chunk_id;
522+ error[" error" ] = " Chunk belongs to no pg" ;
523+ response.send (Pistache::Http::Code::Not_Found, error.dump ());
524+ return ;
525+ }
526+
527+ const auto pg_id = chunk->m_pg_id .value ();
528+ nlohmann::json result;
529+ const auto job_id = generate_job_id ();
530+
531+ result[" chunk_id" ] = chunk_id;
532+ result[" pg_id" ] = pg_id;
533+ result[" job_id" ] = job_id;
534+
535+ if (chunk->m_state == ChunkState::GC) {
536+ result[" message" ] = " chunk is already under GC now, this task will not be executed!" ;
537+ response.send (Pistache::Http::Code::Ok, result.dump ());
538+ return ;
539+ }
540+ result[" message" ] = " GC triggered for chunk, pls query job status using gc_job_status API" ;
541+
542+ // return response before starting the GC so that we don't block the client.
543+ response.send (Pistache::Http::Code::Accepted, result.dump ());
544+
545+ auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id, chunk_id);
546+ {
547+ std::lock_guard lock (gc_job_mutex_);
548+ gc_jobs_map_.set (job_id, job_info);
549+ }
550+
551+ // sumbit gc task for this chunk
552+
553+ // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
554+ auto hs_pg = const_cast < HSHomeObject::HS_PG* >(ho_.get_hs_pg (pg_id));
555+ RELEASE_ASSERT (hs_pg, " HS PG {} not found during GC job {}" , pg_id, job_id);
556+ auto repl_dev = hs_pg->repl_dev_ ;
557+ repl_dev->quiesce_reqs ();
558+ repl_dev->clear_chunk_req (chunk_id);
559+ const auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
560+
561+ gc_mgr->submit_gc_task (priority, chunk_id)
562+ .via (&folly::InlineExecutor::instance ())
563+ .thenValue ([this , job_info, repl_dev](bool res) {
564+ job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
565+ // Resume accepting new requests for this pg
566+ repl_dev->resume_accepting_reqs ();
567+ });
568+ } else if (pg_id_param && !pg_id_param.value ().empty ()) {
569+ // trigger gc for all chunks in a specific pg
570+ const auto pg_id = std::stoul (pg_id_param.value ());
571+ LOGINFO (" Received trigger_gc request for pg_id {}" , pg_id);
572+ auto hs_pg = const_cast < HSHomeObject::HS_PG* >(ho_.get_hs_pg (pg_id));
573+ if (!hs_pg) {
574+ nlohmann::json error;
575+ error[" pg_id" ] = pg_id;
576+ error[" error" ] = " PG not found" ;
577+ response.send (Pistache::Http::Code::Not_Found, error.dump ());
578+ return ;
579+ }
580+
581+ nlohmann::json result;
582+ const auto job_id = generate_job_id ();
583+ result[" pg_id" ] = pg_id;
584+ result[" job_id" ] = job_id;
585+ result[" message" ] = " GC triggered for a single pg, pls query job status using gc_job_status API" ;
586+ // return response before starting the GC so that we don't block the client.
587+ response.send (Pistache::Http::Code::Accepted, result.dump ());
588+
589+ auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id);
590+ {
591+ std::lock_guard lock (gc_job_mutex_);
592+ gc_jobs_map_.set (job_id, job_info);
593+ }
594+
595+ LOGINFO (" GC job {} stopping GC scan timer" , job_id);
596+ gc_mgr->stop_gc_scan_timer ();
597+
598+ // we block here until all gc tasks for the pg are done
599+ trigger_gc_for_pg (pg_id, job_id);
600+
601+ LOGINFO (" GC job {} restarting GC scan timer" , job_id);
602+ gc_mgr->start_gc_scan_timer ();
603+ } else {
604+ LOGINFO (" Received trigger_gc request for all chunks" );
605+ nlohmann::json result;
606+ const auto job_id = generate_job_id ();
607+ result[" job_id" ] = job_id;
608+ result[" message" ] = " GC triggered for all chunks, pls query job status using gc_job_status API" ;
609+ // return response before starting the GC so that we don't block the client.
610+ response.send (Pistache::Http::Code::Accepted, result.dump ());
611+
612+ auto job_info = std::make_shared< GCJobInfo >(job_id);
613+ {
614+ std::lock_guard lock (gc_job_mutex_);
615+ gc_jobs_map_.set (job_id, job_info);
616+ }
617+
618+ std::vector< pg_id_t > pg_ids;
619+ ho_.get_pg_ids (pg_ids);
620+ LOGINFO (" GC job {} will process {} PGs" , job_id, pg_ids.size ());
621+ LOGINFO (" GC job {} stopping GC scan timer" , job_id);
622+ gc_mgr->stop_gc_scan_timer ();
623+
624+ // we block here until all gc tasks for the pg are done
625+ for (const auto & pg_id : pg_ids) {
626+ trigger_gc_for_pg (pg_id, job_id);
627+ }
628+
629+ LOGINFO (" GC job {} restarting GC scan timer" , job_id);
630+ gc_mgr->start_gc_scan_timer ();
631+ }
632+ }
633+
634+ std::string HttpManager::generate_job_id () {
635+ auto counter = job_counter_.fetch_add (1 , std::memory_order_relaxed);
636+ return fmt::format (" trigger-gc-task-{}" , counter);
637+ }
638+
639+ void HttpManager::get_job_status (const std::string& job_id, nlohmann::json& result) {
640+ result[" job_id" ] = job_id;
641+ std::shared_ptr< GCJobInfo > job_info;
642+ {
643+ std::shared_lock lock (gc_job_mutex_);
644+ job_info = gc_jobs_map_.get (job_id);
645+ }
646+
647+ if (!job_info) {
648+ result[" error" ] = " job_id not found, or job has been evicted" ;
649+ return ;
650+ }
651+
652+ switch (job_info->status ) {
653+ case GCJobStatus::RUNNING:
654+ result[" status" ] = " running" ;
655+ break ;
656+ case GCJobStatus::COMPLETED:
657+ result[" status" ] = " completed" ;
658+ break ;
659+ case GCJobStatus::FAILED:
660+ result[" status" ] = " failed" ;
661+ break ;
662+ }
663+
664+ if (job_info->chunk_id .has_value ()) { result[" chunk_id" ] = job_info->chunk_id .value (); }
665+ if (job_info->pg_id .has_value ()) { result[" pg_id" ] = job_info->pg_id .value (); }
666+
667+ if (job_info->total_chunks > 0 ) {
668+ nlohmann::json stats;
669+ stats[" total_chunks" ] = job_info->total_chunks ;
670+ stats[" success_count" ] = job_info->success_count ;
671+ stats[" failed_count" ] = job_info->failed_count ;
672+ result[" statistics" ] = stats;
673+ }
674+ }
675+
676+ void HttpManager::get_gc_job_status (const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
677+ auto job_id_param = request.query ().get (" job_id" );
678+ if (job_id_param && !job_id_param.value ().empty ()) {
679+ const auto job_id = job_id_param.value ();
680+ LOGINFO (" query job {} status!" , job_id);
681+ nlohmann::json result;
682+ get_job_status (job_id, result);
683+ response.send (Pistache::Http::Code::Ok, result.dump ());
684+ return ;
685+ }
686+
687+ LOGINFO (" query all job status!" );
688+ nlohmann::json result;
689+ std::vector< std::string > job_ids;
690+ {
691+ std::shared_lock lock (gc_job_mutex_);
692+ for (const auto & [k, v] : gc_jobs_map_) {
693+ job_ids.push_back (k);
694+ }
695+ }
696+
697+ for (const auto & job_id : job_ids) {
698+ nlohmann::json job_json;
699+ get_job_status (job_id, job_json);
700+ result[" jobs" ].push_back (job_json);
701+ }
702+
703+ response.send (Pistache::Http::Code::Ok, result.dump ());
704+ }
705+
706+ void HttpManager::trigger_gc_for_pg (uint16_t pg_id, const std::string& job_id) {
707+ auto hs_pg = const_cast < HSHomeObject::HS_PG* >(ho_.get_hs_pg (pg_id));
708+ RELEASE_ASSERT (hs_pg, " HS PG {} not found during GC job {}" , pg_id, job_id);
709+
710+ LOGINFO (" GC job {} draining pending GC tasks for PG {}" , job_id, pg_id);
711+ auto gc_mgr = ho_.gc_manager ();
712+ gc_mgr->drain_pg_pending_gc_task (pg_id);
713+ auto pg_sb = hs_pg->pg_sb_ .get ();
714+ std::vector< homestore::chunk_num_t > pg_chunks (pg_sb->get_chunk_ids (), pg_sb->get_chunk_ids () + pg_sb->num_chunks );
715+
716+ LOGINFO (" GC job {} processing PG {} with {} chunks" , job_id, pg_id, pg_chunks.size ());
717+ hs_pg->repl_dev_ ->quiesce_reqs ();
718+ std::vector< folly::SemiFuture< bool > > gc_task_futures;
719+
720+ std::shared_ptr< GCJobInfo > job_info;
721+ {
722+ std::shared_lock lock (gc_job_mutex_);
723+ job_info = gc_jobs_map_.get (job_id);
724+ }
725+
726+ auto chunk_selector = ho_.chunk_selector ();
727+
728+ for (const auto & chunk_id : pg_chunks) {
729+ job_info->total_chunks ++;
730+ // Determine priority based on chunk state (INUSE means has open shard)
731+ auto chunk = chunk_selector->get_extend_vchunk (chunk_id);
732+ RELEASE_ASSERT (chunk, " Chunk {} not found during GC job {}" , chunk_id, job_id);
733+ auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
734+
735+ // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
736+ if (priority == task_priority::emergent) { hs_pg->repl_dev_ ->clear_chunk_req (chunk_id); }
737+
738+ // Submit GC task for this chunk
739+ auto future = gc_mgr->submit_gc_task (priority, chunk_id);
740+ gc_task_futures.push_back (std::move (future));
741+ LOGDEBUG (" GC job {} for chunk {} in PG {} with priority={}" , job_id, chunk_id, pg_id,
742+ (priority == task_priority::emergent) ? " emergent" : " normal" );
743+ }
744+
745+ folly::collectAllUnsafe (gc_task_futures)
746+ .thenValue ([job_info](auto && results) {
747+ for (auto const & ok : results) {
748+ RELEASE_ASSERT (ok.hasValue (), " we never throw any exception when copying data" );
749+ if (ok.value ()) {
750+ job_info->success_count ++;
751+ } else {
752+ job_info->failed_count ++;
753+ }
754+ }
755+ })
756+ .thenValue ([this , pg_id, job_info, gc_mgr](auto && rets) {
757+ LOGINFO (" All GC tasks have been processed" );
758+ const auto & job_id = job_info->job_id ;
759+
760+ auto hs_pg = const_cast < HSHomeObject::HS_PG* >(ho_.get_hs_pg (pg_id));
761+ RELEASE_ASSERT (hs_pg, " HS PG {} not found during GC job {}" , pg_id, job_id);
762+ // Resume accepting new requests for this pg
763+ hs_pg->repl_dev_ ->resume_accepting_reqs ();
764+ LOGINFO (" GC job {} resumed accepting requests for PG {}" , job_id, pg_id);
765+
766+ job_info->status = job_info->failed_count ? GCJobStatus::FAILED : GCJobStatus::COMPLETED;
767+ LOGINFO (" GC job {} completed: total={}, success={}, failed={}" , job_id, job_info->total_chunks ,
768+ job_info->success_count , job_info->failed_count );
769+ })
770+ .get ();
771+ }
772+
479773#ifdef _PRERELEASE
480774void HttpManager::crash_system (const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
481775 std::string crash_type;
0 commit comments