Skip to content

Commit dcf3698

Browse files
authored
Merge branch 'eBay:main' into main
2 parents 2fead85 + dee044c commit dcf3698

File tree

8 files changed

+116
-39
lines changed

8 files changed

+116
-39
lines changed

src/lib/homestore_backend/CMakeLists.txt

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,18 +127,12 @@ add_test(NAME HomestoreResyncTestWithLeaderRestart
127127
# GC tests
128128
add_test(NAME FetchDataWithOriginatorGC
129129
COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./
130-
--override_config hs_backend_config.enable_gc=true
131-
# remove this until gc supports baseline resync
132-
--override_config homestore_config.consensus.snapshot_freq_distance:0
133130
--gtest_filter=HomeObjectFixture.FetchDataWithOriginatorGC)
134131

135132
add_executable(homestore_test_gc)
136133
target_sources(homestore_test_gc PRIVATE $<TARGET_OBJECTS:homestore_tests_gc>)
137134
target_link_libraries(homestore_test_gc PUBLIC homeobject_homestore ${COMMON_TEST_DEPS})
138135
add_test(NAME HomestoreTestGC COMMAND homestore_test_gc -csv error --executor immediate --config_path ./
139-
--override_config hs_backend_config.enable_gc=true
140136
--override_config hs_backend_config.gc_garbage_rate_threshold=0
141-
--override_config hs_backend_config.gc_scan_interval_sec=5
142-
# remove this until gc supports baseline resync
143-
--override_config homestore_config.consensus.snapshot_freq_distance:0)
137+
--override_config hs_backend_config.gc_scan_interval_sec=5)
144138

src/lib/homestore_backend/gc_manager.cpp

Lines changed: 83 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,11 @@ void GCManager::stop() {
132132

133133
folly::SemiFuture< bool > GCManager::submit_gc_task(task_priority priority, chunk_id_t chunk_id) {
134134
if (!is_started()) return folly::makeFuture< bool >(false);
135+
135136
auto pdev_id = m_chunk_selector->get_extend_vchunk(chunk_id)->get_pdev_id();
136137
auto it = m_pdev_gc_actors.find(pdev_id);
137138
if (it == m_pdev_gc_actors.end()) {
138-
LOGINFO("pdev gc actor not found for pdev_id: {}", pdev_id);
139+
LOGINFO("pdev gc actor not found for pdev_id={}, chunk={}", pdev_id, chunk_id);
139140
return folly::makeFuture< bool >(false);
140141
}
141142
auto& actor = it->second;
@@ -178,22 +179,14 @@ bool GCManager::is_eligible_for_gc(chunk_id_t chunk_id) {
178179
return false;
179180
}
180181

181-
// it does not belong to any pg, so we don't need to gc it.
182-
if (!chunk->m_pg_id.has_value()) {
183-
LOGDEBUG("chunk_id={} belongs to no pg, not eligible for gc", chunk_id)
184-
return false;
185-
}
186-
187182
const auto total_blk_num = chunk->get_total_blks();
188183

189184
const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold);
190185

191186
bool should_gc = 100 * defrag_blk_num >= total_blk_num * gc_garbage_rate_threshold;
192187

193-
LOGDEBUG("gc scan chunk_id={}, belongs to pg {}, use_blks={}, available_blks={}, total_blks={}, defrag_blks={}, "
194-
"should_gc={}",
195-
chunk_id, chunk->m_pg_id.value(), chunk->get_used_blks(), chunk->available_blks(), total_blk_num,
196-
defrag_blk_num, should_gc);
188+
LOGDEBUG("gc scan chunk_id={}, use_blks={}, available_blks={}, total_blks={}, defrag_blks={}, should_gc={}",
189+
chunk_id, chunk->get_used_blks(), chunk->available_blks(), total_blk_num, defrag_blk_num, should_gc);
197190

198191
return should_gc;
199192
}
@@ -292,10 +285,27 @@ void GCManager::pdev_gc_actor::add_reserved_chunk(
292285
}
293286

