Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.1.1"
version = "4.1.2"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
15 changes: 7 additions & 8 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,12 +576,10 @@ class HSHomeObject : public HomeObjectImpl {
REGISTER_COUNTER(snp_dnr_error_count, "Error times when reading blobs in baseline resync");
REGISTER_HISTOGRAM(snp_dnr_blob_process_latency,
"Time cost(us) of successfully process a blob in baseline resync",
HistogramBucketsType(LowResolutionLatecyBuckets),
_publish_as::publish_as_sum_count);
HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count);
REGISTER_HISTOGRAM(snp_dnr_batch_process_latency,
"Time cost(ms) of successfully process a batch in baseline resync",
HistogramBucketsType(LowResolutionLatecyBuckets),
_publish_as::publish_as_sum_count);
HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count);
REGISTER_HISTOGRAM(snp_dnr_batch_e2e_latency,
"Time cost(ms) of a batch end-to-end round trip in baseline resync",
HistogramBucketsType(LowResolutionLatecyBuckets));
Expand Down Expand Up @@ -684,12 +682,10 @@ class HSHomeObject : public HomeObjectImpl {
REGISTER_GAUGE(snp_rcvr_error_count, "Error count in baseline resync");
REGISTER_HISTOGRAM(snp_rcvr_blob_process_time,
"Time cost(us) of successfully process a blob in baseline resync",
HistogramBucketsType(LowResolutionLatecyBuckets),
_publish_as::publish_as_sum_count);
HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count);
REGISTER_HISTOGRAM(snp_rcvr_batch_process_time,
"Time cost(ms) of successfully process a batch in baseline resync",
HistogramBucketsType(LowResolutionLatecyBuckets),
_publish_as::publish_as_sum_count);
HistogramBucketsType(LowResolutionLatecyBuckets), _publish_as::publish_as_sum_count);

attach_gather_cb(std::bind(&ReceiverSnapshotMetrics::on_gather, this));
register_me_to_farm();
Expand Down Expand Up @@ -1059,6 +1055,9 @@ class HSHomeObject : public HomeObjectImpl {

BlobManager::Result< std::vector< BlobInfo > > get_shard_blobs(shard_id_t shard_id);

// Refresh PG statistics (called after log replay)
void refresh_pg_statistics(pg_id_t pg_id);

private:
BlobManager::Result< std::string > do_verify_blob(const void* blob, shard_id_t expected_shard_id,
blob_id_t expected_blob_id = 0) const;
Expand Down
93 changes: 81 additions & 12 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,10 @@ void HSHomeObject::on_pg_complete_replace_member(group_id_t group_id, const std:
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
}

//This function actually perform rollback for replace member task: Remove in_member, and ensure out_member exists
// This function actually performs a rollback of the replace-member task: remove in_member, and ensure out_member exists
void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const std::string& task_id,
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) {
std::unique_lock lck(_pg_lock);
for (const auto& iter : _pg_map) {
auto& pg = iter.second;
Expand All @@ -421,13 +421,13 @@ void HSHomeObject::on_pg_clean_replace_member_task(group_id_t group_id, const st

LOGI("PG clean replace member task done (rollback), task_id={}, removed in_member={} (removed={}), "
"ensured out_member={} (inserted={}), member_nums={}, trace_id={}",
task_id, boost::uuids::to_string(member_in.id), removed_count,
boost::uuids::to_string(member_out.id), inserted, pg->pg_info_.members.size(), tid);
task_id, boost::uuids::to_string(member_in.id), removed_count, boost::uuids::to_string(member_out.id),
inserted, pg->pg_info_.members.size(), tid);
return;
}
}
LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}", task_id,
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
LOGE("PG clean replace member task failed, pg not found, task_id={}, member_out={}, member_in={}, trace_id={}",
task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id), tid);
}

bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
Expand Down Expand Up @@ -457,7 +457,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
// New member not in our records, add with default name
PGMember new_member(member_id);
new_members.insert(std::move(new_member));
LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id), hs_pg->pg_info_.id);
LOGE("Adding new member {} to pg={} membership, should not happen!", boost::uuids::to_string(member_id),
hs_pg->pg_info_.id);
}
}
// Check if membership changed
Expand All @@ -471,9 +472,7 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
std::vector< peer_id_t > added_members;

for (auto& old_member : hs_pg->pg_info_.members) {
if (new_members.find(old_member) == new_members.end()) {
removed_members.push_back(old_member.id);
}
if (new_members.find(old_member) == new_members.end()) { removed_members.push_back(old_member.id); }
}

for (auto& new_member : new_members) {
Expand All @@ -482,7 +481,8 @@ bool HSHomeObject::reconcile_membership(pg_id_t pg_id) {
}
}

LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new membership count: {}",
LOGI("Reconciling PG {} membership: removing {} members, adding {} members, old membership count {}, new "
"membership count: {}",
pg_id, removed_members.size(), added_members.size(), hs_pg->pg_info_.members.size(), new_members.size());

