diff --git a/src/lib/homestore_backend/gc_manager.cpp b/src/lib/homestore_backend/gc_manager.cpp
index 30c86199..5f4224f9 100644
--- a/src/lib/homestore_backend/gc_manager.cpp
+++ b/src/lib/homestore_backend/gc_manager.cpp
@@ -123,7 +123,10 @@ void GCManager::start() {
         gc_actor->start();
         LOGINFOMOD(gcmgr, "start gc actor for pdev={}", pdev_id);
     }
+    start_gc_scan_timer();
+}
+void GCManager::start_gc_scan_timer() {
     const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec);

     // the initial idea here is that we want gc timer to run in a reactor that not shared with other fibers that
@@ -147,9 +150,7 @@ void GCManager::start() {
     LOGINFOMOD(gcmgr, "gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec);
 }

-bool GCManager::is_started() { return m_gc_timer_hdl != iomgr::null_timer_handle; }
-
-void GCManager::stop() {
+void GCManager::stop_gc_scan_timer() {
     if (m_gc_timer_hdl == iomgr::null_timer_handle) {
         LOGWARNMOD(gcmgr, "gc scheduler timer is not running, no need to stop it");
         return;
@@ -163,6 +164,10 @@ void GCManager::stop() {
         m_gc_timer_hdl = iomgr::null_timer_handle;
     });
     m_gc_timer_fiber = nullptr;
+}
+
+void GCManager::stop() {
+    stop_gc_scan_timer();

     for (const auto& [pdev_id, gc_actor] : m_pdev_gc_actors) {
         gc_actor->stop();
@@ -171,9 +176,20 @@ void GCManager::stop() {
 }

 folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
-    if (!is_started()) return folly::makeFuture< bool >(false);
+    auto ex_vchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
+    if (ex_vchunk == nullptr) {
+        LOGERRORMOD(gcmgr, "chunk {} not found when submit gc task!", chunk_id);
+        return folly::makeFuture< bool >(false);
+    }
+
+    // if the chunk has no garbage to be reclaimed, we don't need to gc it, return true directly
+    const auto defrag_blk_num = ex_vchunk->get_defrag_nblks();
+    if (!defrag_blk_num && task_priority::normal == priority) {
+        LOGERRORMOD(gcmgr, "chunk {} has no garbage to be reclaimed, skip gc for this chunk!", chunk_id);
+        return folly::makeFuture< bool >(true);
+    }

-    auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
+    auto pdev_id = ex_vchunk->get_pdev_id();
     auto it = m_pdev_gc_actors.find(pdev_id);
     if (it == m_pdev_gc_actors.end()) {
         LOGINFOMOD(gcmgr, "pdev gc actor not found for pdev_id={}, chunk={}", pdev_id, chunk_id);
@@ -337,6 +353,11 @@ void GCManager::pdev_gc_actor::add_reserved_chunk(
 }

 folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) {
+    if (m_is_stopped.load()) {
+        LOGWARNMOD(gcmgr, "pdev gc actor for pdev_id={} is not started yet or already stopped, cannot add gc task!",
+                   m_pdev_id);
+        return folly::makeSemiFuture< bool >(false);
+    }
     auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk);
     // it does not belong to any pg, so we don't need to gc it.
     if (!EXvchunk->m_pg_id.has_value()) {
diff --git a/src/lib/homestore_backend/gc_manager.hpp b/src/lib/homestore_backend/gc_manager.hpp
index 06ee0876..b2781e97 100644
--- a/src/lib/homestore_backend/gc_manager.hpp
+++ b/src/lib/homestore_backend/gc_manager.hpp
@@ -307,7 +307,11 @@ class GCManager {
     void start();
     void stop();
-    bool is_started();
+
+    // the following two functions should not be called concurrently. if we need to call them concurrently,
+    // we need to add a lock to protect them.
+    void start_gc_scan_timer();
+    void stop_gc_scan_timer();
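
Note on the comment above: if start_gc_scan_timer()/stop_gc_scan_timer() ever do need to be callable concurrently, one mutex serializing both calls is enough. A minimal, self-contained sketch of that pattern (TimerGate and its members are illustrative only, not part of this patch):

    #include <mutex>

    class TimerGate {
    public:
        void start() {
            std::lock_guard< std::mutex > lk(m_mtx);
            if (m_running) { return; } // idempotent if two callers race on start
            m_running = true;
            // ... register the periodic gc scan timer here ...
        }

        void stop() {
            std::lock_guard< std::mutex > lk(m_mtx);
            if (!m_running) { return; } // idempotent if stop races with start/stop
            m_running = false;
            // ... cancel the timer and reset the handle here ...
        }

    private:
        std::mutex m_mtx;
        bool m_running{false};
    };

The idempotence checks also make repeated start/stop calls harmless, which matters if the HTTP handlers introduced below ever overlap.
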
     void scan_chunks_for_gc();
     void drain_pg_pending_gc_task(const pg_id_t pg_id);
diff --git a/src/lib/homestore_backend/hs_http_manager.cpp b/src/lib/homestore_backend/hs_http_manager.cpp
index 75b15b83..99a17f26 100644
--- a/src/lib/homestore_backend/hs_http_manager.cpp
+++ b/src/lib/homestore_backend/hs_http_manager.cpp
@@ -48,7 +48,17 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
         {Pistache::Http::Method::Get, "/api/v1/chunk/dump",
          Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)},
         {Pistache::Http::Method::Get, "/api/v1/shard/dump",
-         Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}};
+         Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)},
+
+        // we support triggering gc for:
+        // 1. all the chunks in all pgs: no input param
+        // 2. all the chunks in a specific pg: input param is pg_id
+        // 3. a specific chunk: input param is pchunk_id
+
+        {Pistache::Http::Method::Post, "/api/v1/trigger_gc",
+         Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)},
+        {Pistache::Http::Method::Get, "/api/v1/gc_job_status",
+         Pistache::Rest::Routes::bind(&HttpManager::get_gc_job_status, this)}};

     auto http_server = ioenvironment.get_http_server();
     if (!http_server) {
@@ -239,6 +249,290 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::H
     response.send(Pistache::Http::Code::Ok, j.dump());
 }

+void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
+    auto gc_mgr = ho_.gc_manager();
+    if (!gc_mgr) {
+        response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available");
+        return;
+    }
+
+    auto chunk_selector = ho_.chunk_selector();
+    if (!chunk_selector) {
+        response.send(Pistache::Http::Code::Internal_Server_Error, "Chunk selector not available");
+        return;
+    }
+
+    const auto chunk_id_param = request.query().get("chunk_id");
+    const auto pg_id_param = request.query().get("pg_id");
+
+    if (chunk_id_param && !chunk_id_param.value().empty()) {
+        // trigger gc for a specific chunk; the chunk_id here is a pchunk_id, not a vchunk_id
+        uint32_t chunk_id = std::stoul(chunk_id_param.value());
+        LOGINFO("Received trigger_gc request for chunk_id {}", chunk_id);
+
+        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+        if (!chunk) {
+            nlohmann::json error;
+            error["chunk_id"] = chunk_id;
+            error["error"] = "Chunk not found";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        if (!chunk->m_pg_id.has_value()) {
+            nlohmann::json error;
+            error["chunk_id"] = chunk_id;
+            error["error"] = "Chunk does not belong to any pg";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        const auto pg_id = chunk->m_pg_id.value();
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+
+        result["chunk_id"] = chunk_id;
+        result["pg_id"] = pg_id;
+        result["job_id"] = job_id;
+
+        if (chunk->m_state == ChunkState::GC) {
+            result["message"] = "chunk is already under GC now, this task will not be executed!";
+            response.send(Pistache::Http::Code::Ok, result.dump());
+            return;
+        }
+        result["message"] = "GC triggered for chunk, please query job status using the gc_job_status API";
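
One caveat in the handler above: std::stoul throws std::invalid_argument or std::out_of_range on a malformed chunk_id/pg_id query parameter, and that exception would escape the route handler. A hypothetical, more defensive parse (parse_u32 is illustrative only, not part of this patch) could look like:

    #include <charconv>
    #include <optional>
    #include <string>

    // Returns std::nullopt instead of throwing when the string is not a valid uint32_t.
    static std::optional< uint32_t > parse_u32(const std::string& s) {
        uint32_t value{0};
        const auto [ptr, ec] = std::from_chars(s.data(), s.data() + s.size(), value);
        if (ec != std::errc{} || ptr != s.data() + s.size()) { return std::nullopt; }
        return value;
    }

The caller could then answer Pistache::Http::Code::Bad_Request when parse_u32 returns std::nullopt, instead of relying on an exception path.
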
+
+        // return the response before starting the GC so that we don't block the client
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id, chunk_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        // submit gc task for this chunk
+
+        // quiesce incoming requests and clear any in-memory requests targeting this chunk before GC starts
+        auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+        RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+        auto repl_dev = hs_pg->repl_dev_;
+        repl_dev->quiesce_reqs();
+        repl_dev->clear_chunk_req(chunk_id);
+        const auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+        gc_mgr->submit_gc_task(priority, chunk_id)
+            .via(&folly::InlineExecutor::instance())
+            .thenValue([this, job_info, repl_dev](bool res) {
+                job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
+                // Resume accepting new requests for this pg
+                repl_dev->resume_accepting_reqs();
+            });
+    } else if (pg_id_param && !pg_id_param.value().empty()) {
+        // trigger gc for all chunks in a specific pg
+        const auto pg_id = std::stoul(pg_id_param.value());
+        LOGINFO("Received trigger_gc request for pg_id {}", pg_id);
+        auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+        if (!hs_pg) {
+            nlohmann::json error;
+            error["pg_id"] = pg_id;
+            error["error"] = "PG not found";
+            response.send(Pistache::Http::Code::Not_Found, error.dump());
+            return;
+        }
+
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+        result["pg_id"] = pg_id;
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for a single pg, please query job status using the gc_job_status API";
+        // return the response before starting the GC so that we don't block the client
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        LOGINFO("GC job {} stopping GC scan timer", job_id);
+        gc_mgr->stop_gc_scan_timer();
+
+        // we block here until all gc tasks for the pg are done
+        trigger_gc_for_pg(pg_id, job_id);
+
+        LOGINFO("GC job {} restarting GC scan timer", job_id);
+        gc_mgr->start_gc_scan_timer();
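
The stop_gc_scan_timer()/start_gc_scan_timer() bracket above (and the same bracket in the all-chunks branch below) assumes trigger_gc_for_pg() returns normally. A small RAII guard would keep the pair balanced even on an early exit; this is only a sketch, assuming gc_manager() hands out a std::shared_ptr< GCManager > as used above:

    #include <memory>

    // Stops the periodic scan on construction and restarts it on scope exit.
    struct GcScanTimerPause {
        explicit GcScanTimerPause(std::shared_ptr< GCManager > mgr) : m_mgr(std::move(mgr)) {
            m_mgr->stop_gc_scan_timer();
        }
        ~GcScanTimerPause() { m_mgr->start_gc_scan_timer(); }

        GcScanTimerPause(const GcScanTimerPause&) = delete;
        GcScanTimerPause& operator=(const GcScanTimerPause&) = delete;

    private:
        std::shared_ptr< GCManager > m_mgr;
    };

    // usage sketch:
    // {
    //     GcScanTimerPause pause{gc_mgr};
    //     trigger_gc_for_pg(pg_id, job_id);
    // } // the scan timer restarts here, even if trigger_gc_for_pg exits early
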
+    } else {
+        LOGINFO("Received trigger_gc request for all chunks");
+        nlohmann::json result;
+        const auto job_id = generate_job_id();
+        result["job_id"] = job_id;
+        result["message"] = "GC triggered for all chunks, please query job status using the gc_job_status API";
+        // return the response before starting the GC so that we don't block the client
+        response.send(Pistache::Http::Code::Accepted, result.dump());
+
+        auto job_info = std::make_shared< GCJobInfo >(job_id);
+        {
+            std::lock_guard lock(gc_job_mutex_);
+            gc_jobs_map_.set(job_id, job_info);
+        }
+
+        std::vector< pg_id_t > pg_ids;
+        ho_.get_pg_ids(pg_ids);
+        LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());
+        LOGINFO("GC job {} stopping GC scan timer", job_id);
+        gc_mgr->stop_gc_scan_timer();
+
+        // we block here until all gc tasks for every pg are done
+        for (const auto& pg_id : pg_ids) {
+            trigger_gc_for_pg(pg_id, job_id);
+        }
+
+        LOGINFO("GC job {} restarting GC scan timer", job_id);
+        gc_mgr->start_gc_scan_timer();
+    }
+}
+
+std::string HttpManager::generate_job_id() {
+    auto counter = job_counter_.fetch_add(1, std::memory_order_relaxed);
+    return fmt::format("trigger-gc-task-{}", counter);
+}
+
+void HttpManager::get_job_status(const std::string& job_id, nlohmann::json& result) {
+    result["job_id"] = job_id;
+    std::shared_ptr< GCJobInfo > job_info;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        job_info = gc_jobs_map_.get(job_id);
+    }
+
+    if (!job_info) {
+        result["error"] = "job_id not found, or job has been evicted";
+        return;
+    }
+
+    switch (job_info->status) {
+    case GCJobStatus::RUNNING:
+        result["status"] = "running";
+        break;
+    case GCJobStatus::COMPLETED:
+        result["status"] = "completed";
+        break;
+    case GCJobStatus::FAILED:
+        result["status"] = "failed";
+        break;
+    }
+
+    if (job_info->chunk_id.has_value()) { result["chunk_id"] = job_info->chunk_id.value(); }
+    if (job_info->pg_id.has_value()) { result["pg_id"] = job_info->pg_id.value(); }
+
+    if (job_info->total_chunks > 0) {
+        nlohmann::json stats;
+        stats["total_chunks"] = job_info->total_chunks;
+        stats["success_count"] = job_info->success_count;
+        stats["failed_count"] = job_info->failed_count;
+        result["statistics"] = stats;
+    }
+}
+
+void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
+    auto job_id_param = request.query().get("job_id");
+    if (job_id_param && !job_id_param.value().empty()) {
+        const auto job_id = job_id_param.value();
+        LOGINFO("query job {} status!", job_id);
+        nlohmann::json result;
+        get_job_status(job_id, result);
+        response.send(Pistache::Http::Code::Ok, result.dump());
+        return;
+    }
+
+    LOGINFO("query all job status!");
+    nlohmann::json result;
+    std::vector< std::string > job_ids;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        for (const auto& [k, v] : gc_jobs_map_) {
+            job_ids.push_back(k);
+        }
+    }
+
+    for (const auto& job_id : job_ids) {
+        nlohmann::json job_json;
+        get_job_status(job_id, job_json);
+        result["jobs"].push_back(job_json);
+    }
+
+    response.send(Pistache::Http::Code::Ok, result.dump());
+}
+
+void HttpManager::trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id) {
+    auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+    RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+
+    LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
+    auto gc_mgr = ho_.gc_manager();
+    gc_mgr->drain_pg_pending_gc_task(pg_id);
+    auto pg_sb = hs_pg->pg_sb_.get();
+    std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks);
+
+    LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());
+    hs_pg->repl_dev_->quiesce_reqs();
+    std::vector< folly::SemiFuture< bool > > gc_task_futures;
+
+    std::shared_ptr< GCJobInfo > job_info;
+    {
+        std::shared_lock lock(gc_job_mutex_);
+        job_info = gc_jobs_map_.get(job_id);
+    }
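
A caveat on the lookup pattern used in get_job_status() and just above: as far as I know, folly::EvictingCacheMap::get() promotes the entry to the front of the LRU list (and throws if the key is absent), so even a lookup mutates the map and a std::shared_lock may not be enough to make concurrent readers safe. A hedged alternative (lookup_job is a hypothetical helper, assuming the folly::EvictingCacheMap exists()/get() API; not part of this patch):

    std::shared_ptr< GCJobInfo > HttpManager::lookup_job(const std::string& job_id) {
        // get() reorders the LRU list, so treat every lookup as a write and take an exclusive lock.
        std::unique_lock lock(gc_job_mutex_);
        if (!gc_jobs_map_.exists(job_id)) { return nullptr; }
        return gc_jobs_map_.get(job_id);
    }
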
+
+    auto chunk_selector = ho_.chunk_selector();
+
+    for (const auto& chunk_id : pg_chunks) {
+        job_info->total_chunks++;
+        // Determine priority based on chunk state (INUSE means has open shard)
+        auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
+        RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
+        auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;
+
+        // Clear in-memory requests only for emergent priority chunks (chunks with open shards)
+        if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }
+
+        // Submit GC task for this chunk
+        auto future = gc_mgr->submit_gc_task(priority, chunk_id);
+        gc_task_futures.push_back(std::move(future));
+        LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id,
+                 (priority == task_priority::emergent) ? "emergent" : "normal");
+    }
+
+    folly::collectAllUnsafe(gc_task_futures)
+        .thenValue([job_info](auto&& results) {
+            for (auto const& ok : results) {
+                RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
+                if (ok.value()) {
+                    job_info->success_count++;
+                } else {
+                    job_info->failed_count++;
+                }
+            }
+        })
+        .thenValue([this, pg_id, job_info, gc_mgr](auto&& rets) {
+            LOGINFO("All GC tasks have been processed");
+            const auto& job_id = job_info->job_id;
+
+            auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
+            RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
+            // Resume accepting new requests for this pg
+            hs_pg->repl_dev_->resume_accepting_reqs();
+            LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);
+
+            job_info->status = job_info->failed_count ? GCJobStatus::FAILED : GCJobStatus::COMPLETED;
+            LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks,
+                    job_info->success_count, job_info->failed_count);
+        })
+        .get();
+}
+
 #ifdef _PRERELEASE
 void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
     std::string crash_type;
diff --git a/src/lib/homestore_backend/hs_http_manager.hpp b/src/lib/homestore_backend/hs_http_manager.hpp
index 62d45f1a..0f24a309 100644
--- a/src/lib/homestore_backend/hs_http_manager.hpp
+++ b/src/lib/homestore_backend/hs_http_manager.hpp
@@ -16,6 +16,11 @@
 #include
 #include
+#include
+#include
+#include
+#include
+
 namespace homeobject {

 class HSHomeObject;
@@ -33,12 +38,43 @@ class HttpManager {
     void dump_chunk(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
     void dump_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
     void get_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
+    void trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
+    void get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
+    void trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id);
+    void get_job_status(const std::string& job_id, nlohmann::json& result);

 #ifdef _PRERELEASE
     void crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response);
 #endif

+private:
+    enum class GCJobStatus { RUNNING, COMPLETED, FAILED };
+
+    struct GCJobInfo {
+        std::string job_id;
+        GCJobStatus status;
+        std::optional< uint16_t > pg_id;
+        std::optional< uint32_t > chunk_id;
+
+        // Statistics for batch GC jobs (all chunks)
+        uint32_t total_chunks{0};
+        uint32_t success_count{0};
+        uint32_t failed_count{0};
+
+        GCJobInfo(const std::string& id, std::optional< uint16_t > pgid = std::nullopt,
+                  std::optional< uint32_t > cid = std::nullopt) :
+                job_id(id), status(GCJobStatus::RUNNING), pg_id(pgid), chunk_id(cid) {}
+    };
+
+    std::string generate_job_id();
+
 private:
     HSHomeObject& ho_;
+    std::atomic< uint64_t > job_counter_{0};
+    std::shared_mutex gc_job_mutex_;
+
+    // we don't have an external DB to store the job status, so we only keep the status of the latest 100 jobs for
+    // query. alternatively, we could evict a job after a timeout period once it has completed.
+    folly::EvictingCacheMap< std::string, std::shared_ptr< GCJobInfo > > gc_jobs_map_{100};
 };
 } // namespace homeobject
\ No newline at end of file
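
Worth noting for the struct above: status and the three counters are written from GC completion callbacks while get_job_status() reads them on the HTTP thread, and gc_job_mutex_ only guards the map, not the fields inside a job. If that ever matters in practice, one option is to make the per-job fields atomic. An illustrative variant (a sketch, not what this patch does):

    #include <atomic>
    #include <cstdint>
    #include <optional>
    #include <string>

    enum class GCJobStatus { RUNNING, COMPLETED, FAILED };

    struct GCJobInfo {
        std::string job_id;
        std::atomic< GCJobStatus > status{GCJobStatus::RUNNING};
        std::optional< uint16_t > pg_id;
        std::optional< uint32_t > chunk_id;

        // touched from GC completion callbacks and read by the HTTP thread,
        // so they are atomics here instead of plain integers
        std::atomic< uint32_t > total_chunks{0};
        std::atomic< uint32_t > success_count{0};
        std::atomic< uint32_t > failed_count{0};

        explicit GCJobInfo(std::string id, std::optional< uint16_t > pgid = std::nullopt,
                           std::optional< uint32_t > cid = std::nullopt) :
                job_id(std::move(id)), pg_id(pgid), chunk_id(cid) {}
    };
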
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index 94f0f5e2..4110d058 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -970,27 +970,20 @@ void ReplicationStateMachine::handle_no_space_left(homestore::repl_lsn_t lsn, ho
     // 3 handling this error. for homeobject, we will submit an emergent gc task and wait for the completion.

     auto gc_mgr = home_object_->gc_manager();
-    if (gc_mgr->is_started()) {
-        // FIXME:: there is a very corner case that when reaching this line, gc_mgr is stopped. fix this later.
-        gc_mgr->submit_gc_task(task_priority::emergent, chunk_id)
-            .via(&folly::InlineExecutor::instance())
-            .thenValue([this, lsn, chunk_id](auto&& res) {
-                if (!res) {
-                    LOGERROR("failed to submit emergent gc task for chunk_id={} , lsn={}, will retry again if new "
-                             "no_space_left happens",
-                             chunk_id, lsn);
-                } else {
-                    LOGD("successfully handle no_space_left error for chunk_id={} , lsn={}", chunk_id, lsn);
-                }
+    gc_mgr->submit_gc_task(task_priority::emergent, chunk_id)
+        .via(&folly::InlineExecutor::instance())
+        .thenValue([this, lsn, chunk_id](auto&& res) {
+            if (!res) {
+                LOGERROR("failed to submit emergent gc task for chunk_id={} , lsn={}, will retry again if new "
+                         "no_space_left happens",
+                         chunk_id, lsn);
+            } else {
+                LOGD("successfully handle no_space_left error for chunk_id={} , lsn={}", chunk_id, lsn);
+            }

-                // start accepting new requests again.
-                repl_dev()->resume_accepting_reqs();
-            });
-    } else {
-        // start accepting new requests again.
-        LOGD("gc manager is not started, skip handle no_space_left for chunk={}, lsn={}", chunk_id, lsn);
-        repl_dev()->resume_accepting_reqs();
-    }
+            // start accepting new requests again.
+            repl_dev()->resume_accepting_reqs();
+        });
 }

 void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& group_id) {
diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp
index f029385b..95ff54ae 100644
--- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp
+++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp
@@ -46,8 +46,10 @@ class HomeObjectFixture : public ::testing::Test {
             s.io_env.http_port = 5000 + g_helper->replica_num();
             LOGD("setup http port to {}", s.io_env.http_port);
         });
+        HSHomeObject::_hs_chunk_size = SISL_OPTIONS["chunk_size"].as< uint64_t >() * Mi;

         _obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject());
+        // Used to export metrics; it should be called after init_homeobject
         if (SISL_OPTIONS["enable_http"].as< bool >()) { g_helper->app->start_http_server(); }

         if (!g_helper->is_current_testcase_restarted()) {
diff --git a/src/lib/homestore_backend/tests/hs_gc_tests.cpp b/src/lib/homestore_backend/tests/hs_gc_tests.cpp
index 49827c3e..736c5d69 100644
--- a/src/lib/homestore_backend/tests/hs_gc_tests.cpp
+++ b/src/lib/homestore_backend/tests/hs_gc_tests.cpp
@@ -31,8 +31,7 @@ TEST_F(HomeObjectFixture, BasicGC) {
     for (const auto& [pg_id, chunk_num] : pg_chunk_nums) {
         for (uint64_t j = 0; j < chunk_num; j++) {
             auto shard_seq = i * chunk_num + j + 1;
-            auto derived_shard_id =
-                make_new_shard_id(pg_id, shard_seq); // shard id start from 1
+            auto derived_shard_id = make_new_shard_id(pg_id, shard_seq); // shard id start from 1
             auto shard = create_shard(pg_id, 64 * Mi, "shard meta:" + std::to_string(derived_shard_id));
             LOGINFO("create shard pg={} shard {} in chunk {}", pg_id, shard.id, j);
             ASSERT_EQ(derived_shard_id, shard.id);
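
For completeness, this is roughly how a caller might interpret the payload assembled by get_job_status() earlier in this patch; the literal response body below is made up, only the field names and status strings come from the patch:

    #include <iostream>
    #include <string>

    #include <nlohmann/json.hpp>

    int main() {
        // Example response body in the shape produced by get_job_status(); values are invented.
        const std::string body =
            R"({"job_id":"trigger-gc-task-0","status":"completed",)"
            R"("statistics":{"total_chunks":4,"success_count":4,"failed_count":0}})";

        const auto j = nlohmann::json::parse(body);
        if (j.at("status") == "running") {
            std::cout << "job " << j.at("job_id") << " still running, poll again later\n";
        } else {
            std::cout << "job " << j.at("job_id") << " finished with status " << j.at("status") << "\n";
            if (j.contains("statistics")) {
                std::cout << "  success=" << j["statistics"]["success_count"]
                          << " failed=" << j["statistics"]["failed_count"] << "\n";
            }
        }
        return 0;
    }
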