294287
folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority, chunk_id_t move_from_chunk) {
288+
auto EXvchunk = m_chunk_selector->get_extend_vchunk(move_from_chunk);
289+
// it does not belong to any pg, so we don't need to gc it.
290+
if (!EXvchunk->m_pg_id.has_value()) {
291+
LOGDEBUG("chunk_id={} belongs to no pg, not eligible for gc", move_from_chunk)
292+
return folly::makeSemiFuture< bool >(false);
293+
}
294+
295+
const auto pg_id = EXvchunk->m_pg_id.value();
296+
297+
m_hs_home_object->gc_manager()->incr_pg_pending_gc_task(pg_id);
298+
299+
if (!m_hs_home_object->can_chunks_in_pg_be_gc(pg_id)) {
300+
LOGDEBUG("chunk_id={} belongs to pg {}, which is not eligible for gc at this moment!", move_from_chunk, pg_id)
301+
m_hs_home_object->gc_manager()->decr_pg_pending_gc_task(pg_id);
302+
return folly::makeSemiFuture< bool >(false);
303+
}
304+
295305
if (m_chunk_selector->try_mark_chunk_to_gc_state(move_from_chunk,
296306
priority == static_cast< uint8_t >(task_priority::emergent))) {
297307
auto [promise, future] = folly::makePromiseContract< bool >();
298-
auto gc_task_id = GCManager::_gc_task_id.fetch_add(1);
308+
const auto gc_task_id = GCManager::_gc_task_id.fetch_add(1);
299309

300310
if (sisl_unlikely(priority == static_cast< uint8_t >(task_priority::emergent))) {
301311
m_egc_executor->add([this, gc_task_id, priority, move_from_chunk, promise = std::move(promise)]() mutable {
@@ -313,9 +323,49 @@ folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority
313323
}
314324

315325
LOGWARN("fail to submit gc task for chunk_id={}, priority={}", move_from_chunk, priority);
326+
m_hs_home_object->gc_manager()->decr_pg_pending_gc_task(pg_id);
316327
return folly::makeSemiFuture< bool >(false);
317328
}
318329

330+
void GCManager::drain_pg_pending_gc_task(const pg_id_t pg_id) {
331+
while (true) {
332+
uint64_t pending_gc_task_num{0};
333+
{
334+
std::unique_lock lock(m_pending_gc_task_mtx);
335+
pending_gc_task_num = m_pending_gc_task_num_per_pg.try_emplace(pg_id, 0).first->second.load();
336+
}
337+
338+
if (pending_gc_task_num) {
339+
LOGDEBUG("{} pending gc tasks to be completed for pg={}, wait for 2 seconds!", pending_gc_task_num, pg_id);
340+
} else {
341+
break;
342+
}
343+
// wait until all the pending gc tasks for this pg are completed
344+
std::this_thread::sleep_for(std::chrono::seconds(2));
345+
}
346+
347+
LOGDEBUG("all pending gc tasks for pg_id={} are completed", pg_id);
348+
}
349+
350+
void GCManager::decr_pg_pending_gc_task(const pg_id_t pg_id) {
351+
std::unique_lock lock(m_pending_gc_task_mtx);
352+
auto& pending_gc_task_num = m_pending_gc_task_num_per_pg.try_emplace(pg_id, 0).first->second;
353+
if (pending_gc_task_num.load()) {
354+
// TODO::avoid overflow here.
355+
--pending_gc_task_num;
356+
LOGDEBUG("decrease pending gc task num for pg_id={}, now it is {}", pg_id, pending_gc_task_num.load());
357+
return;
358+
}
359+
LOGDEBUG("pending gc task num for pg_id={} is already 0, no need to decrease it", pg_id);
360+
}
361+
362+
void GCManager::incr_pg_pending_gc_task(const pg_id_t pg_id) {
363+
std::unique_lock lock(m_pending_gc_task_mtx);
364+
auto& pending_gc_task_num = m_pending_gc_task_num_per_pg.try_emplace(pg_id, 0).first->second;
365+
++pending_gc_task_num;
366+
LOGDEBUG("increase pending gc task num for pg_id={}, now it is {}", pg_id, pending_gc_task_num.load());
367+
}
368+
319369
// this method is expected to be called sequentially when replaying metablk, so we don't need to worry about the
320370
// concurrency issue.
321371
void GCManager::pdev_gc_actor::handle_recovered_gc_task(
@@ -448,10 +498,11 @@ bool GCManager::pdev_gc_actor::replace_blob_index(
448498
}
449499

450500
if (existing_pbas.chunk_num() != move_from_chunk) {
451-
LOGERROR("gc task_id={}, existing pbas chunk={} should be equal to move_from_chunk={}, pg_id={}, "
452-
"shard_id={}, blob_id={}, move_to_chunk={} , existing_pbas={}, new_pbas={}",
453-
task_id, existing_pbas.chunk_num(), move_from_chunk, pg_id, shard, blob, move_to_chunk,
454-
existing_pbas.to_string(), new_pbas.to_string());
501+
LOGWARN("gc task_id={}, existing pbas chunk={} should be equal to move_from_chunk={}, pg_id={}, "
502+
"shard_id={}, blob_id={}, move_to_chunk={} , existing_pbas={}, new_pbas={}, this case "
503+
"might happen when crash recovery",
504+
task_id, existing_pbas.chunk_num(), move_from_chunk, pg_id, shard, blob, move_to_chunk,
505+
existing_pbas.to_string(), new_pbas.to_string());
455506
return homestore::put_filter_decision::keep;
456507
}
457508

@@ -676,8 +727,9 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk
676727
RELEASE_ASSERT(header_sgs.iovs.size() == 1, "header_sgs.iovs.size() should be 1, but not!");
677728
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(header_sgs.iovs[0].iov_base));
678729
if (err) {
679-
LOGERROR("gc task_id={}, Failed to write shard header for move_to_chunk={} shard_id={}, err={}",
680-
task_id, move_to_chunk, shard_id, err.value());
730+
LOGERROR("gc task_id={}, Failed to write shard header for move_to_chunk={} shard_id={}, "
731+
"err={}, err_category={}, err_message={}",
732+
task_id, move_to_chunk, shard_id, err.value(), err.category().name(), err.message());
681733
return folly::makeFuture< bool >(false);
682734
}
683735

@@ -713,8 +765,9 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk
713765
if (err) {
714766
LOGERROR(
715767
"gc task_id={}, Failed to read blob from move_from_chunk={}, shard_id={}, "
716-
"blob_id={}: err={}",
717-
task_id, move_from_chunk, k.key().shard, k.key().blob, err.value());
768+
"blob_id={}, err={}, err_category={}, err_message={}",
769+
task_id, move_from_chunk, k.key().shard, k.key().blob, err.value(),
770+
err.category().name(), err.message());
718771
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(data_sgs.iovs[0].iov_base));
719772
return folly::makeFuture< bool >(false);
720773
}
@@ -747,10 +800,11 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk
747800
iomanager.iobuf_free(
748801
reinterpret_cast< uint8_t* >(data_sgs.iovs[0].iov_base));
749802
if (err) {
750-
LOGERROR("gc task_id={}, Failed to write blob to move_to_chunk={}, "
751-
"shard_id={}, blob_id={}, err={}",
752-
task_id, move_to_chunk, k.key().shard, k.key().blob,
753-
err.value());
803+
LOGERROR(
804+
"gc task_id={}, Failed to write blob to move_to_chunk={}, "
805+
"shard_id={}, blob_id={}, err={}, err_category={}, err_message={}",
806+
task_id, move_to_chunk, k.key().shard, k.key().blob, err.value(),
807+
err.category().name(), err.message());
754808
return false;
755809
}
756810

@@ -811,10 +865,10 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk
811865
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(footer_sgs.iovs[0].iov_base));
812866

