4 changes: 2 additions & 2 deletions conanfile.py
@@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.7.7"
version = "3.0.1"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
@@ -50,7 +50,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.20.15]@oss/master")
self.requires("homestore/[^6.20.20]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/[^3.1]", override=True)
3 changes: 2 additions & 1 deletion src/include/homeobject/shard_manager.hpp
@@ -34,7 +34,8 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn; // created_lsn
uint64_t lsn; // created_lsn
uint64_t sealed_lsn{INT64_MAX}; // sealed_lsn
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
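The new sealed_lsn field defaults to INT64_MAX so that a shard which has never been sealed compares as still open against any commit lsn (see the on_blob_put_commit change further down). A minimal sketch of that comparison, using hypothetical helper names that are not part of this diff:

```cpp
#include <cstdint>
#include <limits>

// Illustrative stand-in for the two lsn fields carried by ShardInfo.
struct ShardLsnSketch {
    uint64_t lsn{0};                                            // created_lsn
    uint64_t sealed_lsn{std::numeric_limits<int64_t>::max()};   // INT64_MAX == never sealed
};

// A put_blob commit (or replay) at commit_lsn should only be applied while the
// shard is still open, i.e. the commit was logged strictly before the seal.
inline bool shard_open_at(const ShardLsnSketch& info, uint64_t commit_lsn) {
    return commit_lsn < info.sealed_lsn;
}
```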
29 changes: 4 additions & 25 deletions src/lib/homestore_backend/gc_manager.cpp
@@ -166,15 +166,15 @@ bool GCManager::is_eligible_for_gc(chunk_id_t chunk_id) {
const auto defrag_blk_num = chunk->get_defrag_nblks();
if (!defrag_blk_num) { return false; }

// 1 if the chunk state is inuse, it is occupied by a open shard, so it can not be selected and we don't need gc it.
// 1 if the chunk state is inuse, it is now occupied by an open shard and can not be selected, so we can not gc it.
// 2 if the chunk state is gc, it means this chunk is being gc, or this is a reserved chunk, so we don't need gc it.
if (chunk->m_state != ChunkState::AVAILABLE) {
LOGDEBUG("chunk_id={} state is {}, not eligible for gc", chunk_id, chunk->m_state)
return false;
}

const auto total_blk_num = chunk->get_total_blks();
const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold);
static const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold);
bool should_gc = 100 * defrag_blk_num >= total_blk_num * gc_garbage_rate_threshold;

LOGDEBUG("gc scan chunk_id={}, use_blks={}, available_blks={}, total_blks={}, defrag_blks={}, should_gc={}",
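The should_gc predicate above is plain integer arithmetic: a chunk becomes eligible once its defragmented (garbage) blocks reach gc_garbage_rate_threshold percent of its total blocks. A self-contained sketch with assumed example values (the real threshold comes from HS_BACKEND_DYNAMIC_CONFIG):

```cpp
#include <cstdint>
#include <cstdio>

int main() {
    const uint64_t total_blk_num = 1000;            // assumed chunk capacity in blocks
    const uint64_t defrag_blk_num = 400;            // assumed garbage blocks in the chunk
    const uint64_t gc_garbage_rate_threshold = 40;  // assumed config value, in percent

    // Same comparison as is_eligible_for_gc(): scaling by 100 avoids floating point.
    const bool should_gc = 100 * defrag_blk_num >= total_blk_num * gc_garbage_rate_threshold;

    std::printf("garbage = %llu/%llu blks (%.1f%%), threshold = %llu%%, should_gc = %s\n",
                (unsigned long long)defrag_blk_num, (unsigned long long)total_blk_num,
                100.0 * defrag_blk_num / total_blk_num,
                (unsigned long long)gc_garbage_rate_threshold, should_gc ? "true" : "false");
    return 0;
}
```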
@@ -552,27 +552,6 @@ bool GCManager::pdev_gc_actor::replace_blob_index(
return true;
}

sisl::sg_list GCManager::pdev_gc_actor::generate_shard_super_blk_sg_list(shard_id_t shard_id) {
// TODO: do the buffer check before using it.
auto raw_shard_sb = m_hs_home_object->_get_hs_shard(shard_id);
RELEASE_ASSERT(raw_shard_sb, "can not find shard super blk for shard_id={} !!!", shard_id);

const auto shard_sb =
const_cast< HSHomeObject::HS_Shard* >(d_cast< const HSHomeObject::HS_Shard* >(raw_shard_sb))->sb_.get();

auto blk_size = homestore::data_service().get_blk_size();
auto shard_sb_size = sizeof(HSHomeObject::shard_info_superblk);
auto total_size = sisl::round_up(shard_sb_size, blk_size);
auto shard_sb_buf = iomanager.iobuf_alloc(blk_size, total_size);

std::memcpy(shard_sb_buf, shard_sb, shard_sb_size);

sisl::sg_list shard_sb_sgs;
shard_sb_sgs.size = total_size;
shard_sb_sgs.iovs.emplace_back(iovec{.iov_base = shard_sb_buf, .iov_len = total_size});
return shard_sb_sgs;
}

// note that, when we copy data, there is not create shard or put blob in this chunk, only delete blob might happen.
bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk_id_t move_to_chunk,
const uint64_t task_id) {
@@ -692,7 +671,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk
}

// prepare a shard header for this shard in move_to_chunk
sisl::sg_list header_sgs = generate_shard_super_blk_sg_list(shard_id);
sisl::sg_list header_sgs = m_hs_home_object->generate_shard_super_blk_sg_list(shard_id);

// we ignore the state in shard header blk. we never read a shard header since we don`t know where it is(nor
// record the pba in indextable)
@@ -837,7 +816,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk
}

// 3 write a shard footer for this shard
sisl::sg_list footer_sgs = generate_shard_super_blk_sg_list(shard_id);
sisl::sg_list footer_sgs = m_hs_home_object->generate_shard_super_blk_sg_list(shard_id);
return folly::collectAllUnsafe(futs)
.thenValue([this, &is_last_shard, &shard_id, &blk_size, &hints, &move_to_chunk,
&last_shard_state, task_id, &data_service, footer_sgs](auto&& results) {
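generate_shard_super_blk_sg_list() is removed from the GC actor here and re-declared as a member of HSHomeObject (see hs_homeobject.hpp below), presumably so it can be shared beyond the GC path. The underlying pattern, sketched generically below with standard types only (the real helper uses iomanager.iobuf_alloc, sisl::round_up and the homestore data-service block size), is to round the superblock size up to the device block size, copy it into an aligned buffer, and describe that buffer with a single iovec:

```cpp
#include <cstdlib>
#include <cstring>
#include <sys/uio.h>
#include <vector>

// Generic stand-in for sisl::sg_list.
struct SgListSketch {
    size_t size{0};
    std::vector<iovec> iovs;
};

inline size_t round_up(size_t n, size_t align) { return ((n + align - 1) / align) * align; }

// Copies payload_size bytes into a blk_size-aligned, blk_size-rounded buffer and
// wraps it in a one-entry scatter-gather list. The caller owns (and frees) iov_base.
inline SgListSketch make_super_blk_sg_list(const void* payload, size_t payload_size, size_t blk_size) {
    const size_t total_size = round_up(payload_size, blk_size);
    void* buf = std::aligned_alloc(blk_size, total_size);
    std::memset(buf, 0, total_size);
    std::memcpy(buf, payload, payload_size);

    SgListSketch sgs;
    sgs.size = total_size;
    sgs.iovs.push_back(iovec{.iov_base = buf, .iov_len = total_size});
    return sgs;
}
```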
4 changes: 0 additions & 4 deletions src/lib/homestore_backend/gc_manager.hpp
@@ -237,10 +237,6 @@ class GCManager {

pdev_gc_metrics& metrics() { return metrics_; }

private:
// utils
sisl::sg_list generate_shard_super_blk_sg_list(shard_id_t shard_id);

private:
uint32_t m_pdev_id;
std::shared_ptr< HeapChunkSelector > m_chunk_selector;
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/heap_chunk_selector.cpp
@@ -514,7 +514,7 @@ std::shared_ptr< const std::vector< homestore::chunk_num_t > > HeapChunkSelector
return p_chunk_ids;
}

std::optional< homestore::chunk_num_t > HeapChunkSelector::get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) {
std::optional< homestore::chunk_num_t > HeapChunkSelector::pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) {
std::shared_lock lock_guard(m_chunk_selector_mtx);
auto pg_it = m_per_pg_chunks.find(pg_id);
if (pg_it == m_per_pg_chunks.end()) {
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/heap_chunk_selector.h
@@ -126,7 +126,7 @@ class HeapChunkSelector : public homestore::ChunkSelector {
* @param pg_id The ID of the pg.
* @return An optional chunk_num_t value representing v_chunk_id, or std::nullopt if no space left.
*/
std::optional< chunk_num_t > get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id);
std::optional< chunk_num_t > pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id);

// this should be called on each pg meta blk found
bool recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids);
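The rename from get_ to pick_ presumably signals that the call selects a chunk for subsequent use rather than merely reporting one; the std::nullopt return still means the pg has no chunk with free space. A hypothetical caller, stubbed so it compiles on its own:

```cpp
#include <cstdint>
#include <optional>

using chunk_num_t = uint16_t;   // assumed width, for illustration only
using pg_id_t = uint16_t;

// Stand-in for HeapChunkSelector::pick_most_available_blk_chunk().
std::optional<chunk_num_t> pick_most_available_blk_chunk_stub(uint64_t /*ctx*/, pg_id_t /*pg_id*/) {
    return std::nullopt;        // pretend the pg has run out of space
}

int main() {
    if (auto v_chunk = pick_most_available_blk_chunk_stub(/*ctx=*/0, /*pg_id=*/1); v_chunk.has_value()) {
        // use *v_chunk as the v_chunk_id for the new shard
    } else {
        // no space left in this pg: the caller should fail the request with an out-of-space error
    }
    return 0;
}
```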
70 changes: 61 additions & 9 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -257,16 +257,48 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
trace_id_t tid = hs_ctx ? hs_ctx->traceID() : 0;
auto msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes());
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); }
RELEASE_ASSERT(false, "replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid);
return;
}

const auto shard_id = msg_header->shard_id;
auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes())));

int64_t shard_sealed_lsn;
{
std::scoped_lock lock_guard(_shard_lock);
auto iter = _shard_map.find(shard_id);
RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", shard_id,
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
shard_sealed_lsn = (*iter->second)->info.sealed_lsn;
}

// we should not rely on the shard state to decide whether to put the blob: during log replay, if the shard was
// sealed before homeobject restarted, the shard state in shard_meta_blk is already sealed, so every put_blob replay
// for this shard would fail and the blob_id->pba mappings would never be inserted into the pg index table again.
// instead, compare the message lsn against the shard sealed_lsn.

if (lsn >= shard_sealed_lsn) {
homestore::data_service().async_free_blk(pbas).thenValue([lsn, shard_id, blob_id, tid, &pbas](auto&& err) {
if (err) {
BLOGW(tid, shard_id, blob_id, "failed to free blob data blk, err={}, lsn={}, blkid={}", err.message(),
lsn, pbas.to_string());
} else {
BLOGD(tid, shard_id, blob_id, "successfully freed blob data blk, lsn={}, blkid={}", lsn, pbas.to_string());
}
});

BLOGD(tid, shard_id, blob_id,
"try to commit put_blob message to a non-open shard, lsn={}, shard_sealed_lsn={}, skip it!", lsn,
shard_sealed_lsn);

if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD))); }
return;
}

auto const pg_id = msg_header->pg_id;

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
blob_info.shard_id = shard_id;
blob_info.blob_id = blob_id;
blob_info.pbas = pbas;

@@ -394,13 +426,23 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<

auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
if (msg_header->corrupted()) {
LOGE("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, replication message header is corrupted with crc error",
tid, msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
(msg_header->shard_id & homeobject::shard_mask));
RELEASE_ASSERT(
false,
"traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, replication message header is corrupted with crc error",
tid, msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
(msg_header->shard_id & homeobject::shard_mask));
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); }
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

if (msg_header->msg_type != ReplicationMessageType::PUT_BLOB_MSG) {
LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, unsupported message type {}, reject it!", tid,
msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
(msg_header->shard_id & homeobject::shard_mask), msg_header->msg_type);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNSUPPORTED_OP))); }
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

auto hs_pg = get_hs_pg(msg_header->pg_id);
if (hs_pg == nullptr) {
LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, Received a blob_put on an unknown pg={}, underlying "
@@ -425,6 +467,14 @@

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());

if (hs_shard->sb_->info.state != ShardInfo::State::OPEN) {
LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, Received a blob_put on an unopen shard, reject it!", tid,
msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
(msg_header->shard_id & homeobject::shard_mask));
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD))); }
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = hs_shard->sb_->p_chunk_id;
if (hs_ctx->is_proposer()) { hints.reserved_blks = get_reserved_blks(); }
@@ -519,8 +569,10 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
auto tid = hs_ctx ? hs_ctx->traceID() : 0;
auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
if (msg_header->corrupted()) {
BLOGE(tid, msg_header->shard_id, *r_cast< blob_id_t const* >(key.cbytes()),
"replication message header is corrupted with crc error, lsn={} header={}", lsn, msg_header->to_string());
RELEASE_ASSERT(
false,
"replication message header is corrupted with crc error, tid={}, shard={}, blob={}, lsn={} header={}", tid,
msg_header->shard_id, *r_cast< blob_id_t const* >(key.cbytes()), lsn, msg_header->to_string());
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); }
return;
}
@@ -607,7 +659,7 @@ void HSHomeObject::on_blob_message_rollback(int64_t lsn, sisl::blob const& heade
auto tid = hs_ctx ? hs_ctx->traceID() : 0;
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
if (msg_header->corrupted()) {
LOGW("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid);
RELEASE_ASSERT(false, "replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); }
return;
}
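Taken together, the two new checks in this file split responsibility: the blk-alloc-hints path rejects a put_blob proposed against a shard that is no longer OPEN, while the commit path compares the message lsn against the shard's sealed_lsn so that puts logged before the seal still replay after a restart. A compact sketch with hypothetical types (the real code works on HS_Shard superblocks and ShardInfo):

```cpp
#include <cstdint>
#include <limits>

enum class ShardStateSketch { OPEN, SEALED };

struct ShardViewSketch {
    ShardStateSketch state{ShardStateSketch::OPEN};
    uint64_t sealed_lsn{std::numeric_limits<int64_t>::max()};   // INT64_MAX == never sealed
};

// Proposal time (mirrors blob_put_get_blk_alloc_hints): only OPEN shards accept new puts.
inline bool admit_new_put(const ShardViewSketch& s) {
    return s.state == ShardStateSketch::OPEN;
}

// Commit/replay time (mirrors on_blob_put_commit): apply the put only if it was logged
// before the shard was sealed; otherwise the blocks are freed and SEALED_SHARD is reported.
inline bool apply_put_at_commit(const ShardViewSketch& s, uint64_t lsn) {
    return lsn < s.sealed_lsn;
}
```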
18 changes: 5 additions & 13 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -689,7 +689,7 @@ class HSHomeObject : public HomeObjectImpl {
static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
static std::string serialize_shard_info(const ShardInfo& info);
void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, homestore::chunk_num_t p_chunk_id,
homestore::blk_count_t blk_count, trace_id_t tid = 0);
trace_id_t tid = 0);
void add_new_shard_to_map(std::unique_ptr< HS_Shard > shard);
void update_shard_in_map(const ShardInfo& shard_info);

@@ -804,8 +804,8 @@ class HSHomeObject : public HomeObjectImpl {
* @param repl_dev The replication device.
* @param hs_ctx The replication request context.
*/
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

bool on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
@@ -845,16 +845,6 @@ class HSHomeObject : public HomeObjectImpl {
*/
void on_replica_restart();

/**
* @brief Extracts the physical chunk ID for create shard from the message.
*
* @param header The message header that includes the shard_info_superblk, which contains the data necessary for
* extracting and mapping the chunk ID.
* @return An optional virtual chunk id if the extraction and mapping process is successful, otherwise an empty
* optional.
*/
std::optional< homestore::chunk_num_t > resolve_v_chunk_id_from_msg(sisl::blob const& header);

/**
* @brief Releases a chunk based on the information provided in a CREATE_SHARD message.
*
@@ -924,6 +914,8 @@ class HSHomeObject : public HomeObjectImpl {
void update_pg_meta_after_gc(const pg_id_t pg_id, const homestore::chunk_num_t move_from_chunk,
const homestore::chunk_num_t move_to_chunk, const uint64_t task_id);

sisl::sg_list generate_shard_super_blk_sg_list(shard_id_t shard_id);

uint32_t get_pg_tombstone_blob_count(pg_id_t pg_id) const;

// Snapshot persistence related
15 changes: 6 additions & 9 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -250,9 +250,8 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
auto const* msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes());

if (msg_header->corrupted()) {
LOGE("create PG message header is corrupted , lsn={}; header={}, trace_id={}", lsn, msg_header->to_string(),
tid);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
RELEASE_ASSERT(false, "create PG message header is corrupted, lsn={}; header={}, trace_id={}", lsn,
msg_header->to_string(), tid);
return;
}

@@ -261,8 +260,7 @@

if (crc32_ieee(init_crc32, serailized_pg_info_buf, serailized_pg_info_size) != msg_header->payload_crc) {
// header & value is inconsistent;
LOGE("create PG message header is inconsistent with value, lsn={}, trace_id={}", lsn, tid);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
RELEASE_ASSERT(false, "create PG message header is inconsistent with value, lsn={}, trace_id={}", lsn, tid);
return;
}

@@ -857,8 +855,8 @@ void HSHomeObject::on_create_pg_message_rollback(int64_t lsn, sisl::blob const&
auto const* msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes());

if (msg_header->corrupted()) {
LOGE("create PG message header is corrupted , lsn={}, header={}", lsn, msg_header->to_string());
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
RELEASE_ASSERT(false, "create PG message header is corrupted, lsn={}, header={}", lsn,
msg_header->to_string());
return;
}

Expand All @@ -867,8 +865,7 @@ void HSHomeObject::on_create_pg_message_rollback(int64_t lsn, sisl::blob const&

if (crc32_ieee(init_crc32, serailized_pg_info_buf, serailized_pg_info_size) != msg_header->payload_crc) {
// header & value is inconsistent;
LOGE("create PG message header is inconsistent with value, lsn={}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); }
RELEASE_ASSERT(false, "create PG message header is inconsistent with value, lsn={}", lsn);
return;
}

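Both the commit and rollback handlers for create-PG now treat a corrupted or inconsistent message header as fatal (RELEASE_ASSERT) instead of setting an error on the promise. The consistency check itself is a payload CRC recorded in the header; a sketch of that shape is below, with an illustrative struct and a stub checksum standing in for crc32_ieee:

```cpp
#include <cstddef>
#include <cstdint>

// Illustrative only; does not match the real ReplicationMessageHeader layout.
struct MsgHeaderSketch {
    uint32_t payload_size{0};
    uint32_t payload_crc{0};
};

// Stand-in hash, NOT a real CRC32 -- the production code calls crc32_ieee().
static uint32_t crc32_stub(uint32_t seed, const uint8_t* buf, size_t len) {
    uint32_t h = seed;
    for (size_t i = 0; i < len; ++i) { h = (h * 131u) ^ buf[i]; }
    return h;
}

// Returns true when the serialized pg_info payload matches the crc recorded in the
// header; the handlers above assert (crash) on false rather than returning an error.
inline bool payload_consistent(const MsgHeaderSketch& hdr, const uint8_t* payload, uint32_t init_crc32) {
    return crc32_stub(init_crc32, payload, hdr.payload_size) == hdr.payload_crc;
}
```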