for (auto& member_id : removed_members) {
Expand Down Expand Up @@ -1245,6 +1245,75 @@ uint32_t HSHomeObject::get_pg_tombstone_blob_count(pg_id_t pg_id) const {
return tombstone_blob_count;
}

// Recompute PG statistics from authoritative on-disk state (index table and
// chunk usage) and overwrite the in-memory durable entities. Called after log
// replay, when the replayed counters may disagree with the actual data.
void HSHomeObject::refresh_pg_statistics(pg_id_t pg_id) {
    auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
    RELEASE_ASSERT(hs_pg, "Failed to get pg={} for statistics refresh", pg_id);

    // Step 1: Scan the index table once, counting active and tombstone blobs.
    uint64_t active_count = 0;
    uint64_t tombstone_count = 0;

    auto start_key =
        BlobRouteKey{BlobRoute{uint64_t(pg_id) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}};
    auto end_key =
        BlobRouteKey{BlobRoute{uint64_t(pg_id + 1) << homeobject::shard_width, std::numeric_limits< uint64_t >::min()}};

    homestore::BtreeQueryRequest< BlobRouteKey > query_req{
        homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key),
                                                 false /* inclusive */},
        homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY,
        std::numeric_limits< uint32_t >::max() /* blob count in a pg will not exceed uint32_t_max*/,
        [&active_count, &tombstone_count](homestore::BtreeKey const&, homestore::BtreeValue const& value) -> bool {
            BlobRouteValue blob_value{value};
            if (blob_value.pbas() == HSHomeObject::tombstone_pbas) {
                tombstone_count++;
            } else {
                active_count++;
            }
            return false; // Continue scanning
        }};

    std::vector< std::pair< BlobRouteKey, BlobRouteValue > > dummy_out;
    auto ret = hs_pg->index_table_->query(query_req, dummy_out);
    RELEASE_ASSERT(ret == homestore::btree_status_t::success, "Failed to scan index table for pg={}, status={}", pg_id,
                   ret);

    // Step 2: Sum occupied blocks across every chunk owned by this PG.
    auto chunk_ids = chunk_selector()->get_pg_chunks(pg_id);
    RELEASE_ASSERT(chunk_ids, "Failed to get chunks for pg={}", pg_id);

    uint64_t total_occupied = 0;
    for (const auto& chunk_id : *chunk_ids) {
        auto vchunk = chunk_selector()->get_extend_vchunk(chunk_id);
        RELEASE_ASSERT(vchunk, "Failed to get vchunk={} for pg={}", chunk_id, pg_id);
        total_occupied += vchunk->get_used_blks();
    }

    // Step 3: Store the recomputed values, capturing the originals so we can
    // tell whether anything was actually corrected.
    uint64_t original_active_count = 0;
    uint64_t original_tombstone_count = 0;
    uint64_t original_occupied_count = 0;

    hs_pg->durable_entities_update([active_count, tombstone_count, total_occupied, &original_active_count,
                                    &original_tombstone_count, &original_occupied_count](auto& de) {
        // Capture original values
        original_active_count = de.active_blob_count.load(std::memory_order_relaxed);
        original_tombstone_count = de.tombstone_blob_count.load(std::memory_order_relaxed);
        original_occupied_count = de.total_occupied_blk_count.load(std::memory_order_relaxed);

        // Update with corrected values
        de.active_blob_count.store(active_count, std::memory_order_relaxed);
        de.tombstone_blob_count.store(tombstone_count, std::memory_order_relaxed);
        de.total_occupied_blk_count.store(total_occupied, std::memory_order_relaxed);
    });

    // Only advertise "[corrected]" when a counter actually changed; an
    // unconditional prefix would be misleading in the common healthy case.
    bool const corrected = (original_active_count != active_count) ||
        (original_tombstone_count != tombstone_count) || (original_occupied_count != total_occupied);
    if (corrected) {
        LOGI("[corrected] Refreshed statistics for pg={}: active_blobs={} (original={}), tombstone_blobs={} "
             "(original={}), occupied_blocks={} (original={})",
             pg_id, active_count, original_active_count, tombstone_count, original_tombstone_count, total_occupied,
             original_occupied_count);
    } else {
        LOGI("Refreshed statistics for pg={}: no correction needed, active_blobs={}, tombstone_blobs={}, "
             "occupied_blocks={}",
             pg_id, active_count, tombstone_count, total_occupied);
    }
}

void HSHomeObject::update_pg_meta_after_gc(const pg_id_t pg_id, const homestore::chunk_num_t move_from_chunk,
const homestore::chunk_num_t move_to_chunk, const uint64_t task_id) {
// 1 update pg metrics
Expand Down
4 changes: 4 additions & 0 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,10 @@ void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& gr
LOGD("vchunk={} is selected for shard={} in pg={} when recovery", vchunk_id, shard_sb->info.id, pg_id);
}
}

// Refresh PG statistics after log replay
LOGI("Starting statistics refresh for pg={}", pg_id);
home_object_->refresh_pg_statistics(pg_id);
}

} // namespace homeobject
85 changes: 84 additions & 1 deletion src/lib/homestore_backend/tests/hs_pg_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,87 @@ TEST_F(HomeObjectFixture, CreatePGFailed) {
restart();
ASSERT_TRUE(pg_exist(pg_id));
}
#endif
#endif