813867
if (err) {
814-
LOGERROR(
815-
"gc task_id={}, Failed to write shard footer for move_to_chunk={} shard_id={}, "
816-
"err={}",
817-
task_id, move_to_chunk, shard_id, err.value());
868+
LOGERROR("gc task_id={}, Failed to write shard footer for move_to_chunk={} "
869+
"shard_id={}, err={}, error_category={}, error_message={}",
870+
task_id, move_to_chunk, shard_id, err.value(), err.category().name(),
871+
err.message());
818872
return false;
819873
}
820874
return true;
@@ -1043,6 +1097,7 @@ void GCManager::pdev_gc_actor::process_gc_task(chunk_id_t move_from_chunk, uint8
10431097

10441098
task.setValue(true);
10451099
m_reserved_chunk_queue.blockingWrite(move_from_chunk);
1100+
m_hs_home_object->gc_manager()->decr_pg_pending_gc_task(pg_id);
10461101
LOGINFO("gc task_id={} for move_from_chunk={} to move_to_chunk={} with priority={} is completed", task_id,
10471102
move_from_chunk, move_to_chunk, priority);
10481103
}

src/lib/homestore_backend/gc_manager.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ class GCManager {
199199

200200
void scan_chunks_for_gc();
201201
void handle_all_recovered_gc_tasks();
202+
void drain_pg_pending_gc_task(const pg_id_t pg_id);
203+
void decr_pg_pending_gc_task(const pg_id_t pg_id);
204+
void incr_pg_pending_gc_task(const pg_id_t pg_id);
202205

203206
private:
204207
void on_gc_task_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
@@ -212,6 +215,8 @@ class GCManager {
212215
iomgr::timer_handle_t m_gc_timer_hdl{iomgr::null_timer_handle};
213216
HSHomeObject* m_hs_home_object{nullptr};
214217
std::list< homestore::superblk< GCManager::gc_task_superblk > > m_recovered_gc_tasks;
218+
std::unordered_map< pg_id_t, atomic_uint64_t > m_pending_gc_task_num_per_pg;
219+
std::mutex m_pending_gc_task_mtx;
215220
};
216221

217222
} // namespace homeobject

