Skip to content

Commit e6e4ff8

Browse files
committed
[scrubber] phase1: add scrub manager
Add comprehensive scrub infrastructure to detect data corruption and inconsistencies across replicas in HomeObject. This is phase 1 of the scrubber implementation. - Implements deep and shallow scrubbing for PG metadata, shards, and blobs - Supports periodic and manual scrub triggering modes - Uses priority queue (MPMCPriorityQueue) for scrub task scheduling - Persists scrub metadata using superblocks to track last scrub times - Coordinates scrub operations across all replicas in a PG 1. **Deep Scrub**: Full data integrity verification - PG metadata validation - Shard existence and consistency checks - Blob hash verification (reads data and computes checksums) - Detects corrupted, missing, and inconsistent data across replicas 2. **Shallow Scrub**: Lightweight metadata-only verification - Shard existence checks - Blob index validation (no data reads) - Faster execution for routine checks - FlatBuffer-based serialization for scrub requests and responses - Leader sends scrub requests to all replicas - Followers return scrub maps with their local state - Retry logic with configurable timeouts for reliability - **ShallowScrubReport**: Tracks missing shards and blobs per peer - **DeepScrubReport**: Extends shallow report with: - Corrupted blobs/shards with error details - Inconsistent blobs (different hashes across replicas) - Corrupted PG metadata - Scrubs data in configurable ranges to avoid timeouts - Shard range: 2M shards per request - Blob range: Based on HDD IOPS for deep scrub, 2M for shallow - Early cancellation support for graceful shutdown 1. **DeepScrubTest**: Verifies detection of: - Missing blobs on followers - Missing shards on followers - Corrupted blob data (IO errors) - Inconsistent blob hashes across replicas 2. **MPMCPriorityQueue Tests**: Lock-free queue validation - Concurrent push/pop operations - Priority ordering verification - Thread safety under contention
1 parent d1392ce commit e6e4ff8

33 files changed

+4119
-94
lines changed

CHANGELOG.md

Lines changed: 0 additions & 13 deletions
This file was deleted.

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
class HomeObjectConan(ConanFile):
1212
name = "homeobject"
13-
version = "4.1.0"
13+
version = "4.2.0"
1414

1515
homepage = "https://github.com/eBay/HomeObject"
1616
description = "Blob Store built on HomeStore"

src/include/homeobject/common.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
SISL_LOGGING_DECL(homeobject);
1616

17-
#define HOMEOBJECT_LOG_MODS homeobject, blobmgr, shardmgr, gcmgr
17+
#define HOMEOBJECT_LOG_MODS homeobject, blobmgr, shardmgr, gcmgr, scrubmgr
1818

1919
#ifndef Ki
2020
constexpr uint64_t Ki = 1024ul;

