31 changes: 26 additions & 5 deletions src/lib/homestore_backend/gc_manager.cpp
@@ -123,7 +123,10 @@ void GCManager::start() {
gc_actor->start();
LOGINFOMOD(gcmgr, "start gc actor for pdev={}", pdev_id);
}
start_gc_scan_timer();
}

void GCManager::start_gc_scan_timer() {
const auto gc_scan_interval_sec = HS_BACKEND_DYNAMIC_CONFIG(gc_scan_interval_sec);

// the initial idea here is that we want the gc timer to run in a reactor that is not shared with other fibers that
@@ -147,9 +150,7 @@ void GCManager::start() {
LOGINFOMOD(gcmgr, "gc scheduler timer has started, interval is set to {} seconds", gc_scan_interval_sec);
}

bool GCManager::is_started() { return m_gc_timer_hdl != iomgr::null_timer_handle; }

void GCManager::stop() {
void GCManager::stop_gc_scan_timer() {
if (m_gc_timer_hdl == iomgr::null_timer_handle) {
LOGWARNMOD(gcmgr, "gc scheduler timer is not running, no need to stop it");
return;
@@ -163,6 +164,10 @@ void GCManager::stop() {
m_gc_timer_hdl = iomgr::null_timer_handle;
});
m_gc_timer_fiber = nullptr;
}

void GCManager::stop() {
stop_gc_scan_timer();

for (const auto& [pdev_id, gc_actor] : m_pdev_gc_actors) {
gc_actor->stop();
@@ -171,9 +176,20 @@ }
}

folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
if (!is_started()) return folly::makeFuture< bool >(false);
auto ex_vchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
if (ex_vchunk == nullptr) {
LOGERRORMOD(gcmgr, "chunk {} not found when submit gc task!", chunk_id);
return folly::makeFuture< bool >(false);
}

// if the chunk has no garbage to be reclaimed, we don't need to gc it; return true directly
const auto defrag_blk_num = ex_vchunk->get_defrag_nblks();
if (!defrag_blk_num && task_priority::normal == priority) {
LOGERRORMOD(gcmgr, "chunk {} has no garbage to be reclaimed, skip gc for this chunk!", chunk_id);
return folly::makeFuture< bool >(true);
}

auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
auto pdev_id = ex_vchunk->get_pdev_id();
auto it = m_pdev_gc_actors.find(pdev_id);
if (it == m_pdev_gc_actors.end()) {
LOGINFOMOD(gcmgr, "pdev gc actor not found for pdev_id={}, chunk={}", pdev_id, chunk_id);
@@ -337,6 +353,11 @@ void GCManager::pdev_gc_actor::add_reserved_chunk(
}

folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) {
if (m_is_stopped.load()) {
LOGWARNMOD(gcmgr, "pdev gc actor for pdev_id={} is not started yet or already stopped, cannot add gc task!",
m_pdev_id);
return folly::makeSemiFuture< bool >(false);
}
auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk);
// it does not belong to any pg, so we don't need to gc it.
if (!EXvchunk->m_pg_id.has_value()) {
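A note on the new early-exit paths above: submit_gc_task resolves its folly::SemiFuture< bool > to false when the chunk is unknown, and to true when a normal-priority task finds nothing to reclaim. A minimal, hypothetical call site under those semantics; only submit_gc_task, task_priority, chunk_id_t, and the LOGINFOMOD macro come from this codebase, and the use of InlineExecutor mirrors hs_http_manager.cpp below:

```cpp
#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>

// Illustrative only: submit a normal-priority gc task for one chunk and log
// the result once the task settles.
void submit_and_log(GCManager* gc_mgr, chunk_id_t chunk_id) {
    gc_mgr->submit_gc_task(task_priority::normal, chunk_id)
        .via(&folly::InlineExecutor::instance())
        .thenValue([chunk_id](bool reclaimed) {
            LOGINFOMOD(gcmgr, "gc task for chunk={} finished, result={}", chunk_id, reclaimed);
        });
}
```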
6 changes: 5 additions & 1 deletion src/lib/homestore_backend/gc_manager.hpp
@@ -307,7 +307,11 @@ class GCManager {

void start();
void stop();
bool is_started();

// the following two functions should not be called concurrently. if we ever need to call them concurrently, a
// lock must be added to protect them
void start_gc_scan_timer();
void stop_gc_scan_timer();

void scan_chunks_for_gc();
void drain_pg_pending_gc_task(const pg_id_t pg_id);
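Given the header comment that start_gc_scan_timer and stop_gc_scan_timer must not race, a caller-side RAII guard is one way to keep the pairing correct. This is a sketch under that assumption; ScopedGcScanPause and its mutex are hypothetical, not part of this PR:

```cpp
#include <mutex>

// Hypothetical guard: stops the gc scan timer on construction and restarts
// it on destruction, serializing all start/stop calls through one mutex
// because GCManager itself does not lock them.
class ScopedGcScanPause {
public:
    explicit ScopedGcScanPause(GCManager& mgr) : mgr_{mgr} {
        std::lock_guard< std::mutex > lock{s_mtx};
        mgr_.stop_gc_scan_timer();
    }
    ~ScopedGcScanPause() {
        std::lock_guard< std::mutex > lock{s_mtx};
        mgr_.start_gc_scan_timer();
    }
    ScopedGcScanPause(const ScopedGcScanPause&) = delete;
    ScopedGcScanPause& operator=(const ScopedGcScanPause&) = delete;

private:
    GCManager& mgr_;
    inline static std::mutex s_mtx;
};
```

trigger_gc in hs_http_manager.cpp below does this stop/start pairing by hand around each manual run.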
296 changes: 295 additions & 1 deletion src/lib/homestore_backend/hs_http_manager.cpp
@@ -48,7 +48,17 @@ HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) {
{Pistache::Http::Method::Get, "/api/v1/chunk/dump",
Pistache::Rest::Routes::bind(&HttpManager::dump_chunk, this)},
{Pistache::Http::Method::Get, "/api/v1/shard/dump",
Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)}};
Pistache::Rest::Routes::bind(&HttpManager::dump_shard, this)},

// we support triggering gc for:
// 1. all the chunks in all pgs: no input param
// 2. all the chunks in a specific pg: input param is pg_id
// 3. a specific chunk: input param is chunk_id (a pchunk_id, not a vchunk_id)
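// e.g. POST /api/v1/trigger_gc               -> gc every chunk in every pg
//      POST /api/v1/trigger_gc?pg_id=1       -> gc all chunks of one pg
//      POST /api/v1/trigger_gc?chunk_id=42   -> gc one chunk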

{Pistache::Http::Method::Post, "/api/v1/trigger_gc",
Pistache::Rest::Routes::bind(&HttpManager::trigger_gc, this)},
{Pistache::Http::Method::Get, "/api/v1/gc_job_status",
Pistache::Rest::Routes::bind(&HttpManager::get_gc_job_status, this)}};

auto http_server = ioenvironment.get_http_server();
if (!http_server) {
@@ -239,6 +249,290 @@ void HttpManager::dump_shard(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
response.send(Pistache::Http::Code::Ok, j.dump());
}

void HttpManager::trigger_gc(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
auto gc_mgr = ho_.gc_manager();
if (!gc_mgr) {
response.send(Pistache::Http::Code::Internal_Server_Error, "GC manager not available");
return;
}

auto chunk_selector = ho_.chunk_selector();
if (!chunk_selector) {
response.send(Pistache::Http::Code::Internal_Server_Error, "Chunk selector not available");
return;
}

const auto chunk_id_param = request.query().get("chunk_id");
const auto pg_id_param = request.query().get("pg_id");

if (chunk_id_param && !chunk_id_param.value().empty()) {
// trigger gc for a specific chunk; the chunk_id here is a pchunk_id, not a vchunk_id
uint32_t chunk_id = std::stoul(chunk_id_param.value());
LOGINFO("Received trigger_gc request for chunk_id {}", chunk_id);

auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
if (!chunk) {
nlohmann::json error;
error["chunk_id"] = chunk_id;
error["error"] = "Chunk not found";
response.send(Pistache::Http::Code::Not_Found, error.dump());
return;
}

if (!chunk->m_pg_id.has_value()) {
nlohmann::json error;
error["chunk_id"] = chunk_id;
error["error"] = "Chunk belongs to no pg";
response.send(Pistache::Http::Code::Not_Found, error.dump());
return;
}

const auto pg_id = chunk->m_pg_id.value();
nlohmann::json result;
const auto job_id = generate_job_id();

result["chunk_id"] = chunk_id;
result["pg_id"] = pg_id;
result["job_id"] = job_id;

if (chunk->m_state == ChunkState::GC) {
result["message"] = "chunk is already under GC now, this task will not be executed!";
response.send(Pistache::Http::Code::Ok, result.dump());
return;
}
result["message"] = "GC triggered for chunk, pls query job status using gc_job_status API";

// return response before starting the GC so that we don't block the client.
response.send(Pistache::Http::Code::Accepted, result.dump());

auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id, chunk_id);
{
std::lock_guard lock(gc_job_mutex_);
gc_jobs_map_.set(job_id, job_info);
}

// submit a gc task for this chunk

// quiesce incoming requests and clear the in-memory requests for this chunk before gc
auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
auto repl_dev = hs_pg->repl_dev_;
repl_dev->quiesce_reqs();
repl_dev->clear_chunk_req(chunk_id);
const auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;

gc_mgr->submit_gc_task(priority, chunk_id)
.via(&folly::InlineExecutor::instance())
.thenValue([this, job_info, repl_dev](bool res) {
job_info->status = res ? GCJobStatus::COMPLETED : GCJobStatus::FAILED;
// Resume accepting new requests for this pg
repl_dev->resume_accepting_reqs();
});
} else if (pg_id_param && !pg_id_param.value().empty()) {
// trigger gc for all chunks in a specific pg
const auto pg_id = std::stoul(pg_id_param.value());
LOGINFO("Received trigger_gc request for pg_id {}", pg_id);
auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
if (!hs_pg) {
nlohmann::json error;
error["pg_id"] = pg_id;
error["error"] = "PG not found";
response.send(Pistache::Http::Code::Not_Found, error.dump());
return;
}

nlohmann::json result;
const auto job_id = generate_job_id();
result["pg_id"] = pg_id;
result["job_id"] = job_id;
result["message"] = "GC triggered for a single pg, pls query job status using gc_job_status API";
// return response before starting the GC so that we don't block the client.
response.send(Pistache::Http::Code::Accepted, result.dump());

auto job_info = std::make_shared< GCJobInfo >(job_id, pg_id);
{
std::lock_guard lock(gc_job_mutex_);
gc_jobs_map_.set(job_id, job_info);
}

LOGINFO("GC job {} stopping GC scan timer", job_id);
gc_mgr->stop_gc_scan_timer();

// we block here until all gc tasks for the pg are done
trigger_gc_for_pg(pg_id, job_id);

LOGINFO("GC job {} restarting GC scan timer", job_id);
gc_mgr->start_gc_scan_timer();
} else {
LOGINFO("Received trigger_gc request for all chunks");
nlohmann::json result;
const auto job_id = generate_job_id();
result["job_id"] = job_id;
result["message"] = "GC triggered for all chunks, pls query job status using gc_job_status API";
// return response before starting the GC so that we don't block the client.
response.send(Pistache::Http::Code::Accepted, result.dump());

auto job_info = std::make_shared< GCJobInfo >(job_id);
{
std::lock_guard lock(gc_job_mutex_);
gc_jobs_map_.set(job_id, job_info);
}

std::vector< pg_id_t > pg_ids;
ho_.get_pg_ids(pg_ids);
LOGINFO("GC job {} will process {} PGs", job_id, pg_ids.size());
LOGINFO("GC job {} stopping GC scan timer", job_id);
gc_mgr->stop_gc_scan_timer();

// we block here until all gc tasks for every pg are done
for (const auto& pg_id : pg_ids) {
trigger_gc_for_pg(pg_id, job_id);
}

LOGINFO("GC job {} restarting GC scan timer", job_id);
gc_mgr->start_gc_scan_timer();
}
}
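The handlers above share progress through GCJobInfo, whose definition is outside this diff. Below is a minimal sketch of the shape they rely on, with field names taken from their usage; counter synchronization is omitted for brevity, and the real type would need some, since the folly callbacks update the counts off the request thread:

```cpp
#include <cstdint>
#include <optional>
#include <string>

enum class GCJobStatus { RUNNING, COMPLETED, FAILED };

// Assumed shape only: the real GCJobInfo lives outside this diff.
struct GCJobInfo {
    explicit GCJobInfo(std::string id, std::optional< pg_id_t > pg = std::nullopt,
                       std::optional< uint32_t > chunk = std::nullopt) :
            job_id{std::move(id)}, pg_id{pg}, chunk_id{chunk} {}

    std::string job_id;
    std::optional< pg_id_t > pg_id;
    std::optional< uint32_t > chunk_id;
    GCJobStatus status{GCJobStatus::RUNNING};
    uint64_t total_chunks{0};  // incremented per submitted chunk
    uint64_t success_count{0}; // incremented by the collect callback
    uint64_t failed_count{0};
};
```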

std::string HttpManager::generate_job_id() {
auto counter = job_counter_.fetch_add(1, std::memory_order_relaxed);
return fmt::format("trigger-gc-task-{}", counter);
}

void HttpManager::get_job_status(const std::string& job_id, nlohmann::json& result) {
result["job_id"] = job_id;
std::shared_ptr< GCJobInfo > job_info;
{
std::shared_lock lock(gc_job_mutex_);
job_info = gc_jobs_map_.get(job_id);
}

if (!job_info) {
result["error"] = "job_id not found, or job has been evicted";
return;
}

switch (job_info->status) {
case GCJobStatus::RUNNING:
result["status"] = "running";
break;
case GCJobStatus::COMPLETED:
result["status"] = "completed";
break;
case GCJobStatus::FAILED:
result["status"] = "failed";
break;
}

if (job_info->chunk_id.has_value()) { result["chunk_id"] = job_info->chunk_id.value(); }
if (job_info->pg_id.has_value()) { result["pg_id"] = job_info->pg_id.value(); }

if (job_info->total_chunks > 0) {
nlohmann::json stats;
stats["total_chunks"] = job_info->total_chunks;
stats["success_count"] = job_info->success_count;
stats["failed_count"] = job_info->failed_count;
result["statistics"] = stats;
}
}

void HttpManager::get_gc_job_status(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
auto job_id_param = request.query().get("job_id");
if (job_id_param && !job_id_param.value().empty()) {
const auto job_id = job_id_param.value();
LOGINFO("query job {} status!", job_id);
nlohmann::json result;
get_job_status(job_id, result);
response.send(Pistache::Http::Code::Ok, result.dump());
return;
}

LOGINFO("query all job status!");
nlohmann::json result;
std::vector< std::string > job_ids;
{
std::shared_lock lock(gc_job_mutex_);
for (const auto& [k, v] : gc_jobs_map_) {
job_ids.push_back(k);
}
}

for (const auto& job_id : job_ids) {
nlohmann::json job_json;
get_job_status(job_id, job_json);
result["jobs"].push_back(job_json);
}

response.send(Pistache::Http::Code::Ok, result.dump());
}
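gc_jobs_map_ is touched only through set, get, and iteration, and the not-found message above implies older entries are eventually evicted. A small bounded map with FIFO eviction would satisfy that interface; this is a sketch under that assumption, and the real container may well differ (e.g. an LRU):

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <unordered_map>

// Hypothetical bounded job map: supports set/get/iteration like gc_jobs_map_
// above and evicts the oldest entry once capacity is reached.
template < typename V >
class BoundedJobMap {
public:
    explicit BoundedJobMap(std::size_t cap = 128) : cap_{cap} {}

    void set(const std::string& k, V v) {
        auto it = map_.find(k);
        if (it != map_.end()) {
            it->second = std::move(v);
            return;
        }
        if (map_.size() >= cap_) {
            map_.erase(order_.front());
            order_.pop_front();
        }
        map_.emplace(k, std::move(v));
        order_.push_back(k);
    }

    // returns a default-constructed V (nullptr for a shared_ptr) when missing
    V get(const std::string& k) const {
        auto it = map_.find(k);
        return it == map_.end() ? V{} : it->second;
    }

    auto begin() const { return map_.begin(); }
    auto end() const { return map_.end(); }

private:
    std::size_t cap_;
    std::deque< std::string > order_;
    std::unordered_map< std::string, V > map_;
};
```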

void HttpManager::trigger_gc_for_pg(uint16_t pg_id, const std::string& job_id) {
auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);

LOGINFO("GC job {} draining pending GC tasks for PG {}", job_id, pg_id);
auto gc_mgr = ho_.gc_manager();
gc_mgr->drain_pg_pending_gc_task(pg_id);
auto pg_sb = hs_pg->pg_sb_.get();
std::vector< homestore::chunk_num_t > pg_chunks(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks);

LOGINFO("GC job {} processing PG {} with {} chunks", job_id, pg_id, pg_chunks.size());
hs_pg->repl_dev_->quiesce_reqs();
std::vector< folly::SemiFuture< bool > > gc_task_futures;

std::shared_ptr< GCJobInfo > job_info;
{
std::shared_lock lock(gc_job_mutex_);
job_info = gc_jobs_map_.get(job_id);
}

auto chunk_selector = ho_.chunk_selector();

for (const auto& chunk_id : pg_chunks) {
job_info->total_chunks++;
// Determine priority based on chunk state (INUSE means has open shard)
auto chunk = chunk_selector->get_extend_vchunk(chunk_id);
RELEASE_ASSERT(chunk, "Chunk {} not found during GC job {}", chunk_id, job_id);
auto priority = chunk->m_state == ChunkState::INUSE ? task_priority::emergent : task_priority::normal;

// Clear in-memory requests only for emergent priority chunks (chunks with open shards)
if (priority == task_priority::emergent) { hs_pg->repl_dev_->clear_chunk_req(chunk_id); }

// Submit GC task for this chunk
auto future = gc_mgr->submit_gc_task(priority, chunk_id);
gc_task_futures.push_back(std::move(future));
LOGDEBUG("GC job {} for chunk {} in PG {} with priority={}", job_id, chunk_id, pg_id,
(priority == task_priority::emergent) ? "emergent" : "normal");
}

folly::collectAllUnsafe(gc_task_futures)
.thenValue([job_info](auto&& results) {
for (auto const& ok : results) {
RELEASE_ASSERT(ok.hasValue(), "we never throw any exception when copying data");
if (ok.value()) {
job_info->success_count++;
} else {
job_info->failed_count++;
}
}
})
.thenValue([this, pg_id, job_info, gc_mgr](auto&& rets) {
LOGINFO("All GC tasks have been processed");
const auto& job_id = job_info->job_id;

auto hs_pg = const_cast< HSHomeObject::HS_PG* >(ho_.get_hs_pg(pg_id));
RELEASE_ASSERT(hs_pg, "HS PG {} not found during GC job {}", pg_id, job_id);
// Resume accepting new requests for this pg
hs_pg->repl_dev_->resume_accepting_reqs();
LOGINFO("GC job {} resumed accepting requests for PG {}", job_id, pg_id);

job_info->status = job_info->failed_count ? GCJobStatus::FAILED : GCJobStatus::COMPLETED;
LOGINFO("GC job {} completed: total={}, success={}, failed={}", job_id, job_info->total_chunks,
job_info->success_count, job_info->failed_count);
})
.get();
}

#ifdef _PRERELEASE
void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) {
std::string crash_type;