diff --git a/conanfile.py b/conanfile.py index ea920cb5..f8d1a43d 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "4.1.1" + version = "4.1.2" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeStore" diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 72c896cd..d4a1d25f 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -576,12 +576,10 @@ class HSHomeObject : public HomeObjectImpl { REGISTER_COUNTER(snp_dnr_error_count, "Error times when reading blobs in baseline resync"); REGISTER_HISTOGRAM(snp_dnr_blob_process_latency, "Time cost(us) of successfully process a blob in baseline resync", - HistogramBucketsType(LowResolutionLatecyBuckets), - _publish_as::publish_as_sum_count); + HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count); REGISTER_HISTOGRAM(snp_dnr_batch_process_latency, "Time cost(ms) of successfully process a batch in baseline resync", - HistogramBucketsType(LowResolutionLatecyBuckets), - _publish_as::publish_as_sum_count); + HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count); REGISTER_HISTOGRAM(snp_dnr_batch_e2e_latency, "Time cost(ms) of a batch end-to-end round trip in baseline resync", HistogramBucketsType(LowResolutionLatecyBuckets)); @@ -684,12 +682,10 @@ class HSHomeObject : public HomeObjectImpl { REGISTER_GAUGE(snp_rcvr_error_count, "Error count in baseline resync"); REGISTER_HISTOGRAM(snp_rcvr_blob_process_time, "Time cost(us) of successfully process a blob in baseline resync", - HistogramBucketsType(LowResolutionLatecyBuckets), - _publish_as::publish_as_sum_count); + HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count); REGISTER_HISTOGRAM(snp_rcvr_batch_process_time, "Time cost(ms) of successfully process a batch in baseline resync", - HistogramBucketsType(LowResolutionLatecyBuckets), - _publish_as::publish_as_sum_count); + HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count); attach_gather_cb(std::bind(&ReceiverSnapshotMetrics::on_gather, this)); register_me_to_farm(); @@ -1059,6 +1055,9 @@ class HSHomeObject : public HomeObjectImpl { BlobManager::Result< std::vector< BlobInfo > > get_shard_blobs(shard_id_t shard_id); + // Refresh PG statistics (called after log replay) + void refresh_pg_statistics(pg_id_t pg_id); + private: BlobManager::Result< std::string > do_verify_blob(const void* blob, shard_id_t expected_shard_id, blob_id_t expected_blob_id = 0) const; diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 9dd9242c..77948d27 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -398,10 +398,10 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std: boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); } -//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists +// This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id, - const replica_member_info& member_out, - const replica_member_info& member_in, trace_id_t tid) { + const replica_member_info& member_out, + const replica_member_info& member_in, trace_id_t tid) { std::unique_lock lck(_pg_lock); for (const auto& iter : _pg_map) { auto& pg = iter.second; @@ -421,13 +421,13 @@ void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const st LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), " "ensured out_member={} (inserted={}), member_nums={}, trace_id={}", - task_id, boost::uuids::to_string(member_in.id), removed_count, - boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid); + task_id, boost::uuids::to_string(member_in.id), removed_count, boost::uuids::to_string(member_out.id), + inserted, pg->pg_info_.members.size(), tid); return; } } - LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id, - boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); + LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", + task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid); } bool HSHomeObject::reconcile_membership(pg_id_t pg_id) { @@ -457,7 +457,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) { // New member not in our records, add with default name PGMember new_member(member_id); new_members.insert(std::move(new_member)); - LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), hs_pg->pg_info_.id); + LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), + hs_pg->pg_info_.id); } } // Check if membership changed @@ -471,9 +472,7 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) { std::vector< peer_id_t > added_members; for (auto& old_member : hs_pg->pg_info_.members) { - if (new_members.find(old_member) == new_members.end()) { - removed_members.push_back(old_member.id); - } + if (new_members.find(old_member) == new_members.end()) { removed_members.push_back(old_member.id); } } for (auto& new_member : new_members) { @@ -482,7 +481,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) { } } - LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new membership count: {}", + LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new " + "membership count: {}", pg_id, removed_members.size(), added_members.size(), hs_pg->pg_info_.members.size(), new_members.size()); for (auto& member_id : removed_members) { @@ -1245,6 +1245,75 @@ uint32_t HSHomeObject::get_pg_tombstone_blob_count(pg_id_t pg_id) const { return tombstone_blob_count; } +void HSHomeObject::refresh_pg_statistics(pg_id_t pg_id) { + auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id)); + RELEASE_ASSERT(hs_pg, "Failed to get pg={} for statistics refresh", pg_id); + + // Step 1: Scan index table to count active and tombstone blobs in one pass + uint64_t active_count = 0; + uint64_t tombstone_count = 0; + + auto start_key = + BlobRouteKey{BlobRoute{uint64_t(pg_id) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}}; + auto end_key = + BlobRouteKey{BlobRoute{uint64_t(pg_id + 1) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}}; + + homestore::BtreeQueryRequest< BlobRouteKey > query_req{ + homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key), + false /* inclusive */}, + homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, + std::numeric_limits< uint32_t >::max() /* blob count in a pg will not exceed uint32_t_max*/, + [&active_count, &tombstone_count](homestore::BtreeKey const& key, + homestore::BtreeValue const& value) mutable -> bool { + BlobRouteValue blob_value{value}; + if (blob_value.pbas() == HSHomeObject::tombstone_pbas) { + tombstone_count++; + } else { + active_count++; + } + return false; // Continue scanning + }}; + + std::vector< std::pair< BlobRouteKey, BlobRouteValue > > dummy_out; + auto ret = hs_pg->index_table_->query(query_req, dummy_out); + RELEASE_ASSERT(ret == homestore::btree_status_t::success, "Failed to scan index table for pg={}, status={}", pg_id, + ret); + + // Step 2: Scan chunks to calculate total occupied blocks + auto chunk_ids = chunk_selector()->get_pg_chunks(pg_id); + RELEASE_ASSERT(chunk_ids, "Failed to get chunks for pg={}", pg_id); + + uint64_t total_occupied = 0; + for (const auto& chunk_id : *chunk_ids) { + auto vchunk = chunk_selector()->get_extend_vchunk(chunk_id); + RELEASE_ASSERT(vchunk, "Failed to get vchunk={} for pg={}", chunk_id, pg_id); + total_occupied += vchunk->get_used_blks(); + } + + // Step 3: Update durable_entities and capture original values for debugging + uint64_t original_active_count = 0; + uint64_t original_tombstone_count = 0; + uint64_t original_occupied_count = 0; + + hs_pg->durable_entities_update([active_count, tombstone_count, total_occupied, &original_active_count, + &original_tombstone_count, &original_occupied_count](auto& de) { + // Capture original values + original_active_count = de.active_blob_count.load(std::memory_order_relaxed); + original_tombstone_count = de.tombstone_blob_count.load(std::memory_order_relaxed); + original_occupied_count = de.total_occupied_blk_count.load(std::memory_order_relaxed); + + // Update with corrected values + de.active_blob_count.store(active_count, std::memory_order_relaxed); + de.tombstone_blob_count.store(tombstone_count, std::memory_order_relaxed); + de.total_occupied_blk_count.store(total_occupied, std::memory_order_relaxed); + }); + + LOGI("[corrected] Refreshed statistics for pg={}: active_blobs={} (original={}), tombstone_blobs={} (original={}), " + "occupied_blocks={} (original={})", + pg_id, active_count, original_active_count, tombstone_count, original_tombstone_count, total_occupied, + original_occupied_count); +} + void HSHomeObject::update_pg_meta_after_gc(const pg_id_t pg_id, const homestore::chunk_num_t move_from_chunk, const homestore::chunk_num_t move_to_chunk, const uint64_t task_id) { // 1 update pg metrics diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 71d536cc..6ea8a1c0 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -1046,6 +1046,10 @@ void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& gr LOGD("vchunk={} is selected for shard={} in pg={} when recovery", vchunk_id, shard_sb->info.id, pg_id); } } + + // Refresh PG statistics after log replay + LOGI("Starting statistics refresh for pg={}", pg_id); + home_object_->refresh_pg_statistics(pg_id); } } // namespace homeobject diff --git a/src/lib/homestore_backend/tests/hs_pg_tests.cpp b/src/lib/homestore_backend/tests/hs_pg_tests.cpp index a1a9d6f0..dede3778 100644 --- a/src/lib/homestore_backend/tests/hs_pg_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_pg_tests.cpp @@ -426,4 +426,87 @@ TEST_F(HomeObjectFixture, CreatePGFailed) { restart(); ASSERT_TRUE(pg_exist(pg_id)); } -#endif \ No newline at end of file +#endif + +TEST_F(HomeObjectFixture, PGRefreshStatisticsTest) { + LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num()); + g_helper->sync(); + + // Create a pg and shard + pg_id_t pg_id{1}; + create_pg(pg_id); + auto shard_info = create_shard(pg_id, 64 * Mi, "shard meta"); + auto shard_id = shard_info.id; + auto s = _obj_inst->shard_manager()->get_shard(shard_id).get(); + ASSERT_TRUE(!!s); + LOGINFO("Created shard {}", shard_info.id); + + // Put some blobs to populate statistics using put_blobs + const uint32_t num_active_blobs = 10; + std::map< pg_id_t, blob_id_t > pg_blob_id; + pg_blob_id[pg_id] = 0; + std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec; + pg_shard_id_vec[pg_id] = {shard_id}; + + auto shard_blob_map = put_blobs(pg_shard_id_vec, num_active_blobs, pg_blob_id); + LOGINFO("Put {} active blobs", num_active_blobs); + + // Delete some blobs to create tombstones + const uint32_t num_tombstones = 3; + for (uint32_t i = 0; i < num_tombstones; ++i) { + del_blob(pg_id, shard_id, i); + } + LOGINFO("Created {} tombstone blobs", num_tombstones); + + // Get PG from _pg_map + ASSERT_TRUE(_obj_inst->_pg_map.find(pg_id) != _obj_inst->_pg_map.end()); + auto hs_pg = dynamic_cast< HSHomeObject::HS_PG* >(_obj_inst->_pg_map[pg_id].get()); + ASSERT_NE(hs_pg, nullptr); + + // Manually corrupt statistics to simulate desync + hs_pg->durable_entities_update([](auto& de) { + de.active_blob_count.store(999, std::memory_order_relaxed); + de.tombstone_blob_count.store(888, std::memory_order_relaxed); + de.total_occupied_blk_count.store(777, std::memory_order_relaxed); + }); + LOGINFO("Corrupted statistics: active=999, tombstone=888, occupied=777"); + + // Call refresh_pg_statistics + _obj_inst->refresh_pg_statistics(pg_id); + + // Verify statistics are corrected (no direct access to durable_entities_) + // Use PG stats API to verify + PGStats pg_stats; + auto res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats); + ASSERT_TRUE(res); + LOGINFO("Statistics after refresh: stats={}", pg_stats.to_string()); + + // Verify counts through stats API + EXPECT_EQ(pg_stats.num_active_objects, num_active_blobs - num_tombstones) + << "Active blob count should be " << (num_active_blobs - num_tombstones); + EXPECT_EQ(pg_stats.num_tombstone_objects, num_tombstones) << "Tombstone blob count should be " << num_tombstones; + EXPECT_GT(pg_stats.used_bytes, 0) << "Used bytes should be greater than 0"; + + uint64_t used_bytes_after = pg_stats.used_bytes; + + // Test refresh_pg_statistics after restart (log replay scenario) + LOGINFO("Testing statistics refresh after restart"); + restart(); + + // Get PG after restart + ASSERT_TRUE(_obj_inst->_pg_map.find(pg_id) != _obj_inst->_pg_map.end()); + hs_pg = dynamic_cast< HSHomeObject::HS_PG* >(_obj_inst->_pg_map[pg_id].get()); + ASSERT_NE(hs_pg, nullptr); + + // Statistics should be preserved after restart + PGStats pg_stats_restart; + res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats_restart); + ASSERT_TRUE(res); + LOGINFO("Statistics after restart: stats={}", pg_stats_restart.to_string()); + + EXPECT_EQ(pg_stats_restart.num_active_objects, num_active_blobs - num_tombstones) + << "Active blob count should be preserved after restart"; + EXPECT_EQ(pg_stats_restart.num_tombstone_objects, num_tombstones) + << "Tombstone blob count should be preserved after restart"; + EXPECT_EQ(pg_stats_restart.used_bytes, used_bytes_after) << "Used bytes should be preserved after restart"; +} \ No newline at end of file