Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions CHANGELOG.md

This file was deleted.

2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.1.1"
version = "4.2.0"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homeobject/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

SISL_LOGGING_DECL(homeobject);

#define HOMEOBJECT_LOG_MODS homeobject, blobmgr, shardmgr, gcmgr
#define HOMEOBJECT_LOG_MODS homeobject, blobmgr, shardmgr, gcmgr, scrubmgr

#ifndef Ki
constexpr uint64_t Ki = 1024ul;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ struct PG {
std::atomic< bool > is_dirty_{false};
ShardPtrList shards_;

blob_id_t get_last_blob_id() const { return durable_entities_.blob_sequence_num; }

void durable_entities_update(auto&& cb, bool dirty = true) {
cb(durable_entities_);
if (dirty) { is_dirty_.store(true, std::memory_order_relaxed); }
Expand Down
27 changes: 23 additions & 4 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
hs_cp_callbacks.cpp
hs_http_manager.cpp
gc_manager.cpp
scrub_manager.cpp
MPMCPriorityQueue.hpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
)
target_link_libraries("${PROJECT_NAME}_homestore" PUBLIC
Expand All @@ -42,10 +44,19 @@ settings_gen_cpp(
${FLATBUFFERS_FLATC_EXECUTABLE}
${CMAKE_CURRENT_BINARY_DIR}/generated/
"${PROJECT_NAME}_homestore"
hs_backend_config.fbs
resync_pg_data.fbs
resync_shard_data.fbs
resync_blob_data.fbs
hs_homeobject_fbs/hs_backend_config.fbs
hs_homeobject_fbs/resync_pg_data.fbs
hs_homeobject_fbs/resync_shard_data.fbs
hs_homeobject_fbs/resync_blob_data.fbs
hs_homeobject_fbs/deep_blob_scrub_map.fbs
hs_homeobject_fbs/shallow_blob_scrub_map.fbs
hs_homeobject_fbs/blob_scrub_req.fbs
hs_homeobject_fbs/shard_scrub_req.fbs
hs_homeobject_fbs/deep_shard_scrub_map.fbs
hs_homeobject_fbs/shallow_shard_scrub_map.fbs
hs_homeobject_fbs/pg_meta_scrub_req.fbs
hs_homeobject_fbs/pg_meta_scrub_map.fbs
hs_homeobject_fbs/scrub_common.fbs
)

# Unit test objects
Expand Down Expand Up @@ -155,3 +166,11 @@ add_test(NAME HomestoreTestGC COMMAND homestore_test_gc -csv error --executor im
--override_config hs_backend_config.gc_garbage_rate_threshold=0
--override_config hs_backend_config.gc_scan_interval_sec=5)

add_executable(homestore_test_scrubber)
target_sources(homestore_test_scrubber PRIVATE $<TARGET_OBJECTS:homestore_tests_scrubber>)
target_link_libraries(homestore_test_scrubber PUBLIC homeobject_homestore ${COMMON_TEST_DEPS})
add_test(NAME HomestoreTestScrubber COMMAND homestore_test_scrubber -csv error --executor immediate --config_path ./
--override_config hs_backend_config.enable_scrubber=true
--override_config nuraft_mesg_config.mesg_factory_config.data_request_deadline_secs:10)


188 changes: 188 additions & 0 deletions src/lib/homestore_backend/MPMCPriorityQueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
#pragma once

#include <condition_variable>
#include <concepts>
#include <cstddef>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

namespace homeobject {

/**
* @brief Multi-Producer Multi-Consumer Priority Queue (C++20)
*
* Thread-safe priority queue that supports:
* - Concurrent push operations from multiple producers
* - Concurrent pop operations from multiple consumers
* - Blocking pop when queue is empty
* - Graceful shutdown via close() method
*
* @tparam T Element type (must be comparable)
* @tparam Compare Comparison function (default: std::less for max-heap)
*/
template < typename T, typename Compare = std::less< T > >
requires std::regular< T > && std::predicate< Compare, T, T >
class MPMCPriorityQueue {
public:
using value_type = T;
using size_type = std::size_t;
using comparator_type = Compare;

/**
* @brief Status codes returned by pop operations
*/
enum class Status : uint8_t {
Ok, ///< Successfully popped an element
Closed ///< Queue is closed, no more elements available
};

/**
* @brief Result of a pop operation
*/
struct PopResult {
Status status;
std::optional< T > value; ///< Has value only if status == Ok

// Convenience methods
[[nodiscard]] constexpr bool is_ok() const noexcept { return status == Status::Ok; }
[[nodiscard]] constexpr bool is_closed() const noexcept { return status == Status::Closed; }
};

/**
* @brief Construct an empty priority queue
*/
constexpr MPMCPriorityQueue() noexcept(std::is_nothrow_default_constructible_v< Compare >) = default;

/**
* @brief Destructor - automatically closes the queue
*/
~MPMCPriorityQueue() { close(); }

// Disable copy and move to prevent issues with condition variables
MPMCPriorityQueue(const MPMCPriorityQueue&) = delete;
MPMCPriorityQueue& operator=(const MPMCPriorityQueue&) = delete;
MPMCPriorityQueue(MPMCPriorityQueue&&) = delete;
MPMCPriorityQueue& operator=(MPMCPriorityQueue&&) = delete;

/**
* @brief Thread-safe push operation (copy)
*
* @param value Element to insert
* @note No-op if queue is closed
*/
void push(const T& value) {
{
std::scoped_lock lock(mutex_);
if (closed_) [[unlikely]] {
return; // Silently ignore pushes to closed queue
}
pq_.push(value);
}
cv_.notify_one(); // Wake one waiting consumer
}

/**
* @brief Thread-safe push operation (move)
*
* @param value Element to insert (will be moved)
* @note No-op if queue is closed
*/
void push(T&& value) {
{
std::scoped_lock lock(mutex_);
if (closed_) [[unlikely]] { return; }
pq_.push(std::move(value));
}
cv_.notify_one();
}

/**
* @brief Thread-safe pop operation
*
* Blocks if queue is empty and not closed.
* Returns immediately if queue is closed.
*
* @return PopResult containing status and optional value
* @note Thread-safe for multiple concurrent consumers
*/
[[nodiscard]] PopResult pop() {
std::unique_lock lock(mutex_);

// Wait until queue has elements or is closed
cv_.wait(lock, [this] { return closed_ || !pq_.empty(); });

// Try to pop an element
if (!pq_.empty()) {
T top = std::move(const_cast< T& >(pq_.top()));
pq_.pop();
return PopResult{.status = Status::Ok, .value = std::move(top)};
}

// Queue is empty and closed
return PopResult{.status = Status::Closed, .value = std::nullopt};
}

/**
* @brief Close the queue
*
* After calling close():
* - All blocked pop() calls will wake up
* - Existing elements can still be popped
* - New push() calls will be ignored
* - pop() returns Status::Closed when queue becomes empty
*
* @note Thread-safe and idempotent
*/
void close() noexcept {
{
std::scoped_lock lock(mutex_);
closed_ = true;
}
cv_.notify_all(); // Wake all waiting consumers
}

/**
* @brief Get current number of elements
*
* @return Number of elements in the queue
* @note Thread-safe
*/
[[nodiscard]] size_type size() const {
std::scoped_lock lock(mutex_);
return pq_.size();
}

/**
* @brief Check if queue is empty
*
* @return true if queue has no elements
* @note Thread-safe
*/
[[nodiscard]] bool empty() const {
std::scoped_lock lock(mutex_);
return pq_.empty();
}

/**
* @brief Check if queue is closed
*
* @return true if close() has been called
* @note Thread-safe
*/
[[nodiscard]] bool is_closed() const {
std::scoped_lock lock(mutex_);
return closed_;
}

private:
mutable std::mutex mutex_;
std::condition_variable cv_;
bool closed_{false};
std::priority_queue< T, std::vector< T >, Compare > pq_;
};

} // namespace homeobject
16 changes: 8 additions & 8 deletions src/lib/homestore_backend/gc_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ SISL_LOGGING_DECL(gcmgr)
GCManager::GCManager(HSHomeObject* homeobject) :
m_chunk_selector{homeobject->chunk_selector()}, m_hs_home_object{homeobject} {
homestore::meta_service().register_handler(
_gc_actor_meta_name,
gc_actor_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_gc_actor_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);

homestore::meta_service().register_handler(
_gc_reserved_chunk_meta_name,
gc_reserved_chunk_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_reserved_chunk_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
Expand All @@ -44,7 +44,7 @@ GCManager::GCManager(HSHomeObject* homeobject) :
true);

homestore::meta_service().register_handler(
_gc_task_meta_name,
gc_task_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_gc_task_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
Expand All @@ -64,7 +64,7 @@ void GCManager::on_gc_task_meta_blk_found(sisl::byte_view const& buf, void* meta
// here, we are under the protection of the lock of metaservice. however, we will also try to update pg and shard
// metablk and then destroy the gc_task_sb, which will also try to acquire the lock of metaservice, as a result, a
// dead lock will happen. so here we will handle all the gc tasks after read all the metablks
m_recovered_gc_tasks.emplace_back(_gc_task_meta_name);
m_recovered_gc_tasks.emplace_back(gc_task_meta_name);
m_recovered_gc_tasks.back().load(buf, meta_cookie);
}

Expand All @@ -89,7 +89,7 @@ void GCManager::handle_all_recovered_gc_tasks() {
}

void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
m_gc_actor_sbs.emplace_back(_gc_actor_meta_name);
m_gc_actor_sbs.emplace_back(gc_actor_meta_name);
auto& gc_actor_sb = m_gc_actor_sbs.back();
gc_actor_sb.load(buf, meta_cookie);
auto pdev_id = gc_actor_sb->pdev_id;
Expand All @@ -100,7 +100,7 @@ void GCManager::on_gc_actor_meta_blk_found(sisl::byte_view const& buf, void* met
}

void GCManager::on_reserved_chunk_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
homestore::superblk< gc_reserved_chunk_superblk > reserved_chunk_sb(_gc_reserved_chunk_meta_name);
homestore::superblk< gc_reserved_chunk_superblk > reserved_chunk_sb(gc_reserved_chunk_meta_name);
auto chunk_id = reserved_chunk_sb.load(buf, meta_cookie)->chunk_id;
auto EXVchunk = m_chunk_selector->get_extend_vchunk(chunk_id);
if (EXVchunk == nullptr) {
Expand Down Expand Up @@ -968,7 +968,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(

if (err) {
// we will come here if:
// 1 any blob copy fails, then err is operation_canceled
// 1 any blob copy fails, then err is operation_cancelled
// 2 write footer fails, then err is the error code of write footer
GCLOGE(task_id, pg_id, shard_id,
"Failed to copy some blos or failed to write shard footer for move_to_chunk={}, "
Expand Down Expand Up @@ -1253,7 +1253,7 @@ void GCManager::pdev_gc_actor::process_gc_task(chunk_id_t move_from_chunk, uint8

// after data copy, we persist the gc task meta blk. now, we can make sure all the valid blobs are successfully
// copyed and new blob indexes have be written to gc index table before gc task superblk is persisted.
homestore::superblk< GCManager::gc_task_superblk > gc_task_sb{GCManager::_gc_task_meta_name};
homestore::superblk< GCManager::gc_task_superblk > gc_task_sb{GCManager::gc_task_meta_name};
gc_task_sb.create(sizeof(GCManager::gc_task_superblk));
gc_task_sb->move_from_chunk = move_from_chunk;
gc_task_sb->move_to_chunk = move_to_chunk;
Expand Down
12 changes: 6 additions & 6 deletions src/lib/homestore_backend/gc_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class GCManager {
GCManager& operator=(GCManager&&) = delete;

public:
inline static auto const _gc_actor_meta_name = std::string("GCActor");
inline static auto const _gc_task_meta_name = std::string("GCTask");
inline static auto const _gc_reserved_chunk_meta_name = std::string("GCReservedChunk");
inline static auto const gc_actor_meta_name = std::string("GCActor");
inline static auto const gc_task_meta_name = std::string("GCTask");
inline static auto const gc_reserved_chunk_meta_name = std::string("GCReservedChunk");
inline static atomic_uint64_t _gc_task_id{1}; // 0 is used for crash recovery

#pragma pack(1)
Expand All @@ -61,7 +61,7 @@ class GCManager {
uint64_t failed_egc_task_count{0ull};
uint64_t total_reclaimed_blk_count_by_gc{0ull};
uint64_t total_reclaimed_blk_count_by_egc{0ull};
static std::string name() { return _gc_actor_meta_name; }
static std::string name() { return gc_actor_meta_name; }
};

struct gc_task_superblk {
Expand All @@ -70,12 +70,12 @@ class GCManager {
chunk_id_t vchunk_id;
pg_id_t pg_id;
uint8_t priority;
static std::string name() { return _gc_task_meta_name; }
static std::string name() { return gc_task_meta_name; }
};

struct gc_reserved_chunk_superblk {
chunk_id_t chunk_id;
static std::string name() { return _gc_reserved_chunk_meta_name; }
static std::string name() { return gc_reserved_chunk_meta_name; }
};
#pragma pack()

Expand Down
Loading
Loading