TEST_F(HomeObjectFixture, PGRefreshStatisticsTest) {
    LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());
    g_helper->sync();

    // Create a pg and shard
    pg_id_t pg_id{1};
    create_pg(pg_id);
    auto shard_info = create_shard(pg_id, 64 * Mi, "shard meta");
    auto shard_id = shard_info.id;
    auto s = _obj_inst->shard_manager()->get_shard(shard_id).get();
    ASSERT_TRUE(!!s);
    LOGINFO("Created shard {}", shard_info.id);

    // Put enough blobs that the index b-tree spans multiple pages, so the
    // paginated SWEEP query inside refresh_pg_statistics is genuinely
    // exercised rather than completing within a single page.
    const uint32_t num_active_blobs = 1000;
    std::map< pg_id_t, blob_id_t > pg_blob_id;
    pg_blob_id[pg_id] = 0;
    std::map< pg_id_t, std::vector< shard_id_t > > pg_shard_id_vec;
    pg_shard_id_vec[pg_id] = {shard_id};

    auto shard_blob_map = put_blobs(pg_shard_id_vec, num_active_blobs, pg_blob_id);
    LOGINFO("Put {} active blobs", num_active_blobs);

    // Delete some blobs to create tombstones
    const uint32_t num_tombstones = 3;
    for (uint32_t i = 0; i < num_tombstones; ++i) {
        del_blob(pg_id, shard_id, i);
    }
    LOGINFO("Created {} tombstone blobs", num_tombstones);

    // Get PG from _pg_map
    ASSERT_TRUE(_obj_inst->_pg_map.find(pg_id) != _obj_inst->_pg_map.end());
    auto hs_pg = dynamic_cast< HSHomeObject::HS_PG* >(_obj_inst->_pg_map[pg_id].get());
    ASSERT_NE(hs_pg, nullptr);

    // Manually corrupt statistics to simulate an inconsistency
    hs_pg->durable_entities_update([](auto& de) {
        de.active_blob_count.store(999, std::memory_order_relaxed);
        de.tombstone_blob_count.store(888, std::memory_order_relaxed);
        de.total_occupied_blk_count.store(777, std::memory_order_relaxed);
    });
    LOGINFO("Corrupted statistics: active=999, tombstone=888, occupied=777");

    // Call refresh_pg_statistics
    _obj_inst->refresh_pg_statistics(pg_id);

    // Verify statistics are corrected (no direct access to durable_entities_)
    // Use PG stats API to verify
    PGStats pg_stats;
    auto res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats);
    ASSERT_TRUE(res);
    LOGINFO("Statistics after refresh: stats={}", pg_stats.to_string());

    // Verify counts through stats API
    EXPECT_EQ(pg_stats.num_active_objects, num_active_blobs - num_tombstones)
        << "Active blob count should be " << (num_active_blobs - num_tombstones);
    EXPECT_EQ(pg_stats.num_tombstone_objects, num_tombstones) << "Tombstone blob count should be " << num_tombstones;
    EXPECT_GT(pg_stats.used_bytes, 0) << "Used bytes should be greater than 0";

    uint64_t used_bytes_after = pg_stats.used_bytes;

    // Corrupt the counters once more before restarting, so the post-restart
    // check proves the log-replay hook actually rebuilds the statistics rather
    // than merely observing values that were already correct.
    hs_pg->durable_entities_update([](auto& de) {
        de.active_blob_count.store(555, std::memory_order_relaxed);
        de.tombstone_blob_count.store(444, std::memory_order_relaxed);
        de.total_occupied_blk_count.store(333, std::memory_order_relaxed);
    });
    LOGINFO("Corrupted statistics again before restart: active=555, tombstone=444, occupied=333");

    // Test refresh_pg_statistics after restart (log replay scenario)
    LOGINFO("Testing statistics refresh after restart");
    restart();

    // Get PG after restart
    ASSERT_TRUE(_obj_inst->_pg_map.find(pg_id) != _obj_inst->_pg_map.end());
    hs_pg = dynamic_cast< HSHomeObject::HS_PG* >(_obj_inst->_pg_map[pg_id].get());
    ASSERT_NE(hs_pg, nullptr);

    // Statistics must be rebuilt to the correct values after restart
    PGStats pg_stats_restart;
    res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats_restart);
    ASSERT_TRUE(res);
    LOGINFO("Statistics after restart: stats={}", pg_stats_restart.to_string());

    EXPECT_EQ(pg_stats_restart.num_active_objects, num_active_blobs - num_tombstones)
        << "Active blob count should be rebuilt after restart";
    EXPECT_EQ(pg_stats_restart.num_tombstone_objects, num_tombstones)
        << "Tombstone blob count should be rebuilt after restart";
    EXPECT_EQ(pg_stats_restart.used_bytes, used_bytes_after) << "Used bytes should be rebuilt after restart";
}
Loading