src/lib/homeobject_impl.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ struct PG {
7070
std::atomic< bool > is_dirty_{false};
7171
ShardPtrList shards_;
7272

73+
blob_id_t get_last_blob_id() const { return durable_entities_.blob_sequence_num; }
74+
7375
void durable_entities_update(auto&& cb, bool dirty = true) {
7476
cb(durable_entities_);
7577
if (dirty) { is_dirty_.store(true, std::memory_order_relaxed); }

src/lib/homestore_backend/CMakeLists.txt

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
3030
hs_cp_callbacks.cpp
3131
hs_http_manager.cpp
3232
gc_manager.cpp
33+
scrub_manager.cpp
34+
MPMCPriorityQueue.hpp
3335
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
3436
)
3537
target_link_libraries("${PROJECT_NAME}_homestore" PUBLIC
@@ -42,10 +44,19 @@ settings_gen_cpp(
4244
${FLATBUFFERS_FLATC_EXECUTABLE}
4345
${CMAKE_CURRENT_BINARY_DIR}/generated/
4446
"${PROJECT_NAME}_homestore"
45-
hs_backend_config.fbs
46-
resync_pg_data.fbs
47-
resync_shard_data.fbs
48-
resync_blob_data.fbs
47+
hs_homeobject_fbs/hs_backend_config.fbs
48+
hs_homeobject_fbs/resync_pg_data.fbs
49+
hs_homeobject_fbs/resync_shard_data.fbs
50+
hs_homeobject_fbs/resync_blob_data.fbs
51+
hs_homeobject_fbs/deep_blob_scrub_map.fbs
52+
hs_homeobject_fbs/shallow_blob_scrub_map.fbs
53+
hs_homeobject_fbs/blob_scrub_req.fbs
54+
hs_homeobject_fbs/shard_scrub_req.fbs
55+
hs_homeobject_fbs/deep_shard_scrub_map.fbs
56+
hs_homeobject_fbs/shallow_shard_scrub_map.fbs
57+
hs_homeobject_fbs/pg_meta_scrub_req.fbs
58+
hs_homeobject_fbs/pg_meta_scrub_map.fbs
59+
hs_homeobject_fbs/scrub_common.fbs
4960
)
5061

5162
# Unit test objects
@@ -155,3 +166,11 @@ add_test(NAME HomestoreTestGC COMMAND homestore_test_gc -csv error --executor im
155166
--override_config hs_backend_config.gc_garbage_rate_threshold=0
156167
--override_config hs_backend_config.gc_scan_interval_sec=5)
157168

169+
add_executable(homestore_test_scrubber)
170+
target_sources(homestore_test_scrubber PRIVATE $<TARGET_OBJECTS:homestore_tests_scrubber>)
171+
target_link_libraries(homestore_test_scrubber PUBLIC homeobject_homestore ${COMMON_TEST_DEPS})
172+
add_test(NAME HomestoreTestScrubber COMMAND homestore_test_scrubber -csv error --executor immediate --config_path ./
173+
--override_config hs_backend_config.enable_scrubber=true
174+
--override_config nuraft_mesg_config.mesg_factory_config.data_request_deadline_secs:10)
175+
176+
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
#pragma once
2+
3+
#include <condition_variable>
4+
#include <concepts>
5+
#include <cstddef>
6+
#include <functional>
7+
#include <mutex>
8+
#include <optional>
9+
#include <queue>
10+
#include <utility>
11+
#include <vector>
12+
13+
namespace homeobject {
14+
15+
/**
16+
* @brief Multi-Producer Multi-Consumer Priority Queue (C++20)
17+
*
18+
* Thread-safe priority queue that supports:
19+
* - Concurrent push operations from multiple producers
20+
* - Concurrent pop operations from multiple consumers
21+
* - Blocking pop when queue is empty
22+
* - Graceful shutdown via close() method
23+
*
24+
* @tparam T Element type (must be comparable)
25+
* @tparam Compare Comparison function (default: std::less for max-heap)
26+
*/
27+
template < typename T, typename Compare = std::less< T > >
28+
requires std::regular< T > && std::predicate< Compare, T, T >
29+
class MPMCPriorityQueue {
30+
public:
31+
using value_type = T;
32+
using size_type = std::size_t;
33+
using comparator_type = Compare;
34+
35+
/**
36+
* @brief Status codes returned by pop operations
37+
*/
38+
enum class Status : uint8_t {
39+
Ok, ///< Successfully popped an element
40+
Closed ///< Queue is closed, no more elements available
41+
};
42+
43+
/**
44+
* @brief Result of a pop operation
45+
*/
46+
struct PopResult {
47+
Status status;
48+
std::optional< T > value; ///< Has value only if status == Ok
49+
50+
// Convenience methods
51+
[[nodiscard]] constexpr bool is_ok() const noexcept { return status == Status::Ok; }
52+
[[nodiscard]] constexpr bool is_closed() const noexcept { return status == Status::Closed; }
53+
};
54+
55+
/**
56+
* @brief Construct an empty priority queue
57+
*/
58+
constexpr MPMCPriorityQueue() noexcept(std::is_nothrow_default_constructible_v< Compare >) = default;
59+
60+
/**
61+
* @brief Destructor - automatically closes the queue
62+
*/
63+
~MPMCPriorityQueue() { close(); }
64+
65+
// Disable copy and move to prevent issues with condition variables
66+
MPMCPriorityQueue(const MPMCPriorityQueue&) = delete;
67+
MPMCPriorityQueue& operator=(const MPMCPriorityQueue&) = delete;
68+
MPMCPriorityQueue(MPMCPriorityQueue&&) = delete;
69+
MPMCPriorityQueue& operator=(MPMCPriorityQueue&&) = delete;
70+
71+
/**
72+
* @brief Thread-safe push operation (copy)
73+
*
74+
* @param value Element to insert
75+
* @note No-op if queue is closed
76+
*/
77+
void push(const T& value) {
78+
{
79+
std::scoped_lock lock(mutex_);
80+
if (closed_) [[unlikely]] {
81+
return; // Silently ignore pushes to closed queue
82+
}
83+
pq_.push(value);
84+
}
85+
cv_.notify_one(); // Wake one waiting consumer
86+
}
87+
88+
/**
89+
* @brief Thread-safe push operation (move)
90+
*
91+
* @param value Element to insert (will be moved)
92+
* @note No-op if queue is closed
93+
*/
94+
void push(T&& value) {
95+
{
96+
std::scoped_lock lock(mutex_);
97+
if (closed_) [[unlikely]] { return; }
98+
pq_.push(std::move(value));
99+
}
100+
cv_.notify_one();
101+
}
102+
103+
/**
104+
* @brief Thread-safe pop operation
105+
*
106+
* Blocks if queue is empty and not closed.
107+
* Returns immediately if queue is closed.
108+
*
109+
* @return PopResult containing status and optional value
110+
* @note Thread-safe for multiple concurrent consumers
111+
*/
112+
[[nodiscard]] PopResult pop() {
113+
std::unique_lock lock(mutex_);
114+
115+
// Wait until queue has elements or is closed
116+
cv_.wait(lock, [this] { return closed_ || !pq_.empty(); });
117+
118+
// Try to pop an element
119+
if (!pq_.empty()) {
120+
T top = std::move(const_cast< T& >(pq_.top()));
121+
pq_.pop();
122+
return PopResult{.status = Status::Ok, .value = std::move(top)};
123+
}
124+
125+
// Queue is empty and closed
126+
return PopResult{.status = Status::Closed, .value = std::nullopt};
127+
}
128+
129+
/**
130+
* @brief Close the queue
131+
*
132+
* After calling close():
133+
* - All blocked pop() calls will wake up
134+
* - Existing elements can still be popped
135+
* - New push() calls will be ignored
136+
* - pop() returns Status::Closed when queue becomes empty
137+
*
138+
* @note Thread-safe and idempotent
139+
*/
140+
void close() noexcept {
141+
{
142+
std::scoped_lock lock(mutex_);
143+
closed_ = true;
144+
}
145+
cv_.notify_all(); // Wake all waiting consumers
146+
}
147+
148+
/**
149+
* @brief Get current number of elements
150+
*
151+
* @return Number of elements in the queue
152+
* @note Thread-safe
153+
*/
154+
[[nodiscard]] size_type size() const {
155+
std::scoped_lock lock(mutex_);
156+
return pq_.size();
157+
}
158+
159+
/**
160+
* @brief Check if queue is empty
161+
*
162+
* @return true if queue has no elements
163+
* @note Thread-safe
164+
*/
165+
[[nodiscard]] bool empty() const {
166+
std::scoped_lock lock(mutex_);
167+
return pq_.empty();
168+
}
169+
170+
/**
171+
* @brief Check if queue is closed
172+
*
173+
* @return true if close() has been called
174+
* @note Thread-safe
175+
*/
176+
[[nodiscard]] bool is_closed() const {
177+
std::scoped_lock lock(mutex_);
178+
return closed_;
179+
}
180+
181+
private:
182+
mutable std::mutex mutex_;
183+
std::condition_variable cv_;
184+
bool closed_{false};
185+
std::priority_queue< T, std::vector< T >, Compare > pq_;
186+
};
187+
188+
} // namespace homeobject

src/lib/homestore_backend/gc_manager.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ SISL_LOGGING_DECL(gcmgr)
2525
GCManager::GCManager(HSHomeObject* homeobject) :
2626
m_chunk_selector{homeobject->chunk_selector()}, m_hs_home_object{homeobject} {
2727
homestore::meta_service().register_handler(
28-
_gc_actor_meta_name,
28+
gc_actor_meta_name,
2929
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
3030
on_gc_actor_meta_blk_found(std::move(buf), voidptr_cast(mblk));
3131
},
3232
nullptr, true);
3333

3434
homestore::meta_service().register_handler(
35-
_gc_reserved_chunk_meta_name,
35+
gc_reserved_chunk_meta_name,
3636
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
3737
on_reserved_chunk_meta_blk_found(std::move(buf), voidptr_cast(mblk));
3838
},
@@ -44,7 +44,7 @@ GCManager::GCManager(HSHomeObject* homeobject) :
4444
true);
4545

4646
homestore::meta_service().register_handler(
47-
_gc_task_meta_name,
47+
gc_task_meta_name,
4848
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
4949
on_gc_task_meta_blk_found(std::move(buf), voidptr_cast(mblk));
5050
},
@@ -64,7 +64,7 @@ void GCManager::on_gc_task_meta_blk_found(sisl::byte_view const& buf, void* meta
6464
// here, we are under the protection of the lock of metaservice. however, we will also try to update pg and shard
6565
// metablk and then destroy the gc_task_sb, which will also try to acquire the lock of metaservice, as a result, a
6666
// dead lock will happen. so here we will handle all the gc tasks after read all the metablks
67-
m_recovered_gc_tasks.emplace_back(_gc_task_meta_name);
67+
m_recovered_gc_tasks.emplace_back(gc_task_meta_name);
6868
m_recovered_gc_tasks.back().load(buf, meta_cookie);
6969
}
7070

@@ -89,7 +89,7 @@ void GCManager::handle_all_recovered_gc_tasks() {
8989
}
9090

9191
void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
92-
m_gc_actor_sbs.emplace_back(_gc_actor_meta_name);
92+
m_gc_actor_sbs.emplace_back(gc_actor_meta_name);
9393
auto& gc_actor_sb = m_gc_actor_sbs.back();
9494
gc_actor_sb.load(buf, meta_cookie);
9595
auto pdev_id = gc_actor_sb->pdev_id;
@@ -100,7 +100,7 @@ void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* met
100100
}
101101