src/lib/homestore_backend/heap_chunk_selector.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ csharedChunk HeapChunkSelector::select_specific_chunk(const pg_id_t pg_id, const
139139
}
140140

141141
// if the chunk is not available, probably being gc. we wait for a while and retry
142-
std::this_thread::sleep_for(std::chrono::seconds(5));
142+
std::this_thread::sleep_for(std::chrono::seconds(1));
143143
}
144144

145145
LOGDEBUGMOD(homeobject, "chunk={} is selected for v_chunk_id={}, pg={}", chunk->get_chunk_id(), v_chunk_id, pg_id);

src/lib/homestore_backend/hs_backend_config.fbs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ table HSBackendSettings {
2121

2222
//Enable GC
2323
//TODO: make this hotswap after gc is well tested
24-
enable_gc: bool = false;
24+
enable_gc: bool = true;
2525

2626
//Total reserved chunk num (dedicated for gc/egc) per pdev
2727
reserved_chunk_num_per_pdev: uint8 = 6;
@@ -33,7 +33,7 @@ table HSBackendSettings {
3333
gc_scan_interval_sec: uint64 = 60;
3434

3535
//GC garbage rate threshold, upon which a chunk will be selected for gc
36-
gc_garbage_rate_threshold: uint8 = 80;
36+
gc_garbage_rate_threshold: uint8 = 50;
3737

3838
//max read/write block count per second, which is used by ratelimiter to limit the io resource taken by gc
3939
max_read_write_block_count_per_second: uint16 = 7680;

src/lib/homestore_backend/hs_homeobject.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,14 @@ class HSHomeObject : public HomeObjectImpl {
856856
*/
857857
bool release_chunk_based_on_create_shard_message(sisl::blob const& header);
858858

859+
/**
860+
* @brief check whether the chunks in a given pg can be gc.
861+
*
862+
* @param pg_id The ID of the PG whose chunks are to be checked for gc eligibility.
863+
* @return True if the chunks in the PG can be garbage collected, false otherwise.
864+
*/
865+
bool can_chunks_in_pg_be_gc(pg_id_t pg_id) const;
866+
859867
bool pg_exists(pg_id_t pg_id) const;
860868

861869
uint32_t get_reserved_blks() const { return _hs_reserved_blks; }

src/lib/homestore_backend/hs_pg_manager.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,11 @@ bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine
450450
}
451451
LOGI("Destroying pg={}", pg_id);
452452
mark_pg_destroyed(pg_id);
453+
454+
// We assume that once a pg is marked as destroyed, it will never be marked as alive again.
455+
// TODO:: if this assumption is broken, we need to handle it.
456+
gc_mgr_->drain_pg_pending_gc_task(pg_id);
457+
453458
destroy_shards(pg_id);
454459
destroy_hs_resources(pg_id);
455460
destroy_pg_index_table(pg_id);
@@ -505,6 +510,17 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
505510
LOGD("pg={} is marked as destroyed", pg_id);
506511
}
507512

513+
bool HSHomeObject::can_chunks_in_pg_be_gc(pg_id_t pg_id) const {
514+
auto lg = std::scoped_lock(_pg_lock);
515+
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
516+
if (hs_pg == nullptr) {
517+
LOGW("unknown pg={}", pg_id);
518+
return false;
519+
}
520+
521+
return hs_pg->pg_sb_->state == PGState::ALIVE;
522+
}
523+
508524
void HSHomeObject::destroy_hs_resources(pg_id_t pg_id) { chunk_selector_->reset_pg_chunks(pg_id); }
509525

510526
void HSHomeObject::destroy_pg_index_table(pg_id_t pg_id) {

src/lib/homestore_backend/replication_state_machine.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,6 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
683683
LOGD("fetch data with blob_id={}, shard=0x{:x}", blob_id, shard_id);
684684
// we first try to read data according to the local_blk_id to see if it matches the blob_id
685685
return std::move(homestore::data_service().async_read(local_blk_id, given_buffer, total_size))
686-
.via(folly::getGlobalIOExecutor())
687686
.thenValue([this, lsn, blob_id, shard_id, given_buffer, total_size](auto&& err) {
688687
// io error
689688
if (err) {

0 commit comments

Comments
 (0)