diff --git a/conanfile.py b/conanfile.py index 657eebc97..6e7d797cb 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.7.7" + version = "3.0.1" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" @@ -50,7 +50,7 @@ def build_requirements(self): def requirements(self): self.requires("sisl/[^12.2]@oss/master", transitive_headers=True) - self.requires("homestore/[^6.20.15]@oss/master") + self.requires("homestore/[^6.20.20]@oss/master") self.requires("iomgr/[^11.3]@oss/master") self.requires("lz4/1.9.4", override=True) self.requires("openssl/[^3.1]", override=True) diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index d5bd44a78..10c606b12 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -34,7 +34,8 @@ struct ShardInfo { shard_id_t id; pg_id_t placement_group; State state; - uint64_t lsn; // created_lsn + uint64_t lsn; // created_lsn + uint64_t sealed_lsn{INT64_MAX}; // sealed_lsn uint64_t created_time; uint64_t last_modified_time; uint64_t available_capacity_bytes; diff --git a/src/lib/homestore_backend/gc_manager.cpp b/src/lib/homestore_backend/gc_manager.cpp index 0c1ad6bc0..8585e1929 100644 --- a/src/lib/homestore_backend/gc_manager.cpp +++ b/src/lib/homestore_backend/gc_manager.cpp @@ -166,7 +166,7 @@ bool GCManager::is_eligible_for_gc(chunk_id_t chunk_id) { const auto defrag_blk_num = chunk->get_defrag_nblks(); if (!defrag_blk_num) { return false; } - // 1 if the chunk state is inuse, it is occupied by a open shard, so it can not be selected and we don't need gc it. + // 1 if the chunk state is inuse, it is occupied by an open shard and can not be selected, so we do not need to gc it. // 2 if the chunk state is gc, it means this chunk is being gc, or this is a reserved chunk, so we don't need gc it. if (chunk->m_state != ChunkState::AVAILABLE) { LOGDEBUG("chunk_id={} state is {}, not eligible for gc", chunk_id, chunk->m_state) @@ -174,7 +174,7 @@ bool GCManager::is_eligible_for_gc(chunk_id_t chunk_id) { } const auto total_blk_num = chunk->get_total_blks(); - const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold); + static const auto gc_garbage_rate_threshold = HS_BACKEND_DYNAMIC_CONFIG(gc_garbage_rate_threshold); bool should_gc = 100 * defrag_blk_num >= total_blk_num * gc_garbage_rate_threshold; LOGDEBUG("gc scan chunk_id={}, use_blks={}, available_blks={}, total_blks={}, defrag_blks={}, should_gc={}", @@ -552,27 +552,6 @@ bool GCManager::pdev_gc_actor::replace_blob_index( return true; } -sisl::sg_list GCManager::pdev_gc_actor::generate_shard_super_blk_sg_list(shard_id_t shard_id) { - // TODO: do the buffer check before using it. 
- auto raw_shard_sb = m_hs_home_object->_get_hs_shard(shard_id); - RELEASE_ASSERT(raw_shard_sb, "can not find shard super blk for shard_id={} !!!", shard_id); - - const auto shard_sb = - const_cast< HSHomeObject::HS_Shard* >(d_cast< const HSHomeObject::HS_Shard* >(raw_shard_sb))->sb_.get(); - - auto blk_size = homestore::data_service().get_blk_size(); - auto shard_sb_size = sizeof(HSHomeObject::shard_info_superblk); - auto total_size = sisl::round_up(shard_sb_size, blk_size); - auto shard_sb_buf = iomanager.iobuf_alloc(blk_size, total_size); - - std::memcpy(shard_sb_buf, shard_sb, shard_sb_size); - - sisl::sg_list shard_sb_sgs; - shard_sb_sgs.size = total_size; - shard_sb_sgs.iovs.emplace_back(iovec{.iov_base = shard_sb_buf, .iov_len = total_size}); - return shard_sb_sgs; -} - // note that, when we copy data, there is not create shard or put blob in this chunk, only delete blob might happen. bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk_id_t move_to_chunk, const uint64_t task_id) { @@ -692,7 +671,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk } // prepare a shard header for this shard in move_to_chunk - sisl::sg_list header_sgs = generate_shard_super_blk_sg_list(shard_id); + sisl::sg_list header_sgs = m_hs_home_object->generate_shard_super_blk_sg_list(shard_id); // we ignore the state in shard header blk. we never read a shard header since we don`t know where it is(nor // record the pba in indextable) @@ -837,7 +816,7 @@ bool GCManager::pdev_gc_actor::copy_valid_data(chunk_id_t move_from_chunk, chunk } // 3 write a shard footer for this shard - sisl::sg_list footer_sgs = generate_shard_super_blk_sg_list(shard_id); + sisl::sg_list footer_sgs = m_hs_home_object->generate_shard_super_blk_sg_list(shard_id); return folly::collectAllUnsafe(futs) .thenValue([this, &is_last_shard, &shard_id, &blk_size, &hints, &move_to_chunk, &last_shard_state, task_id, &data_service, footer_sgs](auto&& results) { diff --git a/src/lib/homestore_backend/gc_manager.hpp b/src/lib/homestore_backend/gc_manager.hpp index afdc43a32..c233e4c02 100644 --- a/src/lib/homestore_backend/gc_manager.hpp +++ b/src/lib/homestore_backend/gc_manager.hpp @@ -237,10 +237,6 @@ class GCManager { pdev_gc_metrics& metrics() { return metrics_; } - private: - // utils - sisl::sg_list generate_shard_super_blk_sg_list(shard_id_t shard_id); - private: uint32_t m_pdev_id; std::shared_ptr< HeapChunkSelector > m_chunk_selector; diff --git a/src/lib/homestore_backend/heap_chunk_selector.cpp b/src/lib/homestore_backend/heap_chunk_selector.cpp index aa92a31d1..3e6bc8b21 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.cpp +++ b/src/lib/homestore_backend/heap_chunk_selector.cpp @@ -514,7 +514,7 @@ std::shared_ptr< const std::vector< homestore::chunk_num_t > > HeapChunkSelector return p_chunk_ids; } -std::optional< homestore::chunk_num_t > HeapChunkSelector::get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) { +std::optional< homestore::chunk_num_t > HeapChunkSelector::pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) { std::shared_lock lock_guard(m_chunk_selector_mtx); auto pg_it = m_per_pg_chunks.find(pg_id); if (pg_it == m_per_pg_chunks.end()) { diff --git a/src/lib/homestore_backend/heap_chunk_selector.h b/src/lib/homestore_backend/heap_chunk_selector.h index 39ce5f5d0..33fc45274 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.h +++ b/src/lib/homestore_backend/heap_chunk_selector.h @@ -126,7 +126,7 @@ class HeapChunkSelector : public 
homestore::ChunkSelector { * @param pg_id The ID of the pg. * @return An optional chunk_num_t value representing v_chunk_id, or std::nullopt if no space left. */ - std::optional< chunk_num_t > get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id); + std::optional< chunk_num_t > pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id); // this should be called on each pg meta blk found bool recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids); diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index b7d960b3a..f586df5c7 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -257,16 +257,48 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis trace_id_t tid = hs_ctx ? hs_ctx->traceID() : 0; auto msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes()); if (msg_header->corrupted()) { - LOGE("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); } + RELEASE_ASSERT(false, "replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); return; } + const auto shard_id = msg_header->shard_id; auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes()))); + + int64_t shard_sealed_lsn; + { + std::scoped_lock lock_guard(_shard_lock); + auto iter = _shard_map.find(shard_id); + RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", shard_id, + (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask)); + shard_sealed_lsn = (*iter->second)->info.sealed_lsn; + } + + // we can not simply use the shard state to determine whether to put the blob: during log replay, if the shard was + // sealed before homeobject restarted, the shard state in shard_meta_blk will already be sealed, and as a result all + // replays of put_blob in this shard would fail and the blob_id->pba would not be inserted into the pg index table + // again. hence we compare the lsn against the shard's sealed_lsn instead. 
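[Editorial sketch, not part of the patch] The comment above reduces to a simple replay rule: while a shard is open its sealed_lsn stays at the INT64_MAX default, so every put_blob commit passes the check; once a seal_shard commits at lsn S, only entries below S are applied. A minimal restatement with plain types (the function name is illustrative):

    #include <cstdint>

    // decide whether a put_blob committing/replaying at `lsn` should be applied
    bool should_apply_put_blob(int64_t lsn, uint64_t shard_sealed_lsn) {
        // open shard: sealed_lsn == INT64_MAX, so the check always passes;
        // sealed shard: only entries committed before the seal are applied
        return lsn < static_cast<int64_t>(shard_sealed_lsn);
    }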
+ + if (lsn >= shard_sealed_lsn) { + homestore::data_service().async_free_blk(pbas).thenValue([lsn, shard_id, blob_id, tid, pbas](auto&& err) { + if (err) { + BLOGW(tid, shard_id, blob_id, "failed to free blob data blk, err={}, lsn={}, blkid={}", err.message(), + lsn, pbas.to_string()); + } else { + BLOGD(tid, shard_id, blob_id, "successfully freed blob data blk, lsn={}, blkid={}", lsn, pbas.to_string()); + } + }); + + BLOGD(tid, shard_id, blob_id, + "try to commit put_blob message to a non-open shard, lsn={}, shard_sealed_lsn={}, skip it!", lsn, + shard_sealed_lsn); + + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD))); } + return; + } + auto const pg_id = msg_header->pg_id; BlobInfo blob_info; - blob_info.shard_id = msg_header->shard_id; + blob_info.shard_id = shard_id; blob_info.blob_id = blob_id; blob_info.pbas = pbas; @@ -394,13 +426,23 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); if (msg_header->corrupted()) { - LOGE("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, replication message header is corrupted with crc error", - tid, msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), - (msg_header->shard_id & homeobject::shard_mask)); + RELEASE_ASSERT( + false, + "traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, replication message header is corrupted with crc error", + tid, msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), + (msg_header->shard_id & homeobject::shard_mask)); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); } return folly::makeUnexpected(homestore::ReplServiceError::FAILED); } + if (msg_header->msg_type != ReplicationMessageType::PUT_BLOB_MSG) { + LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, unsupported message type {}, reject it!", tid, + msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), + (msg_header->shard_id & homeobject::shard_mask), msg_header->msg_type); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::UNSUPPORTED_OP))); } + return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); + } + auto hs_pg = get_hs_pg(msg_header->pg_id); if (hs_pg == nullptr) { LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, Received a blob_put on an unknown pg={}, underlying " @@ -425,6 +467,14 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get()); + if (hs_shard->sb_->info.state != ShardInfo::State::OPEN) { + LOGW("traceID={}, shardID=0x{:x}, pg={}, shard=0x{:x}, Received a blob_put on a non-open shard, reject it!", tid, + msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), + (msg_header->shard_id & homeobject::shard_mask)); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD))); } + return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); + } + homestore::blk_alloc_hints hints; hints.chunk_id_hint = hs_shard->sb_->p_chunk_id; if (hs_ctx->is_proposer()) { hints.reserved_blks = get_reserved_blks(); } @@ -519,8 +569,10 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis auto tid = hs_ctx ? 
hs_ctx->traceID() : 0; auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); if (msg_header->corrupted()) { - BLOGE(tid, msg_header->shard_id, *r_cast< blob_id_t const* >(key.cbytes()), - "replication message header is corrupted with crc error, lsn={} header={}", lsn, msg_header->to_string()); + RELEASE_ASSERT( + false, + "replication message header is corrupted with crc error, tid={}, shard={}, blob={}, lsn={} header={}", tid, + msg_header->shard_id, *r_cast< blob_id_t const* >(key.cbytes()), lsn, msg_header->to_string()); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); } return; } @@ -607,7 +659,7 @@ void HSHomeObject::on_blob_message_rollback(int64_t lsn, sisl::blob const& heade auto tid = hs_ctx ? hs_ctx->traceID() : 0; const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); if (msg_header->corrupted()) { - LOGW("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); + RELEASE_ASSERT(false, "replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::CHECKSUM_MISMATCH))); } return; } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index f7152ee39..179e6c87c 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -689,7 +689,7 @@ class HSHomeObject : public HomeObjectImpl { static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size); static std::string serialize_shard_info(const ShardInfo& info); void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, homestore::chunk_num_t p_chunk_id, - homestore::blk_count_t blk_count, trace_id_t tid = 0); + trace_id_t tid = 0); void add_new_shard_to_map(std::unique_ptr< HS_Shard > shard); void update_shard_in_map(const ShardInfo& shard_info); @@ -804,8 +804,8 @@ class HSHomeObject : public HomeObjectImpl { * @param repl_dev The replication device. * @param hs_ctx The replication request context. */ - void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, - shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx); + void on_shard_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev, + cintrusive< homestore::repl_req_ctx >& hs_ctx); bool on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); @@ -845,16 +845,6 @@ class HSHomeObject : public HomeObjectImpl { */ void on_replica_restart(); - /** - * @brief Extracts the physical chunk ID for create shard from the message. - * - * @param header The message header that includes the shard_info_superblk, which contains the data necessary for - * extracting and mapping the chunk ID. - * @return An optional virtual chunk id if the extraction and mapping process is successful, otherwise an empty - * optional. - */ - std::optional< homestore::chunk_num_t > resolve_v_chunk_id_from_msg(sisl::blob const& header); - /** * @brief Releases a chunk based on the information provided in a CREATE_SHARD message. 
* @@ -924,6 +914,8 @@ class HSHomeObject : public HomeObjectImpl { void update_pg_meta_after_gc(const pg_id_t pg_id, const homestore::chunk_num_t move_from_chunk, const homestore::chunk_num_t move_to_chunk, const uint64_t task_id); + sisl::sg_list generate_shard_super_blk_sg_list(shard_id_t shard_id); + uint32_t get_pg_tombstone_blob_count(pg_id_t pg_id) const; // Snapshot persistence related diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 84dbdebb2..3dd0987eb 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -250,9 +250,8 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he auto const* msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes()); if (msg_header->corrupted()) { - LOGE("create PG message header is corrupted , lsn={}; header={}, trace_id={}", lsn, msg_header->to_string(), - tid); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } + RELEASE_ASSERT(false, "create PG message header is corrupted, lsn={}; header={}, trace_id={}", lsn, + msg_header->to_string(), tid); return; } @@ -261,8 +260,7 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he if (crc32_ieee(init_crc32, serailized_pg_info_buf, serailized_pg_info_size) != msg_header->payload_crc) { // header & value is inconsistent; - LOGE("create PG message header is inconsistent with value, lsn={}, trace_id={}", lsn, tid); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } + RELEASE_ASSERT(false, "create PG message header is inconsistent with value, lsn={}, trace_id={}", lsn, tid); return; } @@ -857,8 +855,8 @@ void HSHomeObject::on_create_pg_message_rollback(int64_t lsn, sisl::blob const& auto const* msg_header = r_cast< ReplicationMessageHeader const* >(header.cbytes()); if (msg_header->corrupted()) { - LOGE("create PG message header is corrupted , lsn={}, header={}", lsn, msg_header->to_string()); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } + RELEASE_ASSERT(false, "create PG message header is corrupted, lsn={}, header={}", lsn, + msg_header->to_string()); return; } @@ -867,8 +865,7 @@ void HSHomeObject::on_create_pg_message_rollback(int64_t lsn, sisl::blob const& if (crc32_ieee(init_crc32, serailized_pg_info_buf, serailized_pg_info_size) != msg_header->payload_crc) { // header & value is inconsistent; - LOGE("create PG message header is inconsistent with value, lsn={}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(PGError::CRC_MISMATCH)); } + RELEASE_ASSERT(false, "create PG message header is inconsistent with value, lsn={}", lsn); return; } diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index bc69531ed..e62dda802 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -84,6 +84,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { j["shard_info"]["pg_id_t"] = info.placement_group; j["shard_info"]["state"] = info.state; j["shard_info"]["lsn"] = info.lsn; + j["shard_info"]["sealed_lsn"] = info.sealed_lsn; j["shard_info"]["created_time"] = info.created_time; j["shard_info"]["modified_time"] = info.last_modified_time; j["shard_info"]["total_capacity"] = info.total_capacity_bytes; @@ -98,6 +99,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, 
size_t str_ shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >(); shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >()); shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >(); + shard_info.sealed_lsn = shard_json["shard_info"]["sealed_lsn"].get< uint64_t >(); shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >(); shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >(); shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >(); @@ -107,7 +109,6 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes, trace_id_t tid) { - if (is_shutting_down()) { LOGI("service is being shut down"); return folly::makeUnexpected(ShardError(ShardErrorCode::SHUTTING_DOWN)); } @@ -146,77 +147,68 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow } auto new_shard_id = generate_new_shard_id(pg_owner); SLOGD(tid, new_shard_id, "Create shard request: pg={}, size={}", pg_owner, size_bytes); - auto create_time = get_current_timestamp(); // select chunk for shard. - const auto v_chunkID = chunk_selector()->get_most_available_blk_chunk(new_shard_id, pg_owner); + const auto v_chunkID = chunk_selector()->pick_most_available_blk_chunk(new_shard_id, pg_owner); + if (!v_chunkID.has_value()) { SLOGW(tid, new_shard_id, "no available chunk left to create shard for pg={}", pg_owner); decr_pending_request_num(); return folly::makeUnexpected(ShardError(ShardErrorCode::NO_SPACE_LEFT)); } + + // now we allocate the blk for the shard header/footer in on_commit of create/seal shard, so we have to make sure + // that the blk can be successfully allocated immediately (or after an emergent gc). otherwise, on_commit can not go + // ahead and the whole raft group will be blocked forever. const auto v_chunk_id = v_chunkID.value(); + const auto exVchunk = chunk_selector()->get_pg_vchunk(pg_owner, v_chunk_id); + + // only seal_shard (the footer) can use the reserved space, so the +2 here means we can at least write a shard header + // and one blob in addition to the shard footer. 
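[Editorial sketch, not part of the patch] The "+2" guard described above, restated standalone; the concrete reserved value is an assumption for illustration. The reserved blks are held back for the seal_shard footer, and the +2 demands room for the shard header plus at least one blob:

    #include <cstdint>

    bool chunk_can_host_new_shard(uint32_t available_blks, uint32_t reserved_blks) {
        // e.g. with reserved_blks == 1 (footer reservation), a chunk with 3 free
        // blks passes (header + one blob + footer), 2 free blks is rejected
        return available_blks >= reserved_blks + 2;
    }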
+ if (exVchunk->available_blks() < get_reserved_blks() + 2) { + const auto pchunk_id = exVchunk->get_chunk_id(); + LOGW("failed to create shard for pg={}, pchunk_id={} selected for vchunk_id={} does not have enough " + "space left", + pg_owner, pchunk_id, v_chunk_id); + + bool res = chunk_selector()->release_chunk(pg_owner, v_chunk_id); + RELEASE_ASSERT(res, "Failed to release v_chunk_id={}, pg={}", v_chunk_id, pg_owner); + + auto gc_mgr = gc_manager(); + if (gc_mgr->is_started()) { gc_manager()->submit_gc_task(task_priority::normal, pchunk_id); } + + decr_pending_request_num(); + return folly::makeUnexpected(ShardError(ShardErrorCode::NO_SPACE_LEFT)); + } + SLOGD(tid, new_shard_id, "vchunk_id={}", v_chunk_id); - // Prepare the shard info block - sisl::io_blob_safe sb_blob(sisl::round_up(sizeof(shard_info_superblk), repl_dev->get_blk_size()), io_align); - shard_info_superblk* sb = new (sb_blob.bytes()) shard_info_superblk(); - sb->type = DataHeader::data_type_t::SHARD_INFO; - sb->info = ShardInfo{.id = new_shard_id, - .placement_group = pg_owner, - .state = ShardInfo::State::OPEN, - .lsn = 0, - .created_time = create_time, - .last_modified_time = create_time, - .available_capacity_bytes = size_bytes, - .total_capacity_bytes = size_bytes}; - sb->p_chunk_id = 0; - sb->v_chunk_id = v_chunk_id; - - auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make( - sizeof(shard_info_superblk) /* header_extn_size */, 0u /* key_size */); - - // for create shard, we disable push_data, so that all the selecting chunk for creating shard will go through raft - // log channel, and thus, the the selecting chunk of later creating shard will go after that of the former one. - req->disable_push_data(); - - // prepare msg header; + auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make(0u /* header_extn_size */, 0u /* key_size */); + + // prepare msg header, log only req->header()->msg_type = ReplicationMessageType::CREATE_SHARD_MSG; req->header()->pg_id = pg_owner; req->header()->shard_id = new_shard_id; - req->header()->payload_size = sizeof(shard_info_superblk); - req->header()->payload_crc = crc32_ieee(init_crc32, sb_blob.cbytes(), sizeof(shard_info_superblk)); + req->header()->vchunk_id = v_chunk_id; + req->header()->payload_size = 0; + req->header()->payload_crc = 0; req->header()->seal(); - // ShardInfo block is persisted on both on header and in data portion. - // It is persisted in header portion, so that it is written in journal and hence replay of journal on most cases - // doesn't need additional read from data blks. - // We also persist in data blocks for following reasons: - // * To recover the shard information in case both journal and metablk are lost - // * For garbage collection, we directly read from the data chunk and get shard information. 
- std::memcpy(req->header_extn(), sb_blob.cbytes(), sizeof(shard_info_superblk)); - req->add_data_sg(std::move(sb_blob)); - // replicate this create shard message to PG members; - repl_dev->async_alloc_write(req->cheader_buf(), sisl::blob{}, req->data_sgs(), req, false /* part_of_batch */, tid); + repl_dev->async_alloc_write(req->cheader_buf(), sisl::blob{}, sisl::sg_list{}, req, false /* part_of_batch */, tid); return req->result().deferValue([this, req, repl_dev, tid, pg_owner, new_shard_id, v_chunk_id](const auto& result) -> ShardManager::AsyncResult< ShardInfo > { if (result.hasError()) { auto err = result.error(); if (err.getCode() == ShardErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); } + // we will never get no_space_left error here. bool res = chunk_selector()->release_chunk(pg_owner, v_chunk_id); RELEASE_ASSERT(res, "Failed to release v_chunk_id={}, pg={}", v_chunk_id, pg_owner); SLOGE(tid, new_shard_id, "got {} when creating shard at leader, failed to create shard {}!", err.getCode(), new_shard_id); - if (err.getCode() == ShardErrorCode::NO_SPACE_LEFT) { - gc_manager()->submit_gc_task(task_priority::normal, - chunk_selector()->get_pg_vchunk(pg_owner, v_chunk_id)->get_chunk_id()); - SLOGD(tid, new_shard_id, "got no space left error when creating shard {} at leader", new_shard_id); - } - decr_pending_request_num(); return folly::makeUnexpected(err); } @@ -234,8 +226,8 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const } incr_pending_request_num(); - auto pg_id = info.placement_group; - auto shard_id = info.id; + const auto pg_id = info.placement_group; + const auto shard_id = info.id; SLOGD(tid, shard_id, "Seal shard request: is_open={}", info.is_open()); auto hs_pg = get_hs_pg(pg_id); if (!hs_pg) { @@ -269,33 +261,24 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const return folly::makeUnexpected(ShardError(ShardErrorCode::RETRY_REQUEST)); } - ShardInfo tmp_info = info; - tmp_info.state = ShardInfo::State::SEALED; - - // Prepare the shard info block - sisl::io_blob_safe sb_blob(sisl::round_up(sizeof(shard_info_superblk), repl_dev->get_blk_size()), io_align); - shard_info_superblk* sb = new (sb_blob.bytes()) shard_info_superblk(); - sb->type = DataHeader::data_type_t::SHARD_INFO; - sb->info = tmp_info; - // p_chunk_id and v_chunk_id will never be used in seal shard workflow. - sb->p_chunk_id = 0; - sb->v_chunk_id = 0; + auto v_chunkID = get_shard_v_chunk_id(shard_id); + if (!v_chunkID.has_value()) { + SLOGW(tid, shard_id, "failed to seal shard, vchunk id not found"); + decr_pending_request_num(); + return folly::makeUnexpected(ShardError(ShardErrorCode::UNKNOWN_SHARD)); + } - auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make( - sizeof(shard_info_superblk) /* header_extn_size */, 0u /* key_size */); + auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make(0u /* header_extn_size */, 0u /* key_size */); req->header()->msg_type = ReplicationMessageType::SEAL_SHARD_MSG; req->header()->pg_id = pg_id; req->header()->shard_id = shard_id; - req->header()->payload_size = sizeof(shard_info_superblk); - req->header()->payload_crc = crc32_ieee(init_crc32, sb_blob.cbytes(), sizeof(shard_info_superblk)); + req->header()->vchunk_id = v_chunkID.value(); + req->header()->payload_size = 0; + req->header()->payload_crc = 0; req->header()->seal(); - // Similar to create shard - ShardInfo block is persisted on both on header and in data portion. 
- std::memcpy(req->header_extn(), sb_blob.cbytes(), sizeof(shard_info_superblk)); - req->add_data_sg(std::move(sb_blob)); - // replicate this seal shard message to PG members; - repl_dev->async_alloc_write(req->cheader_buf(), sisl::blob{}, req->data_sgs(), req, false /* part_of_batch */, tid); + repl_dev->async_alloc_write(req->cheader_buf(), sisl::blob{}, sisl::sg_list{}, req, false /* part_of_batch */, tid); return req->result().deferValue( [this, req, repl_dev, tid](const auto& result) -> ShardManager::AsyncResult< ShardInfo > { if (result.hasError()) { @@ -320,45 +303,53 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx) { - repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; - if (hs_ctx && hs_ctx->is_proposer()) { - ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); - } auto tid = hs_ctx ? hs_ctx->traceID() : 0; const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); - if (msg_header->corrupted()) { - LOGW("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::CRC_MISMATCH))); } - // TODO::if fail to pre_commit, shuold we crash here? + if (msg_header->corrupted()) { + RELEASE_ASSERT( + false, "replication message header is corrupted with crc error when pre_committing shard message, lsn={}", + lsn); return false; } - switch (msg_header->msg_type) { - case ReplicationMessageType::SEAL_SHARD_MSG: { - auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader)); - auto const shard_info = sb->info; + + const auto msg_type = msg_header->msg_type; + + RELEASE_ASSERT(msg_type == ReplicationMessageType::CREATE_SHARD_MSG || + msg_type == ReplicationMessageType::SEAL_SHARD_MSG, + "unsupported message type {} when pre_committing shard message, fatal error!", msg_type); + + const auto& shard_id = msg_header->shard_id; + + if (msg_type == ReplicationMessageType::CREATE_SHARD_MSG) { + SLOGD(tid, shard_id, "pre_commit create_shard message, type={}, lsn={}", msg_header->msg_type, lsn); + } else { + SLOGD(tid, shard_id, "pre_commit seal_shard message, type={}, lsn={}", msg_header->msg_type, lsn); { std::scoped_lock lock_guard(_shard_lock); - auto iter = _shard_map.find(shard_info.id); - RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", - shard_info.id, (shard_info.id >> homeobject::shard_width), - (shard_info.id & homeobject::shard_mask)); + auto iter = _shard_map.find(shard_id); + if (iter == _shard_map.end()) { + // if the create_shard message of this shard has not been committed yet at this moment, we can not find + // the shard when pre_committing the seal_shard. + SLOGW(tid, shard_id, "try to seal a shard that does not exist yet, lsn={}", lsn); + return false; + } + auto& state = (*iter->second)->info.state; // we just change the state to SEALED, so that it will fail the later coming put_blob on this shard and will - be easy for rollback. 
the update of superblk will be done in on_shard_message_commit; + + // note that this is best effort and we can not 100% avoid put_blob to a sealed shard: the shard state may be + // open when it is checked, but be changed to sealed immediately afterwards by the pre_commit of a seal_shard, + // and as a result, that put_blob might be scheduled to a sealed shard. if (state == ShardInfo::State::OPEN) { state = ShardInfo::State::SEALED; } else { - SLOGW(tid, shard_info.id, "try to seal an unopened shard"); + SLOGW(tid, shard_id, "try to seal an unopened shard, lsn={}", lsn); } } } - default: { - break; - } - } return true; } @@ -370,70 +361,34 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head } auto tid = hs_ctx ? hs_ctx->traceID() : 0; const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); + if (msg_header->corrupted()) { - LOGW("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); - RELEASE_ASSERT(false, "shardID=0x{:x}, pg={}, shard=0x{:x}, failed to rollback create_shard msg", - msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), - (msg_header->shard_id & homeobject::shard_mask)); + RELEASE_ASSERT(false, + "replication message header is corrupted with crc error in on_rollback, lsn={}, traceID={}", lsn, + tid); return; } - switch (msg_header->msg_type) { - case ReplicationMessageType::CREATE_SHARD_MSG: { - if (ctx) { - ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::RETRY_REQUEST))); - } else { - // we have already added release_chunk logic to thenValue of hoemobject#create_shard in originator, so here - // we just need to release_chunk for non-originater case since it will bring a bug if a chunk is released - // for two times. for exampele, as a originator: - - // t1 : chunk1 is released in the rollback of create_shard, the chunk state is marked as available - // t2 : chunk1 is select by a new create shard (shard1), the chunk state is marked as inuse - // t3 : chunk1 is released in thenValue of create_shard, the chunk state is marked as available - // t4 : chunk1 is select by a new create shard (shard2), the chunk state is marked as inuse - // now, shard1 and shard2 hold the same chunk. 
- bool res = release_chunk_based_on_create_shard_message(header); - if (!res) { - RELEASE_ASSERT(false, - "shardID=0x{:x}, pg={}, shard=0x{:x}, failed to release chunk based on create shard msg", - msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), - (msg_header->shard_id & homeobject::shard_mask)); - } - } - break; - } - case ReplicationMessageType::SEAL_SHARD_MSG: { - auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader)); - auto const shard_info = sb->info; - { - std::scoped_lock lock_guard(_shard_lock); - auto iter = _shard_map.find(shard_info.id); - RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", - shard_info.id, (shard_info.id >> homeobject::shard_width), - (shard_info.id & homeobject::shard_mask)); - auto& state = (*iter->second)->info.state; - // we just change the state to SEALED, since it will be easy for rollback - // the update of superblk will be done in on_shard_message_commit; - if (state == ShardInfo::State::SEALED) { - state = ShardInfo::State::OPEN; - } else { - SLOGW(tid, shard_info.id, "try to rollback seal_shard message , but the shard state is not sealed"); - } - } - // TODO:set a proper error code - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::RETRY_REQUEST))); } + const auto msg_type = msg_header->msg_type; - break; - } - default: { - break; - } + RELEASE_ASSERT(msg_type == ReplicationMessageType::CREATE_SHARD_MSG || + msg_type == ReplicationMessageType::SEAL_SHARD_MSG, + "unsupported message type {} when rolling back shard message, fatal error!", msg_type); + + const auto shard_id = msg_header->shard_id; + + // since we do nothing in pre_commit, we do nothing in rollback + if (msg_type == ReplicationMessageType::CREATE_SHARD_MSG) { + SLOGD(tid, shard_id, "rollback create shard message, type={}, lsn={}", msg_header->msg_type, lsn); + } else { + SLOGD(tid, shard_id, "rollback seal shard message, type={}, lsn={}", msg_header->msg_type, lsn); } + + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::RETRY_REQUEST))); } } void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, - homestore::chunk_num_t p_chunk_id, homestore::blk_count_t blk_count, - trace_id_t tid) { + homestore::chunk_num_t p_chunk_id, trace_id_t tid) { bool shard_exist = false; { scoped_lock lock_guard(_shard_lock); @@ -442,29 +397,12 @@ void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num if (!shard_exist) { add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, p_chunk_id, v_chunk_id)); - // select_specific_chunk() will do something only when we are relaying journal after restart, during the - // runtime flow chunk is already been be mark busy when we write the shard info to the repldev. 
- auto pg_id = shard_info.placement_group; - auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id); - RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id={} in pg={}", v_chunk_id, pg_id); } else { - SLOGD(tid, shard_info.id, "shard already exist, skip creating shard"); + SLOGD(tid, shard_info.id, "shard already exists, this should only happen during log replay, skip creating shard"); } - - // update pg's total_occupied_blk_count - auto hs_pg = get_hs_pg(shard_info.placement_group); - RELEASE_ASSERT(hs_pg != nullptr, "shardID=0x{:x}, pg={}, shard=0x{:x}, PG not found", shard_info.id, - (shard_info.id >> homeobject::shard_width), (shard_info.id & homeobject::shard_mask)); - - SLOGD(tid, shard_info.id, "local_create_shard {}, vchunk_id={}, p_chunk_id={}, pg_id={}", shard_info.id, v_chunk_id, - p_chunk_id, shard_info.placement_group); - - const_cast< HS_PG* >(hs_pg)->durable_entities_update( - [blk_count](auto& de) { de.total_occupied_blk_count.fetch_add(blk_count, std::memory_order_relaxed); }); } -void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids, - shared< homestore::ReplDev > repl_dev, +void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; if (hs_ctx && hs_ctx->is_proposer()) { @@ -473,11 +411,15 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom auto tid = hs_ctx ? hs_ctx->traceID() : 0; auto header = r_cast< const ReplicationMessageHeader* >(h.cbytes()); if (header->corrupted()) { - LOGW("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::CRC_MISMATCH))); } - // TODO::if fail to commit, shuold we crash here? 
+ RELEASE_ASSERT(false, "replication message header is corrupted with crc error in on_commit, lsn={}, traceID={}", + lsn, tid); return; } + const auto shard_id = header->shard_id; + + RELEASE_ASSERT(header->msg_type == ReplicationMessageType::CREATE_SHARD_MSG || + header->msg_type == ReplicationMessageType::SEAL_SHARD_MSG, + "unsupport message tyep {} when committing shard message, fatal error!", header->msg_type); #ifdef VADLIDATE_ON_REPLAY sisl::io_blob_safe value_blob(blkids.blk_count() * repl_dev->get_blk_size(), io_align); @@ -489,71 +431,140 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom std::error_code err = repl_dev->async_read(blkids, value_sgs, value_blob.size()).get(); if (err) { LOGW("failed to read data from homestore blks, lsn={}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::UNKNOWN)); } + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::UNKNOWN)); + } return; } if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header->payload_crc) { // header & value is inconsistent; LOGW("replication message header is inconsistent with value, lsn={}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::CRC_MISMATCH)); } + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError(ShardErrorCode::CRC_MISMATCH)); + } return; } #endif + // allocate blk for shard header/footer + const auto pg_id = header->pg_id; + const auto vchunk_id = header->vchunk_id; + + homestore::blk_alloc_hints hints; + hints.application_hint = static_cast< uint64_t >(pg_id) << 16 | vchunk_id; + hints.reserved_blks = header->msg_type == ReplicationMessageType::CREATE_SHARD_MSG ? get_reserved_blks() : 0; + + homestore::MultiBlkId blkids; + homestore::BlkAllocStatus alloc_status; + auto gc_mgr = gc_manager(); + + for (uint8_t i = 0; i < 5; ++i) { + // we need select the chunk before starting allocate blk for shard header, so that we can make sure gc will not + // hit this chunk after we allocate blk for shard header.Here, we specify the application_hint, so it will try + // to select the vchunk for chunk selector(call select_specific_chunk). + alloc_status = homestore::data_service().alloc_blks( + sisl::round_up(sizeof(shard_info_superblk), repl_dev->get_blk_size()), hints, blkids); + + if (alloc_status == homestore::BlkAllocStatus::SUCCESS) { + SLOGD(tid, shard_id, "successfully alloc blk for committing shard message, lsn={}", lsn); + break; + } else if (alloc_status == homestore::BlkAllocStatus::SPACE_FULL) { + SLOGD(tid, shard_id, "no_space_left happens when allocating blk for committing shard message, lsn={}", lsn); + if (!gc_mgr->is_started()) { + RELEASE_ASSERT(false, + "no_space_left when allocating blk for committing shard message, but gc is disabled, " + "can not proceed, fatal error!"); + } + + auto vchunk = chunk_selector_->get_pg_vchunk(pg_id, vchunk_id); + RELEASE_ASSERT(vchunk != nullptr, "chunk selection failed with vchunk_id={} in pg={}", vchunk_id, pg_id); + const auto pchunk = vchunk->get_chunk_id(); + + // although we have checked the available_blks of the selected chunk before accept the creating shard + // request, there is also a case that we have to trigger emergent gc here, for example, a staled push_data + // of a put_blob will consume the capacity of this chunk and lead to no_space_left. 
+ auto ret = gc_mgr->submit_gc_task(task_priority::emergent, pchunk).get(); + SLOGD(tid, shard_id, "emergent gc is completed for chunk_id={}, lsn={}, ok={}", pchunk, lsn, ret); + } else { + RELEASE_ASSERT(false, + "fatal error={} happens when allocating blk for committing shard message, vchunk={}, lsn={}", + alloc_status, vchunk_id, lsn); + } + } + + RELEASE_ASSERT( + alloc_status == homestore::BlkAllocStatus::SUCCESS, + "can not allocate blk for shard meta blk after trying for 5 times, fatal error! vchunk={}, pg={}, shard={}", + vchunk_id, pg_id, shard_id); + + ShardInfo shard_info; + switch (header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { - auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader)); - auto shard_info = sb->info; - auto v_chunk_id = sb->v_chunk_id; + SLOGD(tid, shard_id, "pchunk {} is selected for vchunk {} in pg {} for creating shard", blkids.chunk_num(), + vchunk_id, pg_id); + // fill shard info + shard_info.id = shard_id; + shard_info.placement_group = pg_id; + shard_info.created_time = get_current_timestamp(); + shard_info.last_modified_time = get_current_timestamp(); + shard_info.total_capacity_bytes = blkids.blk_count() * homestore::data_service().get_blk_size(); shard_info.lsn = lsn; + shard_info.state = ShardInfo::State::OPEN; + shard_info.available_capacity_bytes = shard_info.total_capacity_bytes; - local_create_shard(shard_info, v_chunk_id, blkids.chunk_num(), blkids.blk_count(), tid); + local_create_shard(shard_info, vchunk_id, blkids.chunk_num(), tid); if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } - - SLOGD(tid, shard_info.id, "Commit done for creating shard"); - + SLOGD(tid, shard_id, "Commit done for creating shard at lsn={}", lsn); break; } case ReplicationMessageType::SEAL_SHARD_MSG: { - auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader)); - auto const shard_info = sb->info; - - ShardInfo::State state; { std::scoped_lock lock_guard(_shard_lock); - auto iter = _shard_map.find(shard_info.id); + auto iter = _shard_map.find(shard_id); RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", - shard_info.id, (shard_info.id >> homeobject::shard_width), - (shard_info.id & homeobject::shard_mask)); - state = (*iter->second)->info.state; + shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask)); + shard_info = (*iter->second)->info; } - if (state == ShardInfo::State::SEALED) { - auto pg_id = shard_info.placement_group; - auto v_chunkID = get_shard_v_chunk_id(shard_info.id); - RELEASE_ASSERT(v_chunkID.has_value(), "v_chunk id not found"); - bool res = chunk_selector()->release_chunk(pg_id, v_chunkID.value()); - RELEASE_ASSERT(res, "Failed to release v_chunk_id={}, pg={}", v_chunkID.value(), pg_id); - update_shard_in_map(shard_info); - } else - SLOGW(tid, shard_info.id, "try to commit SEAL_SHARD_MSG but shard state is not sealed."); - if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } - SLOGD(tid, shard_info.id, "Commit done for sealing shard"); + // if we failed the pre_commit for sealing this shard (the create_shard was not committed yet when the + // seal_shard was pre_committed), the shard state will still be open when reaching here. we just seal it here in + // that case. 
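[Editorial sketch, not part of the patch] Condensed control flow of the allocation loop completed above: try to allocate, and on SPACE_FULL run an emergent gc pass on the backing pchunk before retrying, giving up (the patch RELEASE_ASSERTs) after 5 attempts. Stand-in types and callables only, no homestore APIs:

    #include <functional>

    enum class AllocStatus { SUCCESS, SPACE_FULL };

    bool alloc_with_emergent_gc(const std::function<AllocStatus()>& try_alloc,
                                const std::function<void()>& emergent_gc) {
        for (int i = 0; i < 5; ++i) {
            if (try_alloc() == AllocStatus::SUCCESS) { return true; }
            // SPACE_FULL: reclaim garbage on the chunk synchronously, then retry
            emergent_gc();
        }
        return false; // the patch treats this as fatal
    }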
+ if (shard_info.state != ShardInfo::State::SEALED) { + SLOGW(tid, shard_id, + "the shard state is not sealed when committing seal_shard message at lsn={}, change it to sealed!", + lsn); + shard_info.state = ShardInfo::State::SEALED; + } - auto hs_pg = get_hs_pg(shard_info.placement_group); - RELEASE_ASSERT(hs_pg != nullptr, "shardID=0x{:x}, pg={}, shard=0x{:x}, PG not found", shard_info.id, - (shard_info.id >> homeobject::shard_width), (shard_info.id & homeobject::shard_mask)); - const_cast< HS_PG* >(hs_pg)->durable_entities_update( - // shard_footer will also occupy one blk. - [](auto& de) { de.total_occupied_blk_count.fetch_add(1, std::memory_order_relaxed); }); + shard_info.sealed_lsn = lsn; + bool res = chunk_selector()->release_chunk(pg_id, vchunk_id); + RELEASE_ASSERT(res, "Failed to release v_chunk_id={}, pg={}", vchunk_id, pg_id); + update_shard_in_map(shard_info); - break; + if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } + SLOGD(tid, shard_id, "Commit done for sealing shard at lsn={}", lsn); } + default: break; } + + // write the shard header/footer blk. in the log replay case, one more shard header/footer will be written again; + // it does not matter. + sisl::sg_list sgs = generate_shard_super_blk_sg_list(shard_id); + RELEASE_ASSERT(sgs.iovs.size() == 1, "sgs.iovs.size() for shard header/footer should be 1, but not!"); + homestore::data_service().async_write(sgs, blkids).thenValue([sgs, lsn, msgtype = header->msg_type](auto&& err) { + // it does not matter if we fail to write the shard header/footer, since we never read them + if (err) { LOGW("failed to write shard super blk, err={}, lsn={}, msgType={}", err.message(), lsn, msgtype); } + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(sgs.iovs[0].iov_base)); + }); + + auto hs_pg = get_hs_pg(pg_id); + RELEASE_ASSERT(hs_pg != nullptr, "shardID=0x{:x}, pg={}, shard=0x{:x}, PG not found", shard_id, + (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask)); + const_cast< HS_PG* >(hs_pg)->durable_entities_update( + // shard footer/header will also take one blk. + [](auto& de) { de.total_occupied_blk_count.fetch_add(1, std::memory_order_relaxed); }); } void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) { @@ -708,33 +719,6 @@ std::optional< homestore::chunk_num_t > HSHomeObject::get_shard_v_chunk_id(const return std::make_optional< homestore::chunk_num_t >(hs_shard->sb_->v_chunk_id); } -std::optional< homestore::chunk_num_t > HSHomeObject::resolve_v_chunk_id_from_msg(sisl::blob const& header) { - const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); - if (msg_header->corrupted()) { - LOGW("replication message header is corrupted with crc error"); - return std::nullopt; - } - - switch (msg_header->msg_type) { - case ReplicationMessageType::CREATE_SHARD_MSG: { - const pg_id_t pg_id = msg_header->pg_id; - if (!pg_exists(pg_id)) { - LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, Requesting a chunk for an unknown pg={}", msg_header->shard_id, - (msg_header->shard_id >> homeobject::shard_width), (msg_header->shard_id & homeobject::shard_mask), - pg_id); - return std::nullopt; - } - auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader)); - return sb->v_chunk_id; - } - default: { - LOGW("Unexpected message type encountered={}. 
This function should only be called with 'CREATE_SHARD_MSG'.", - msg_header->msg_type); - return std::nullopt; - } - } -} - bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const& header) { const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); if (msg_header->corrupted()) { @@ -745,10 +729,10 @@ bool HSHomeObject::release_chunk_based_on_create_shard_message(sisl::blob const& switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { const pg_id_t pg_id = msg_header->pg_id; + const auto shard_id = msg_header->shard_id; if (!pg_exists(pg_id)) { - LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, Requesting a chunk for an unknown pg={}", msg_header->shard_id, - (msg_header->shard_id >> homeobject::shard_width), (msg_header->shard_id & homeobject::shard_mask), - pg_id); + LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, Requesting a chunk for an unknown pg={}", shard_id, + (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), pg_id); return false; } auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader)); @@ -804,4 +788,24 @@ void HSHomeObject::HS_Shard::update_info(const ShardInfo& shard_info, sb_.write(); } +sisl::sg_list HSHomeObject::generate_shard_super_blk_sg_list(shard_id_t shard_id) { + // TODO: do the buffer check before using it. + auto raw_shard_sb = _get_hs_shard(shard_id); + RELEASE_ASSERT(raw_shard_sb, "can not find shard super blk for shard_id={} !!!", shard_id); + + const auto shard_sb = const_cast< HS_Shard* >(d_cast< const HS_Shard* >(raw_shard_sb))->sb_.get(); + + auto blk_size = homestore::data_service().get_blk_size(); + auto shard_sb_size = sizeof(shard_info_superblk); + auto total_size = sisl::round_up(shard_sb_size, blk_size); + auto shard_sb_buf = iomanager.iobuf_alloc(blk_size, total_size); + + std::memcpy(shard_sb_buf, shard_sb, shard_sb_size); + + sisl::sg_list shard_sb_sgs; + shard_sb_sgs.size = total_size; + shard_sb_sgs.iovs.emplace_back(iovec{.iov_base = shard_sb_buf, .iov_len = total_size}); + return shard_sb_sgs; +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp index 9850a9a60..a4807c8b6 100644 --- a/src/lib/homestore_backend/pg_blob_iterator.cpp +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -179,7 +179,7 @@ bool HSHomeObject::PGBlobIterator::generate_shard_blob_list() { bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe& meta_blob) { auto shard = shard_list_[cur_shard_idx_]; auto shard_entry = CreateResyncShardMetaData( - builder_, shard.info.id, pg_id, static_cast< uint8_t >(shard.info.state), shard.info.lsn, + builder_, shard.info.id, pg_id, static_cast< uint8_t >(shard.info.state), shard.info.lsn, shard.info.sealed_lsn, shard.info.created_time, shard.info.last_modified_time, shard.info.total_capacity_bytes, shard.v_chunk_num); builder_.FinishSizePrefixed(shard_entry); diff --git a/src/lib/homestore_backend/replication_message.hpp b/src/lib/homestore_backend/replication_message.hpp index 3a7d5e012..de00ceb11 100644 --- a/src/lib/homestore_backend/replication_message.hpp +++ b/src/lib/homestore_backend/replication_message.hpp @@ -6,12 +6,13 @@ #include #include +#include namespace homeobject { VENUM(ReplicationMessageType, uint16_t, CREATE_PG_MSG = 0, CREATE_SHARD_MSG = 1, SEAL_SHARD_MSG = 2, PUT_BLOB_MSG = 3, DEL_BLOB_MSG = 4, UNKNOWN_MSG = 5); -VENUM(SyncMessageType, uint16_t, 
PG_META = 0, SHARD_META = 1, SHARD_BATCH = 2, LAST_MSG = 3); +VENUM(SyncMessageType, uint16_t, PG_META = 0, SHARD_META = 1, SHARD_BATCH = 2, LAST_MSG = 3); VENUM(ResyncBlobState, uint8_t, NORMAL = 0, DELETED = 1, CORRUPTED = 2); // magic num comes from the first 8 bytes of 'echo homeobject_replication | md5sum' @@ -21,11 +22,11 @@ static constexpr uint64_t HOMEOBJECT_RESYNC_MAGIC = 0xbb6813cb4a339f30; static constexpr uint32_t HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 = 0x01; static constexpr uint32_t HOMEOBJECT_RESYNC_PROTOCOL_VERSION_V1 = 0x01; static constexpr uint32_t init_crc32 = 0; -static constexpr uint64_t LAST_OBJ_ID =ULLONG_MAX; -static constexpr uint64_t DEFAULT_MAX_BATCH_SIZE_MB =128; +static constexpr uint64_t LAST_OBJ_ID = ULLONG_MAX; +static constexpr uint64_t DEFAULT_MAX_BATCH_SIZE_MB = 128; #pragma pack(1) -template +template < typename Header > struct BaseMessageHeader { uint64_t magic_num{HOMEOBJECT_REPLICATION_MAGIC}; uint32_t protocol_version; @@ -39,8 +40,8 @@ struct BaseMessageHeader { } uint32_t calculate_crc() const { - const auto* hdr=static_cast(this); - return crc32_ieee(init_crc32, reinterpret_cast(hdr), sizeof(*hdr)); + const auto* hdr = static_cast< const Header* >(this); + return crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(hdr), sizeof(*hdr)); } bool corrupted() const { @@ -52,25 +53,27 @@ struct BaseMessageHeader { } std::string to_string() const { - return fmt::format( - "magic={:#x} version={} payload_size={} payload_crc={} header_crc={}\n", - magic_num, protocol_version, payload_size, payload_crc, header_crc); + return fmt::format("magic={:#x} version={} payload_size={} payload_crc={} header_crc={}\n", magic_num, + protocol_version, payload_size, payload_crc, header_crc); } }; -struct ReplicationMessageHeader : public BaseMessageHeader{ - ReplicationMessageHeader(): BaseMessageHeader() { +struct ReplicationMessageHeader : public BaseMessageHeader< ReplicationMessageHeader > { + ReplicationMessageHeader() : BaseMessageHeader() { magic_num = HOMEOBJECT_REPLICATION_MAGIC; protocol_version = HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1; } ReplicationMessageType msg_type; pg_id_t pg_id{0}; - uint8_t reserved_pad[4]{}; + // chunk_num_t = uint16_t + homestore::chunk_num_t vchunk_id{0}; + uint8_t reserved_pad[2]{}; shard_id_t shard_id{0}; blob_id_t blob_id{0}; - bool corrupted() const{ - if (magic_num != HOMEOBJECT_REPLICATION_MAGIC || protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1) { + bool corrupted() const { + if (magic_num != HOMEOBJECT_REPLICATION_MAGIC || + protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1) { return true; } return BaseMessageHeader::corrupted(); @@ -83,7 +86,7 @@ struct ReplicationMessageHeader : public BaseMessageHeader { +struct SyncMessageHeader : public BaseMessageHeader< SyncMessageHeader > { SyncMessageHeader() : BaseMessageHeader() { magic_num = HOMEOBJECT_RESYNC_MAGIC; protocol_version = HOMEOBJECT_RESYNC_PROTOCOL_VERSION_V1; @@ -91,7 +94,7 @@ struct SyncMessageHeader : public BaseMessageHeader { SyncMessageType msg_type; uint8_t reserved_pad[6]{}; - bool corrupted() const{ + bool corrupted() const { if (magic_num != HOMEOBJECT_RESYNC_MAGIC || protocol_version != HOMEOBJECT_RESYNC_PROTOCOL_VERSION_V1) { return true; } @@ -99,8 +102,8 @@ struct SyncMessageHeader : public BaseMessageHeader { } std::string to_string() const { - return fmt::format("magic={:#x} version={} msg_type={} payload_size={} payload_crc={} header_crc={}", - magic_num, protocol_version, enum_name(msg_type), 
payload_size, payload_crc, header_crc); + return fmt::format("magic={:#x} version={} msg_type={} payload_size={} payload_crc={} header_crc={}", magic_num, + protocol_version, enum_name(msg_type), payload_size, payload_crc, header_crc); } }; #pragma pack() @@ -114,14 +117,10 @@ struct objId { snp_batch_id_t batch_id; objId(shard_id_t shard_seq_num, snp_batch_id_t batch_id) : shard_seq_num(shard_seq_num), batch_id(batch_id) { - if (shard_seq_num != (shard_seq_num & 0xFFFFFFFFFFFF)) { - throw std::invalid_argument("shard_id is too large"); - } - if (batch_id != (batch_id & 0x7FFF)){ - throw std::invalid_argument("batch_id is too large"); - } - //type_bit (1 bit) | shard_id (48 bits) | batch_id (15 bits) - value= static_cast(1) << 63 | (shard_seq_num) << 15 | batch_id; + if (shard_seq_num != (shard_seq_num & 0xFFFFFFFFFFFF)) { throw std::invalid_argument("shard_id is too large"); } + if (batch_id != (batch_id & 0x7FFF)) { throw std::invalid_argument("batch_id is too large"); } + // type_bit (1 bit) | shard_id (48 bits) | batch_id (15 bits) + value = static_cast< uint64_t >(1) << 63 | (shard_seq_num) << 15 | batch_id; } explicit objId(snp_obj_id_t value) : value(value) { shard_seq_num = (value >> 15) & 0xFFFFFFFFFFFF; diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 3cd5de400..7c0d3521d 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -25,7 +25,7 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c } case ReplicationMessageType::CREATE_SHARD_MSG: case ReplicationMessageType::SEAL_SHARD_MSG: { - home_object_->on_shard_message_commit(lsn, header, pbas[0], repl_dev(), ctx); + home_object_->on_shard_message_commit(lsn, header, repl_dev(), ctx); break; } @@ -131,6 +131,7 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, } default: { + LOGW("unsupported message type in rollback, lsn={}, msgType={}", lsn, msg_header->msg_type); break; } } @@ -203,62 +204,7 @@ void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& homestore::ReplResult< homestore::blk_alloc_hints > ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size, cintrusive< homestore::repl_req_ctx >& hs_ctx) { - const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); - switch (msg_header->msg_type) { - case ReplicationMessageType::CREATE_SHARD_MSG: { - pg_id_t pg_id = msg_header->pg_id; - // check whether the pg exists - if (!home_object_->pg_exists(pg_id)) { - LOGI("shardID=0x{:x}, pg={}, shard=0x{:x}, can not find pg={} when getting blk_alloc_hint", - msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width), - (msg_header->shard_id & homeobject::shard_mask), pg_id); - // TODO:: add error code to indicate the pg not found in homestore side - return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); - } - - auto v_chunkID = home_object_->resolve_v_chunk_id_from_msg(header); - if (!v_chunkID.has_value()) { - LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, can not resolve v_chunk_id from msg", msg_header->shard_id, - (msg_header->shard_id >> homeobject::shard_width), (msg_header->shard_id & homeobject::shard_mask)); - return folly::makeUnexpected(homestore::ReplServiceError::FAILED); - } - homestore::blk_alloc_hints hints; - // Both chunk_num_t and pg_id_t are of type 
diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp
index 3cd5de400..7c0d3521d 100644
--- a/src/lib/homestore_backend/replication_state_machine.cpp
+++ b/src/lib/homestore_backend/replication_state_machine.cpp
@@ -25,7 +25,7 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
     }
     case ReplicationMessageType::CREATE_SHARD_MSG:
     case ReplicationMessageType::SEAL_SHARD_MSG: {
-        home_object_->on_shard_message_commit(lsn, header, pbas[0], repl_dev(), ctx);
+        home_object_->on_shard_message_commit(lsn, header, repl_dev(), ctx);
         break;
     }
 
@@ -131,6 +131,7 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
     }
 
     default: {
+        LOGW("unsupported message type in rollback, lsn={}, msgType={}", lsn, msg_header->msg_type);
         break;
     }
     }
@@ -203,62 +204,7 @@ void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob&
 homestore::ReplResult< homestore::blk_alloc_hints >
 ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size,
                                              cintrusive< homestore::repl_req_ctx >& hs_ctx) {
-    const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
-    switch (msg_header->msg_type) {
-    case ReplicationMessageType::CREATE_SHARD_MSG: {
-        pg_id_t pg_id = msg_header->pg_id;
-        // check whether the pg exists
-        if (!home_object_->pg_exists(pg_id)) {
-            LOGI("shardID=0x{:x}, pg={}, shard=0x{:x}, can not find pg={} when getting blk_alloc_hint",
-                 msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
-                 (msg_header->shard_id & homeobject::shard_mask), pg_id);
-            // TODO:: add error code to indicate the pg not found in homestore side
-            return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
-        }
-
-        auto v_chunkID = home_object_->resolve_v_chunk_id_from_msg(header);
-        if (!v_chunkID.has_value()) {
-            LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, can not resolve v_chunk_id from msg", msg_header->shard_id,
-                 (msg_header->shard_id >> homeobject::shard_width), (msg_header->shard_id & homeobject::shard_mask));
-            return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
-        }
-        homestore::blk_alloc_hints hints;
-        // Both chunk_num_t and pg_id_t are of type uint16_t.
-        static_assert(std::is_same< pg_id_t, uint16_t >::value, "pg_id_t is not uint16_t");
-        static_assert(std::is_same< homestore::chunk_num_t, uint16_t >::value, "chunk_num_t is not uint16_t");
-        homestore::chunk_num_t v_chunk_id = v_chunkID.value();
-        hints.application_hint = ((uint64_t)pg_id << 16) | v_chunk_id;
-        if (hs_ctx->is_proposer()) { hints.reserved_blks = home_object_->get_reserved_blks(); }
-
-        auto tid = hs_ctx ? hs_ctx->traceID() : 0;
-        LOGD("tid={}, get_blk_alloc_hint for creating shard, select vchunk_id={} for pg={}, shardID={}", tid,
-             v_chunk_id, pg_id, msg_header->shard_id);
-
-        return hints;
-    }
-
-    case ReplicationMessageType::SEAL_SHARD_MSG: {
-        auto p_chunkID = home_object_->get_shard_p_chunk_id(msg_header->shard_id);
-        if (!p_chunkID.has_value()) {
-            LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist, underlying engine will retry this later",
-                 msg_header->shard_id, (msg_header->shard_id >> homeobject::shard_width),
-                 (msg_header->shard_id & homeobject::shard_mask));
-            return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
-        }
-        homestore::blk_alloc_hints hints;
-        hints.chunk_id_hint = p_chunkID.value();
-        return hints;
-    }
-
-    case ReplicationMessageType::PUT_BLOB_MSG:
-        return home_object_->blob_put_get_blk_alloc_hints(header, hs_ctx);
-
-    default: {
-        LOGW("not support msg type for {} in get_blk_alloc_hints", msg_header->msg_type);
-        break;
-    }
-    }
-    return homestore::blk_alloc_hints();
+    return home_object_->blob_put_get_blk_alloc_hints(header, hs_ctx);
 }
 
 void ReplicationStateMachine::on_start_replace_member(const std::string& task_id,
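For context on the deleted branch: the pg id and virtual chunk id (both uint16_t, as the removed static_asserts checked) pack into the low 32 bits of blk_alloc_hints::application_hint, an encoding the snapshot-receive path below still uses. A minimal standalone sketch of it (helper names hypothetical):

```cpp
#include <cassert>
#include <cstdint>

// application_hint layout: [ zero (32 bits) | pg_id (16 bits) | v_chunk_id (16 bits) ]
constexpr uint64_t make_app_hint(uint16_t pg_id, uint16_t v_chunk_id) {
    return (static_cast<uint64_t>(pg_id) << 16) | v_chunk_id;
}
constexpr uint16_t pg_of(uint64_t hint) { return static_cast<uint16_t>(hint >> 16); }
constexpr uint16_t vchunk_of(uint64_t hint) { return static_cast<uint16_t>(hint & 0xFFFF); }

int main() {
    const auto hint = make_app_hint(7, 3);
    assert(pg_of(hint) == 7);
    assert(vchunk_of(hint) == 3);
    return 0;
}
```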
@@ -627,51 +573,18 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
     LOGD("fetch data with lsn={}, msg type={}", lsn, msg_header->msg_type);
 
-    // for nuobject case, we can make this assumption, since we use append_blk_allocator.
-    RELEASE_ASSERT(sgs.iovs.size() == 1, "sgs iovs size should be 1, lsn={}, msg_type={}", lsn, msg_header->msg_type);
-
-    auto const total_size = local_blk_id.blk_count() * repl_dev()->get_blk_size();
-    RELEASE_ASSERT(total_size == sgs.size,
-                   "total_blk_size does not match, lsn={}, msg_type={}, expected size={}, given buffer size={}", lsn,
-                   msg_header->msg_type, total_size, sgs.size);
-
-    auto given_buffer = (uint8_t*)(sgs.iovs[0].iov_base);
-    std::memset(given_buffer, 0, total_size);
+    if (msg_header->msg_type == ReplicationMessageType::PUT_BLOB_MSG) {
+        // for nuobject case, we can make this assumption, since we use append_blk_allocator.
+        RELEASE_ASSERT(sgs.iovs.size() == 1, "sgs iovs size should be 1, lsn={}, msg_type={}", lsn,
+                       msg_header->msg_type);
 
-    // in homeobject, we have three kinds of requests that will write data(thus fetch_data might happen) to a
-    // chunk:
-    // 1 create_shard : will write a shard header to a chunk
-    // 2 seal_shard : will write a shard footer to a chunk
-    // 3 put_blob: will write user data to a chunk
+        auto const total_size = local_blk_id.blk_count() * repl_dev()->get_blk_size();
+        RELEASE_ASSERT(total_size == sgs.size,
+                       "total_blk_size does not match, lsn={}, msg_type={}, expected size={}, given buffer size={}",
+                       lsn, msg_header->msg_type, total_size, sgs.size);
 
-    // for any type that writes data to a chunk, we need to handle the fetch_data request for it.
-
-    switch (msg_header->msg_type) {
-    case ReplicationMessageType::CREATE_SHARD_MSG:
-    case ReplicationMessageType::SEAL_SHARD_MSG: {
-        // this function only returns data, not care about raft related logic, so no need to check the existence of
-        // shard, just return the shard header/footer directly. Also, no need to read the data from disk, generate
-        // it from Header.
-        auto sb =
-            r_cast< HSHomeObject::shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
-        auto const raw_size = sizeof(HSHomeObject::shard_info_superblk);
-        auto const expected_size = sisl::round_up(raw_size, repl_dev()->get_blk_size());
-
-        RELEASE_ASSERT(
-            sgs.size == expected_size,
-            "shard metadata size does not match, lsn={}, msg_type={}, expected size={}, given buffer size={}", lsn,
-            msg_header->msg_type, expected_size, sgs.size);
-
-        // TODO::return error_code if assert fails, so it will not crash here because of the assert failure.
-        std::memcpy(given_buffer, sb, raw_size);
-        return folly::makeFuture< std::error_code >(std::error_code{});
-    }
-
-    // TODO: for shard header and footer, follower can generate it itself according to header, no need to fetch
-    // it from leader. this can been done by adding another callback, which will be called before follower tries
-    // to fetch data.
-
-    case ReplicationMessageType::PUT_BLOB_MSG: {
+        auto given_buffer = (uint8_t*)(sgs.iovs[0].iov_base);
+        std::memset(given_buffer, 0, total_size);
 
         const auto blob_id = msg_header->blob_id;
         const auto shard_id = msg_header->shard_id;
@@ -687,23 +600,15 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
             throw std::system_error(err);
         }
 
-            // folly future has no machenism to bypass the later thenValue in the then value chain. so for all
-            // the case that no need to schedule the later async_read, we throw a system_error with no error
-            // code to bypass the next thenValue.
-#ifdef _PRERELEASE
-        if (iomgr_flip::instance()->test_flip("local_blk_data_invalid")) {
-            LOGI("Simulating forcing to read by indextable");
-        } else if (validate_blob(shard_id, blob_id, given_buffer, total_size)) {
-            LOGD("local_blk_id matches blob data, lsn={}, blob_id={}, shard=0x{:x}", lsn, blob_id, shard_id);
-            throw std::system_error(std::error_code{});
-        }
-#else
+            // folly futures have no mechanism to bypass a later thenValue in the chain, so for every case
+            // where there is no need to schedule the later async_read, we throw a system_error with an empty
+            // error code to bypass the next thenValue.
+            // if data matches
             if (validate_blob(shard_id, blob_id, given_buffer, total_size)) {
                 LOGD("local_blk_id matches blob data, lsn={}, blob_id={}, shard_id={}", lsn, blob_id, shard_id);
                 throw std::system_error(std::error_code{});
             }
-#endif
             // if data does not match, try to read data according to the index table. this might happen if the
             // chunk has once been gc.
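The comment kept above is worth illustrating. A hypothetical, self-contained sketch (not the project's code) of the pattern: throwing std::system_error with an empty error_code from one stage skips the remaining thenValue stages, and a trailing thenError turns the carried error_code back into the chain's result, so the bypass reads as success:

```cpp
#include <folly/futures/Future.h>
#include <system_error>

folly::Future<std::error_code> fetch_sketch(bool local_data_valid) {
    return folly::makeFuture()
        .thenValue([local_data_valid](folly::Unit) {
            if (local_data_valid) {
                // No async read needed: bypass the next thenValue.
                throw std::system_error(std::error_code{});
            }
        })
        .thenValue([](folly::Unit) {
            // In the real code this is where the async_read would be scheduled.
            return std::error_code{};
        })
        .thenError(folly::tag_t<std::system_error>{},
                   [](const std::system_error& e) { return e.code(); });
}
```

With this shape, fetch_sketch(true).get() yields a default (success) error_code without the second stage ever running.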
@@ -784,12 +689,10 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
                 return ec;
             });
-    }
-    default: {
+    } else {
         LOGW("msg type={}, should not happen in fetch_data rpc", msg_header->msg_type);
         return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_not_supported));
     }
-    }
 }
 
 bool ReplicationStateMachine::validate_blob(shard_id_t shard_id, blob_id_t blob_id, void* data, size_t size) const {
@@ -893,24 +796,6 @@ void ReplicationStateMachine::on_no_space_left(homestore::repl_lsn_t lsn, sisl::
     const pg_id_t pg_id = msg_header->pg_id;
 
     switch (msg_header->msg_type) {
-    // this case is only that no_space_left happens when writting shard header block on follower side.
-    case ReplicationMessageType::CREATE_SHARD_MSG: {
-        if (!home_object_->pg_exists(pg_id)) {
-            LOGW("shardID=0x{:x}, shard=0x{:x}, can not find pg={} when handling on_no_space_left",
-                 msg_header->shard_id, (msg_header->shard_id & homeobject::shard_mask), pg_id);
-        }
-        auto v_chunkID = home_object_->resolve_v_chunk_id_from_msg(header);
-        if (!v_chunkID.has_value()) {
-            LOGW("shardID=0x{:x}, pg={}, shard=0x{:x}, can not resolve v_chunk_id from msg", msg_header->shard_id,
-                 pg_id, (msg_header->shard_id & homeobject::shard_mask));
-        } else {
-            chunk_id = home_object_->chunk_selector()->get_pg_vchunk(pg_id, v_chunkID.value())->get_chunk_id();
-        }
-
-        break;
-    }
-
-    case ReplicationMessageType::SEAL_SHARD_MSG:
     case ReplicationMessageType::PUT_BLOB_MSG: {
         auto p_chunkID = home_object_->get_shard_p_chunk_id(msg_header->shard_id);
         if (!p_chunkID.has_value()) {
@@ -921,12 +806,12 @@ void ReplicationStateMachine::on_no_space_left(homestore::repl_lsn_t lsn, sisl::
         } else {
             chunk_id = p_chunkID.value();
         }
-        break;
     }
 
     default: {
         LOGW("not support msg type for {} in handling on_no_space_left", msg_header->msg_type);
+        return;
     }
     }
 }
diff --git a/src/lib/homestore_backend/resync_shard_data.fbs b/src/lib/homestore_backend/resync_shard_data.fbs
index 5f4123f32..2c5fde822 100644
--- a/src/lib/homestore_backend/resync_shard_data.fbs
+++ b/src/lib/homestore_backend/resync_shard_data.fbs
@@ -3,14 +3,15 @@ native_include "sisl/utility/non_null_ptr.hpp";
 namespace homeobject;
 
 table ResyncShardMetaData {
-    shard_id : uint64; // shard id to be created with;
+    shard_id : uint64;            // shard id to be created with;
     pg_id : uint16;               // pg id which this shard belongs to;
     state : ubyte;                // shard state;
-    created_lsn : uint64; // lsn on shard creation;
-    created_time : uint64; // shard creation time
-    last_modified_time : ulong; // shard last modify time
-    total_capacity_bytes : ulong; // total capacity of the shard
-    vchunk_id : uint16; // vchunk id
+    created_lsn : uint64;         // lsn on shard creation;
+    sealed_lsn : uint64;          // lsn on shard sealing;
+    created_time : uint64;        // shard creation time
+    last_modified_time : ulong;   // shard last modify time
+    total_capacity_bytes : ulong; // total capacity of the shard
+    vchunk_id : uint16;           // vchunk id
 }
 
 //ShardMetaData schema is the first batch(batch=0) in the shard transmission
diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp
index c60e97504..6947f28a3 100644
--- a/src/lib/homestore_backend/snapshot_receive_handler.cpp
+++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -74,22 +74,29 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
          (shard_meta.shard_id() >> homeobject::shard_width), (shard_meta.shard_id() & homeobject::shard_mask));
 
     // Persist shard meta on chunk data
+    const auto pg_id = shard_meta.pg_id();
+    const auto v_chunk_id = shard_meta.vchunk_id();
+
     sisl::io_blob_safe aligned_buf(sisl::round_up(sizeof(shard_info_superblk), io_align), io_align);
     shard_info_superblk* shard_sb = r_cast< shard_info_superblk* >(aligned_buf.bytes());
     shard_sb->info.id = shard_meta.shard_id();
-    shard_sb->info.placement_group = shard_meta.pg_id();
+    shard_sb->info.placement_group = pg_id;
     shard_sb->info.state = static_cast< ShardInfo::State >(shard_meta.state());
     shard_sb->info.lsn = shard_meta.created_lsn();
+    shard_sb->info.sealed_lsn = shard_meta.sealed_lsn();
     shard_sb->info.created_time = shard_meta.created_time();
     shard_sb->info.last_modified_time = shard_meta.last_modified_time();
     shard_sb->info.available_capacity_bytes = shard_meta.total_capacity_bytes();
     shard_sb->info.total_capacity_bytes = shard_meta.total_capacity_bytes();
-    shard_sb->v_chunk_id = shard_meta.vchunk_id();
+    shard_sb->v_chunk_id = v_chunk_id;
 
     homestore::blk_alloc_hints hints;
     hints.application_hint = static_cast< uint64_t >(ctx_->pg_id) << 16 | shard_sb->v_chunk_id;
 
     homestore::MultiBlkId blk_id;
+
+    // here, select_specific_chunk will be called inside alloc_blk; as a result, the vchunk will be selected,
+    // which prevents it from being selected by another create_shard.
     auto status = homestore::data_service().alloc_blks(
         sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
     if (status != homestore::BlkAllocStatus::SUCCESS) {
@@ -134,7 +141,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
     }
 
     // Now let's create local shard
-    home_obj_.local_create_shard(shard_sb->info, shard_sb->v_chunk_id, shard_sb->p_chunk_id, blk_id.blk_count());
+    home_obj_.local_create_shard(shard_sb->info, shard_sb->v_chunk_id, shard_sb->p_chunk_id);
     ctx_->shard_cursor = shard_meta.shard_id();
     ctx_->cur_batch_num = 0;
     return 0;
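Both the aligned buffer above and the alloc_blks call size everything in whole blocks. A small standalone sketch of the round-up rule (round_up here is a stand-in for sisl::round_up, and 4096 a hypothetical block size):

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t round_up(uint64_t size, uint64_t align) {
    return ((size + align - 1) / align) * align;
}

int main() {
    // a small superblk on a 4 KiB-block device still occupies one full blk
    assert(round_up(200, 4096) == 4096);
    assert(round_up(4096, 4096) == 4096);
    assert(round_up(4097, 4096) == 8192);
    return 0;
}
```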
@@ -358,13 +365,46 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
         auto iter = home_obj_._shard_map.find(ctx_->shard_cursor);
         state = (*iter->second)->info.state;
     }
+
+    auto shard_id = ctx_->shard_cursor;
+
+    // shard header takes one blk.
+    uint8_t occupied_blk_count_by_shard_meta = 1;
     if (state == ShardInfo::State::SEALED) {
         home_obj_.chunk_selector()->release_chunk(ctx_->pg_id, v_chunk_id.value());
+
+        // write a shard footer for a sealed shard.
+        homestore::MultiBlkId footer_blk;
+        auto footer_sgs = home_obj_.generate_shard_super_blk_sg_list(shard_id);
+        // footer can use reserved blks
+        hints.reserved_blks = home_obj_.get_reserved_blks();
+        homestore::data_service()
+            .async_alloc_write(footer_sgs, hints, footer_blk)
+            .thenValue([footer_sgs, &shard_id](auto&& err) {
+                // it does not matter if we fail to write the shard header/footer, since we never read them
+                if (err) { LOGW("failed to write shard footer blk for shard={}, err={}", shard_id, err.message()); }
+                iomanager.iobuf_free(reinterpret_cast< uint8_t* >(footer_sgs.iovs[0].iov_base));
+            });
+        // TODO: take care of the result of writing the footer if needed
+
+        // shard footer takes one blk
+        occupied_blk_count_by_shard_meta++;
     }
+
+    // update metrics
+    auto hs_pg = home_obj_.get_hs_pg(ctx_->pg_id);
+    RELEASE_ASSERT(hs_pg != nullptr, "shardID=0x{:x}, pg={}, shard=0x{:x}, PG not found", shard_id,
+                   (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
+    const_cast< HS_PG* >(hs_pg)->durable_entities_update([occupied_blk_count_by_shard_meta](auto& de) {
+        de.total_occupied_blk_count.fetch_add(occupied_blk_count_by_shard_meta, std::memory_order_relaxed);
+    });
+
+    // update ctx progress
     {
         std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
         ctx_->progress.complete_shards++;
     }
+
+    // We only update the snp info superblk on completion of each shard, since resumption is also shard-level.
     update_snp_info_sb(ctx_->shard_cursor == ctx_->shard_list.front());
 }
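The block accounting added above follows one rule: a shard header always costs one blk, and sealing adds a one-blk footer. A standalone sketch of that rule (hypothetical enum and helper; the real code tracks this in occupied_blk_count_by_shard_meta):

```cpp
#include <cstdint>

enum class ShardState : uint8_t { OPEN, SEALED };

// header (+ footer when sealed)
constexpr uint8_t shard_meta_blks(ShardState s) { return s == ShardState::SEALED ? 2 : 1; }

static_assert(shard_meta_blks(ShardState::OPEN) == 1, "open shard: header only");
static_assert(shard_meta_blks(ShardState::SEALED) == 2, "sealed shard: header + footer");
```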
diff --git a/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp b/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp
index 7304d38b7..df2b1ffa4 100644
--- a/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp
+++ b/src/lib/homestore_backend/tests/homeobj_misc_tests.cpp
@@ -141,6 +141,7 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
         ASSERT_EQ(shard_msg->pg_id(), pg->pg_info_.id);
         ASSERT_EQ(shard_msg->state(), static_cast< uint8_t >(shard->info.state));
         ASSERT_EQ(shard_msg->created_lsn(), shard->info.lsn);
+        ASSERT_EQ(shard_msg->sealed_lsn(), shard->info.sealed_lsn);
         ASSERT_EQ(shard_msg->created_time(), shard->info.created_time);
         ASSERT_EQ(shard_msg->last_modified_time(), shard->info.last_modified_time);
         ASSERT_EQ(shard_msg->total_capacity_bytes(), shard->info.total_capacity_bytes);
@@ -233,8 +234,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     for (uint64_t i = 1; i <= num_shards_per_pg; i++) {
         shard_ids.push_back(i);
     }
-    auto pg_entry = CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.expected_member_num, pg->pg_info_.chunk_size,
-                                                 blob_seq_num, num_shards_per_pg, &members, &shard_ids);
+    auto pg_entry =
+        CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.expected_member_num,
+                                     pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
     builder.Finish(pg_entry);
     auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
     auto ret = handler->process_pg_snapshot_data(*pg_meta);
@@ -264,11 +266,11 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     shard.total_capacity_bytes = 1024 * Mi;
     shard.lsn = snp_lsn;
 
-    auto v_chunk_id = _obj_inst->chunk_selector()->get_most_available_blk_chunk(shard.id, pg_id);
+    auto v_chunk_id = _obj_inst->chunk_selector()->pick_most_available_blk_chunk(shard.id, pg_id);
 
-    auto shard_entry = CreateResyncShardMetaData(builder, shard.id, pg_id, static_cast< uint8_t >(shard.state),
-                                                 shard.lsn, shard.created_time, shard.last_modified_time,
-                                                 shard.total_capacity_bytes, v_chunk_id.value());
+    auto shard_entry = CreateResyncShardMetaData(
+        builder, shard.id, pg_id, static_cast< uint8_t >(shard.state), shard.lsn, shard.sealed_lsn,
+        shard.created_time, shard.last_modified_time, shard.total_capacity_bytes, v_chunk_id.value());
     builder.Finish(shard_entry);
     auto shard_meta = GetResyncShardMetaData(builder.GetBufferPointer());
     auto status = handler->process_shard_snapshot_data(*shard_meta);
@@ -287,6 +289,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     ASSERT_EQ(shard_res.last_modified_time, shard.last_modified_time);
     ASSERT_EQ(shard_res.total_capacity_bytes, shard.total_capacity_bytes);
     ASSERT_EQ(shard_res.lsn, shard.lsn);
+    ASSERT_EQ(shard_res.sealed_lsn, shard.sealed_lsn);
 
     // Step 2-2: Test write blob batch data
     // Generate ResyncBlobDataBatch message
diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
index 6371906d4..47de25530 100644
--- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp
+++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -449,14 +449,6 @@ TEST_F(HomeObjectFixture, BasicPutGetBlobWithPushDataDisabled) {
     // statemachine
     set_basic_flip("disable_leader_push_data", std::numeric_limits< int >::max());
 
-    // set the flip to force to read by index table to exercise reading from the index table is working as expected
-    // 50% percentage will achieve the effect that half of the blobs are read from index table and the other
-    // half are read by blk_id
-
-    // for now, given_buffer will be filled by the first async_read, so this will pass.
-    // TODO:: enhence the logic after we have real gc
-    set_basic_flip("local_blk_data_invalid", std::numeric_limits< int >::max(), 50);
-
     // test recovery with pristine state firstly
     restart();
 
@@ -487,13 +479,7 @@ TEST_F(HomeObjectFixture, BasicPutGetBlobWithPushDataDisabled) {
 
     // Verify the stats
     verify_obj_count(num_pgs, num_blobs_per_shard, num_shards_per_pg, false /* deleted */);
-
-    remove_flip("local_blk_data_invalid");
     remove_flip("disable_leader_push_data");
 }
-
-// TODO:: add a test for no_space_left without flip.
-
 #endif
-
-// TODO:: add more test cases to verify the push data disabled scenario after we have gc
diff --git a/src/lib/homestore_backend/tests/hs_shard_tests.cpp b/src/lib/homestore_backend/tests/hs_shard_tests.cpp
index c2e057488..88038bd57 100644
--- a/src/lib/homestore_backend/tests/hs_shard_tests.cpp
+++ b/src/lib/homestore_backend/tests/hs_shard_tests.cpp
@@ -125,8 +125,6 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) {
     auto shard_info = create_shard(pg_id, Mi);
     auto shard_id = shard_info.id;
     EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state);
-    EXPECT_EQ(Mi, shard_info.total_capacity_bytes);
-    EXPECT_EQ(Mi, shard_info.available_capacity_bytes);
     EXPECT_EQ(pg_id, shard_info.placement_group);
 
     // restart homeobject and check if pg/shard info will be recovered.
diff --git a/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp b/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
index 04b44d899..d0f4ca256 100644
--- a/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
+++ b/src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
@@ -177,7 +177,7 @@ TEST_F(HeapChunkSelectorTest, test_identical_layout) {
         for (int j = 3; j > 0; --j) {
             ASSERT_EQ(pg_chunk_collection->available_blk_count, start_available_blk_count);
 
-            const auto v_chunkID = HCS.get_most_available_blk_chunk(j, pg_id);
+            const auto v_chunkID = HCS.pick_most_available_blk_chunk(j, pg_id);
             ASSERT_TRUE(v_chunkID.has_value());
             p_chunk_id = pg_chunk_collection->m_pg_chunks[v_chunkID.value()]->get_chunk_id();
             ASSERT_EQ(HCS.m_chunks[p_chunk_id]->m_state, ChunkState::INUSE);
@@ -216,7 +216,7 @@ TEST_F(HeapChunkSelectorTest, test_identical_layout) {
             start_available_blk_count -= j;
         }
         // all chunks have been given out
-        ASSERT_FALSE(HCS.get_most_available_blk_chunk(9999, pg_id).has_value());
+        ASSERT_FALSE(HCS.pick_most_available_blk_chunk(9999, pg_id).has_value());
     }
 }
 
@@ -378,7 +378,7 @@ TEST_F(HeapChunkSelectorTest, test_recovery) {
     ASSERT_EQ(pg_chunk_collection->m_pg_chunks[0]->m_state, ChunkState::INUSE);
     ASSERT_EQ(pg_chunk_collection->m_pg_chunks[1]->m_state, ChunkState::AVAILABLE);
 
-    const auto v_chunkID = HCS_recovery.get_most_available_blk_chunk(9999, pg_id);
+    const auto v_chunkID = HCS_recovery.pick_most_available_blk_chunk(9999, pg_id);
     ASSERT_TRUE(v_chunkID.has_value());
     auto chunk = HCS_recovery.select_specific_chunk(pg_id, v_chunkID.value());
     ASSERT_NE(chunk, nullptr);
diff --git a/src/lib/memory_backend/mem_shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp
index 80c791725..945bc57ab 100644
--- a/src/lib/memory_backend/mem_shard_manager.cpp
+++ b/src/lib/memory_backend/mem_shard_manager.cpp
@@ -10,7 +10,7 @@ ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t p
                                                                        trace_id_t tid) {
     (void)tid;
     auto const now = get_current_timestamp();
-    auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, 0, now, now, size_bytes, size_bytes);
+    auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, 0, 0, now, now, size_bytes, size_bytes);
     {
         auto lg = std::scoped_lock(_pg_lock, _shard_lock);
         auto pg_it = _pg_map.find(pg_owner);