102102
void GCManager::on_reserved_chunk_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
103-
homestore::superblk< gc_reserved_chunk_superblk > reserved_chunk_sb(_gc_reserved_chunk_meta_name);
103+
homestore::superblk< gc_reserved_chunk_superblk > reserved_chunk_sb(gc_reserved_chunk_meta_name);
104104
auto chunk_id = reserved_chunk_sb.load(buf, meta_cookie)->chunk_id;
105105
auto EXVchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
106106
if (EXVchunk == nullptr) {
@@ -968,7 +968,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
968968

969969
if (err) {
970970
// we will come here if:
971-
// 1 any blob copy fails, then err is operation_canceled
971+
// 1 any blob copy fails, then err is operation_cancelled
972972
// 2 write footer fails, then err is the error code of write footer
973973
GCLOGE(task_id, pg_id, shard_id,
974974
"Failed to copy some blos or failed to write shard footer for move_to_chunk={}, "
@@ -1253,7 +1253,7 @@ void GCManager::pdev_gc_actor::process_gc_task(chunk_id_t move_from_chunk, uint8
12531253

12541254
// after data copy, we persist the gc task meta blk. now, we can make sure all the valid blobs are successfully
12551255
// copyed and new blob indexes have be written to gc index table before gc task superblk is persisted.
1256-
homestore::superblk< GCManager::gc_task_superblk > gc_task_sb{GCManager::_gc_task_meta_name};
1256+
homestore::superblk< GCManager::gc_task_superblk > gc_task_sb{GCManager::gc_task_meta_name};
12571257
gc_task_sb.create(sizeof(GCManager::gc_task_superblk));
12581258
gc_task_sb->move_from_chunk = move_from_chunk;
12591259
gc_task_sb->move_to_chunk = move_to_chunk;

src/lib/homestore_backend/gc_manager.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ class GCManager {
4646
GCManager& operator=(GCManager&&) = delete;
4747

4848
public:
49-
inline static auto const _gc_actor_meta_name = std::string("GCActor");
50-
inline static auto const _gc_task_meta_name = std::string("GCTask");
51-
inline static auto const _gc_reserved_chunk_meta_name = std::string("GCReservedChunk");
49+
inline static auto const gc_actor_meta_name = std::string("GCActor");
50+
inline static auto const gc_task_meta_name = std::string("GCTask");
51+
inline static auto const gc_reserved_chunk_meta_name = std::string("GCReservedChunk");
5252
inline static atomic_uint64_t _gc_task_id{1}; // 0 is used for crash recovery
5353

5454
#pragma pack(1)
@@ -61,7 +61,7 @@ class GCManager {
6161
uint64_t failed_egc_task_count{0ull};
6262
uint64_t total_reclaimed_blk_count_by_gc{0ull};
6363
uint64_t total_reclaimed_blk_count_by_egc{0ull};
64-
static std::string name() { return _gc_actor_meta_name; }
64+
static std::string name() { return gc_actor_meta_name; }
6565
};
6666

6767
struct gc_task_superblk {
@@ -70,12 +70,12 @@ class GCManager {
7070
chunk_id_t vchunk_id;
7171
pg_id_t pg_id;
7272
uint8_t priority;
73-
static std::string name() { return _gc_task_meta_name; }
73+
static std::string name() { return gc_task_meta_name; }
7474
};
7575

7676
struct gc_reserved_chunk_superblk {
7777
chunk_id_t chunk_id;
78-
static std::string name() { return _gc_reserved_chunk_meta_name; }
78+
static std::string name() { return gc_reserved_chunk_meta_name; }
7979
};
8080
#pragma pack()
8181

0 commit comments

Comments
 (0)