
Commit 09fe0b9

address comments
1 parent 4fd93bd commit 09fe0b9

File tree

6 files changed: +62 -139 lines changed

src/lib/homestore_backend/heap_chunk_selector.cpp

Lines changed: 1 addition & 2 deletions
@@ -514,7 +514,7 @@ std::shared_ptr< const std::vector< homestore::chunk_num_t > > HeapChunkSelector
     return p_chunk_ids;
 }
 
-std::optional< homestore::chunk_num_t > HeapChunkSelector::get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) {
+std::optional< homestore::chunk_num_t > HeapChunkSelector::pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id) {
     std::shared_lock lock_guard(m_chunk_selector_mtx);
     auto pg_it = m_per_pg_chunks.find(pg_id);
     if (pg_it == m_per_pg_chunks.end()) {
@@ -533,7 +533,6 @@ std::optional< homestore::chunk_num_t > HeapChunkSelector::get_most_available_bl
         LOGWARNMOD(homeobject, "No available chunk for pg={}, ctx=0x{:x}", pg_id, ctx);
         return std::nullopt;
     }
-
     auto v_chunk_id = std::distance(pg_chunks.begin(), max_it);
     LOGDEBUGMOD(homeobject, "Picked v_chunk_id={} : [p_chunk_id={}, avail={}], ctx=0x{:x}", v_chunk_id,
                 pg_chunks[v_chunk_id]->get_chunk_id(), pg_chunks[v_chunk_id]->available_blks(), ctx);
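
Aside on the selection logic this rename touches: a minimal, self-contained sketch of picking the chunk with the most available blocks via std::max_element, mirroring the std::distance/available_blks() usage above. FakeChunk and pick_most_available are illustrative stand-ins, not the real HeapChunkSelector API.

// Sketch only: FakeChunk and pick_most_available are illustrative stand-ins for the
// per-pg chunk entries used by HeapChunkSelector, not its real API.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <optional>
#include <vector>

struct FakeChunk {
    uint32_t chunk_id;   // stands in for the physical chunk id (p_chunk_id)
    uint64_t avail_blks; // stands in for available_blks()
    uint64_t available_blks() const { return avail_blks; }
};

// Return the index (the "v_chunk_id") of the chunk with the most available blocks,
// or std::nullopt when the pg has no usable chunk.
std::optional< std::size_t > pick_most_available(const std::vector< FakeChunk >& pg_chunks) {
    auto max_it = std::max_element(pg_chunks.begin(), pg_chunks.end(),
                                   [](const FakeChunk& a, const FakeChunk& b) {
                                       return a.available_blks() < b.available_blks();
                                   });
    if (max_it == pg_chunks.end() || max_it->available_blks() == 0) { return std::nullopt; }
    return static_cast< std::size_t >(std::distance(pg_chunks.begin(), max_it));
}

int main() {
    std::vector< FakeChunk > chunks{{10, 100}, {11, 400}, {12, 250}};
    if (auto v = pick_most_available(chunks)) {
        std::printf("picked v_chunk_id=%zu (p_chunk_id=%u)\n", *v, chunks[*v].chunk_id);
    }
    return 0;
}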

src/lib/homestore_backend/heap_chunk_selector.h

Lines changed: 1 addition & 1 deletion
@@ -126,7 +126,7 @@ class HeapChunkSelector : public homestore::ChunkSelector {
      * @param pg_id The ID of the pg.
      * @return An optional chunk_num_t value representing v_chunk_id, or std::nullopt if no space left.
      */
-    std::optional< chunk_num_t > get_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id);
+    std::optional< chunk_num_t > pick_most_available_blk_chunk(uint64_t ctx, pg_id_t pg_id);
 
     // this should be called on each pg meta blk found
     bool recover_pg_chunks(pg_id_t pg_id, std::vector< chunk_num_t >&& p_chunk_ids);

src/lib/homestore_backend/hs_shard_manager.cpp

Lines changed: 37 additions & 72 deletions
@@ -147,7 +147,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
     SLOGD(tid, new_shard_id, "Create shard request: pg={}, size={}", pg_owner, size_bytes);
 
     // select chunk for shard.
-    const auto v_chunkID = chunk_selector()->get_most_available_blk_chunk(new_shard_id, pg_owner);
+    const auto v_chunkID = chunk_selector()->pick_most_available_blk_chunk(new_shard_id, pg_owner);
 
     if (!v_chunkID.has_value()) {
         SLOGW(tid, new_shard_id, "no availble chunk left to create shard for pg={}", pg_owner);
@@ -176,7 +176,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
 
         bool res = chunk_selector()->release_chunk(pg_owner, v_chunk_id);
         RELEASE_ASSERT(res, "Failed to release v_chunk_id={}, pg={}", v_chunk_id, pg_owner);
-        // try to gc this chunk to avoid emergent gc in the future
+
        auto gc_mgr = gc_manager();
        if (gc_mgr->is_started()) { gc_manager()->submit_gc_task(task_priority::normal, pchunk_id); }
 
@@ -317,17 +317,17 @@ bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& he
     const auto& shard_id = msg_header->shard_id;
 
     if (msg_header->corrupted()) {
-        LOGE("replication message header is corrupted with crc error when pre_committing shard message, lsn={}, "
-             "traceID={}, shard={}",
-             lsn, tid, shard_id);
+        SLOGE(tid, shard_id,
+              "replication message header is corrupted with crc error when pre_committing shard message, lsn={}", lsn);
         RELEASE_ASSERT(false, "shardID=0x{:x}, pg={}, shard=0x{:x}, failed to pre_commit shard msg", shard_id,
                        (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
         if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
         return false;
     }
 
-    switch (msg_header->msg_type) {
-    case ReplicationMessageType::SEAL_SHARD_MSG: {
+    SLOGD(tid, shard_id, "pre_commit shard message, type={}, lsn= {}", msg_header->msg_type, lsn);
+
+    if (msg_header->msg_type == ReplicationMessageType::SEAL_SHARD_MSG) {
         std::scoped_lock lock_guard(_shard_lock);
         auto iter = _shard_map.find(shard_id);
         RELEASE_ASSERT(iter != _shard_map.end(), "shardID=0x{:x}, pg={}, shard=0x{:x}, shard does not exist", shard_id,
@@ -340,27 +340,8 @@ bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& he
         } else {
             SLOGW(tid, shard_id, "try to seal an unopened shard");
         }
-        break;
-    }
-    // in pre_commit of create_shard, we select_chunk(mark chunk as inuse state) for v_chunk, so that when rollbacking
-    // create_shard, we can also release the chunk.
-    case ReplicationMessageType::CREATE_SHARD_MSG: {
-        const auto v_chunk_id = msg_header->vchunk_id;
-        const auto pg_id = msg_header->pg_id;
-
-        // for leader, the chunk has been selected in create_shard, so when selecting, the chunk will be found as
-        // state::used.
-        auto chunk = chunk_selector_->select_specific_chunk(pg_id, v_chunk_id);
-        RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id={} in pg={}", v_chunk_id, pg_id);
-        const auto p_chunk_id = homestore::VChunk(chunk).get_chunk_id();
-        SLOGD(tid, shard_id, "pchunk {} is selected for vchunk {} in pg {} for creating shard", p_chunk_id, v_chunk_id,
-              pg_id);
     }
 
-    default: {
-        break;
-    }
-    }
     return true;
 }
 
@@ -372,35 +353,19 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head
     }
     auto tid = hs_ctx ? hs_ctx->traceID() : 0;
     const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
-    const auto shard_id = msg_header->shard_id;
+
     if (msg_header->corrupted()) {
-        LOGW("replication message header is corrupted with crc error, lsn={}, traceID={}", lsn, tid);
-        RELEASE_ASSERT(false, "shardID=0x{:x}, pg={}, shard=0x{:x}, failed to rollback create_shard msg", shard_id,
-                       (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
+        RELEASE_ASSERT(false,
+                       "replication message header is corrupted with crc error in on_rollback, lsn={}, traceID={}", lsn,
+                       tid);
        return;
    }
 
+    const auto shard_id = msg_header->shard_id;
     switch (msg_header->msg_type) {
     case ReplicationMessageType::CREATE_SHARD_MSG: {
-        if (ctx) {
-            ctx->promise_.setValue(folly::makeUnexpected(ShardError::RETRY_REQUEST));
-        } else {
-            // we have already added release_chunk logic to thenValue of hoemobject#create_shard in originator, so here
-            // we just need to release_chunk for non-originater case since it will bring a bug if a chunk is released
-            // for two times. for exampele, as a originator:
-
-            // t1 : chunk1 is released in the rollback of create_shard, the chunk state is marked as available
-            // t2 : chunk1 is select by a new create shard (shard1), the chunk state is marked as inuse
-            // t3 : chunk1 is released in thenValue of create_shard, the chunk state is marked as available
-            // t4 : chunk1 is select by a new create shard (shard2), the chunk state is marked as inuse
-            // now, shard1 and shard2 hold the same chunk.
-            bool res = release_chunk_based_on_create_shard_message(header);
-            if (!res) {
-                RELEASE_ASSERT(false,
-                               "shardID=0x{:x}, pg={}, shard=0x{:x}, failed to release chunk based on create shard msg",
-                               shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
-            }
-        }
+        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::RETRY_REQUEST)); }
+        SLOGD(tid, shard_id, "rollback create_shard message, lsn={}", lsn);
         break;
     }
     case ReplicationMessageType::SEAL_SHARD_MSG: {
@@ -419,8 +384,10 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head
 
         // TODO:set a proper error code
         if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::RETRY_REQUEST)); }
+        break;
     }
     default: {
+        SLOGE(tid, shard_id, "unsupported op type={} for rollbacking shard message", msg_header->msg_type);
         break;
     }
     }
@@ -455,15 +422,12 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha
     }
     auto tid = hs_ctx ? hs_ctx->traceID() : 0;
     auto header = r_cast< const ReplicationMessageHeader* >(h.cbytes());
-    const auto shard_id = header->shard_id;
     if (header->corrupted()) {
-        LOGE("replication message header is corrupted with crc error when committing shard message, lsn={}, "
-             "traceID={}, shard={}",
-             lsn, tid, shard_id);
-        RELEASE_ASSERT(false, "shardID=0x{:x}, pg={}, shard=0x{:x}, failed to pre_commit shard msg", shard_id,
-                       (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask));
+        RELEASE_ASSERT(false, "replication message header is corrupted with crc error in on_commit, lsn={}, traceID={}",
+                       lsn, tid);
         return;
     }
+    const auto shard_id = header->shard_id;
 
     RELEASE_ASSERT(header->msg_type == ReplicationMessageType::CREATE_SHARD_MSG ||
                        header->msg_type == ReplicationMessageType::SEAL_SHARD_MSG,
@@ -539,6 +503,15 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha
 
     switch (header->msg_type) {
     case ReplicationMessageType::CREATE_SHARD_MSG: {
+        // 1 select sepecific chunk.
+        // for leader, the chunk has been selected in create_shard, so when selecting, the chunk will be found as
+        // state::used.
+        auto chunk = chunk_selector_->select_specific_chunk(pg_id, vchunk_id);
+        RELEASE_ASSERT(chunk != nullptr, "chunk selection failed with v_chunk_id={} in pg={}", vchunk_id, pg_id);
+        SLOGD(tid, shard_id, "pchunk {} is selected for vchunk {} in pg {} for creating shard", pchunk, vchunk_id,
+              pg_id);
+
+        // 2 add shard meta blk
         shard_info.id = shard_id;
         shard_info.placement_group = pg_id;
         shard_info.created_time = get_current_timestamp();
@@ -566,18 +539,6 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha
         if (shard_info.state == ShardInfo::State::SEALED) {
             bool res = chunk_selector()->release_chunk(pg_id, vchunk_id);
             RELEASE_ASSERT(res, "Failed to release v_chunk_id={}, pg={}", vchunk_id, pg_id);
-
-            const static uint64_t shard_super_blk_count{
-                sisl::round_up(sizeof(shard_info_superblk), homestore::data_service().get_blk_size()) /
-                homestore::data_service().get_blk_size()};
-
-            if (vchunk->available_blks() <= shard_super_blk_count * 2) {
-                // try to gc this chunk to avoid emergent gc in the future
-                auto gc_mgr = gc_manager();
-                if (gc_mgr->is_started()) {
-                    gc_manager()->submit_gc_task(task_priority::normal, vchunk->get_chunk_id());
-                }
-            }
             update_shard_in_map(shard_info);
         } else
             RELEASE_ASSERT(false, "try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id={}", shard_id);
@@ -589,11 +550,15 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, sha
         break;
     }
 
-    // write shard header/footer blk
-    // TODO:: check the return status of async if we care about the completeness
-
-    // in log replay case, one more shard header/footer will be written again. it does not matter.
-    homestore::data_service().async_write(generate_shard_super_blk_sg_list(shard_id), blkids);
+    // write shard header/footer blk. in log replay case, one more shard header/footer will be written again. it does
+    // not matter.
+    sisl::sg_list sgs = generate_shard_super_blk_sg_list(shard_id);
+    RELEASE_ASSERT(sgs.iovs.size() == 1, "sgs.iovs.size() for shard header/footer should be 1, but not!");
+    homestore::data_service().async_write(sgs, blkids).thenValue([sgs, lsn, msgtype = header->msg_type](auto&& err) {
+        // it does not matter if fail to write shard header/footer, we never read them
+        if (err) { LOGW("failed to write shard super blk, err={}, lsn={}, msgType={}", err.message(), lsn, msgtype); }
+        iomanager.iobuf_free(reinterpret_cast< uint8_t* >(sgs.iovs[0].iov_base));
+    });
 
     auto hs_pg = get_hs_pg(pg_id);
     RELEASE_ASSERT(hs_pg != nullptr, "shardID=0x{:x}, pg={}, shard=0x{:x}, PG not found", shard_id,
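
Aside on the new shard header/footer write path: the commit replaces a fire-and-forget async_write with a thenValue continuation that logs failures and frees the single iovec buffer. A minimal sketch of that free-in-completion pattern follows, assuming only folly; fake_async_write and the malloc/free pair are hypothetical placeholders for homestore::data_service().async_write() and iomanager's iobuf alloc/free.

// Sketch only: fake_async_write and the malloc/free pair are placeholders for
// homestore::data_service().async_write() and iomanager's iobuf alloc/free.
#include <folly/futures/Future.h>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <system_error>

// Stand-in for an asynchronous device write that completes with an error_code.
folly::Future< std::error_code > fake_async_write(void* buf, std::size_t len) {
    (void)buf;
    (void)len;
    return folly::makeFuture(std::error_code{});
}

int main() {
    void* buf = std::malloc(4096); // stands in for the shard header/footer staging buffer

    // Keep the buffer alive by capturing it in the completion callback, log (but
    // tolerate) a failed write, and free the buffer exactly once when the write is done.
    auto fut = fake_async_write(buf, 4096).thenValue([buf](std::error_code err) {
        if (err) { std::fprintf(stderr, "write failed: %s\n", err.message().c_str()); }
        std::free(buf);
    });
    std::move(fut).get();
    return 0;
}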

src/lib/homestore_backend/replication_state_machine.cpp

Lines changed: 16 additions & 58 deletions
@@ -131,6 +131,7 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
     }
 
     default: {
+        LOGW("unsupported message type in rollback, lsn={}, mesType={}", lsn, msg_header->msg_type);
         break;
     }
     }
@@ -572,51 +573,18 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
 
     LOGD("fetch data with lsn={}, msg type={}", lsn, msg_header->msg_type);
 
-    // for nuobject case, we can make this assumption, since we use append_blk_allocator.
-    RELEASE_ASSERT(sgs.iovs.size() == 1, "sgs iovs size should be 1, lsn={}, msg_type={}", lsn, msg_header->msg_type);
+    if (msg_header->msg_type == ReplicationMessageType::PUT_BLOB_MSG) {
+        // for nuobject case, we can make this assumption, since we use append_blk_allocator.
+        RELEASE_ASSERT(sgs.iovs.size() == 1, "sgs iovs size should be 1, lsn={}, msg_type={}", lsn,
+                       msg_header->msg_type);
 
-    auto const total_size = local_blk_id.blk_count() * repl_dev()->get_blk_size();
-    RELEASE_ASSERT(total_size == sgs.size,
-                   "total_blk_size does not match, lsn={}, msg_type={}, expected size={}, given buffer size={}", lsn,
-                   msg_header->msg_type, total_size, sgs.size);
+        auto const total_size = local_blk_id.blk_count() * repl_dev()->get_blk_size();
+        RELEASE_ASSERT(total_size == sgs.size,
+                       "total_blk_size does not match, lsn={}, msg_type={}, expected size={}, given buffer size={}",
+                       lsn, msg_header->msg_type, total_size, sgs.size);
 
-    auto given_buffer = (uint8_t*)(sgs.iovs[0].iov_base);
-    std::memset(given_buffer, 0, total_size);
-
-    // in homeobject, we have three kinds of requests that will write data(thus fetch_data might happen) to a
-    // chunk:
-    // 1 create_shard : will write a shard header to a chunk
-    // 2 seal_shard : will write a shard footer to a chunk
-    // 3 put_blob: will write user data to a chunk
-
-    // for any type that writes data to a chunk, we need to handle the fetch_data request for it.
-
-    switch (msg_header->msg_type) {
-    case ReplicationMessageType::CREATE_SHARD_MSG:
-    case ReplicationMessageType::SEAL_SHARD_MSG: {
-        // this function only returns data, not care about raft related logic, so no need to check the existence of
-        // shard, just return the shard header/footer directly. Also, no need to read the data from disk, generate
-        // it from Header.
-        auto sb =
-            r_cast< HSHomeObject::shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
-        auto const raw_size = sizeof(HSHomeObject::shard_info_superblk);
-        auto const expected_size = sisl::round_up(raw_size, repl_dev()->get_blk_size());
-
-        RELEASE_ASSERT(
-            sgs.size == expected_size,
-            "shard metadata size does not match, lsn={}, msg_type={}, expected size={}, given buffer size={}", lsn,
-            msg_header->msg_type, expected_size, sgs.size);
-
-        // TODO::return error_code if assert fails, so it will not crash here because of the assert failure.
-        std::memcpy(given_buffer, sb, raw_size);
-        return folly::makeFuture< std::error_code >(std::error_code{});
-    }
-
-    // TODO: for shard header and footer, follower can generate it itself according to header, no need to fetch
-    // it from leader. this can been done by adding another callback, which will be called before follower tries
-    // to fetch data.
-
-    case ReplicationMessageType::PUT_BLOB_MSG: {
+        auto given_buffer = (uint8_t*)(sgs.iovs[0].iov_base);
+        std::memset(given_buffer, 0, total_size);
 
         const auto blob_id = msg_header->blob_id;
         const auto shard_id = msg_header->shard_id;
@@ -632,23 +600,15 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
             throw std::system_error(err);
         }
 
-        // folly future has no machenism to bypass the later thenValue in the then value chain. so for all
-        // the case that no need to schedule the later async_read, we throw a system_error with no error
-        // code to bypass the next thenValue.
-#ifdef _PRERELEASE
-        if (iomgr_flip::instance()->test_flip("local_blk_data_invalid")) {
-            LOGI("Simulating forcing to read by indextable");
-        } else if (validate_blob(shard_id, blob_id, given_buffer, total_size)) {
-            LOGD("local_blk_id matches blob data, lsn={}, blob_id={}, shard=0x{:x}", lsn, blob_id, shard_id);
-            throw std::system_error(std::error_code{});
-        }
-#else
+        // folly future has no machenism to bypass the later thenValue in the then value chain. so for all
+        // the case that no need to schedule the later async_read, we throw a system_error with no error
+        // code to bypass the next thenValue.
+
         // if data matches
         if (validate_blob(shard_id, blob_id, given_buffer, total_size)) {
             LOGD("local_blk_id matches blob data, lsn={}, blob_id={}, shard_id={}", lsn, blob_id, shard_id);
             throw std::system_error(std::error_code{});
         }
-#endif
 
         // if data does not match, try to read data according to the index table. this might happen if the
        // chunk has once been gc.
@@ -729,12 +689,10 @@ folly::Future< std::error_code > ReplicationStateMachine::on_fetch_data(const in
 
             return ec;
         });
-    }
-    default: {
+    } else {
         LOGW("msg type={}, should not happen in fetch_data rpc", msg_header->msg_type);
         return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_not_supported));
     }
-}
 }
 
 bool ReplicationStateMachine::validate_blob(shard_id_t shard_id, blob_id_t blob_id, void* data, size_t size) const {
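
Aside on the comment kept in this hunk about bypassing thenValue: folly futures have no built-in way to skip later thenValue stages, so the code throws a std::system_error carrying an empty error_code and lets a later stage translate it back. A minimal sketch of that control-flow trick follows, assuming only folly; the local-data check (validate_blob) is modeled by a plain bool.

// Sketch only, assuming folly: throwing std::system_error with an empty error_code
// inside a thenValue skips the remaining thenValue stages; a final thenTry turns the
// exception back into an error_code result.
#include <folly/futures/Future.h>
#include <cstdio>
#include <system_error>

int main() {
    bool data_already_matches = true; // stand-in for validate_blob() succeeding

    auto fut =
        folly::makeFuture()
            .thenValue([data_already_matches](folly::Unit) {
                if (data_already_matches) {
                    // No need to schedule the follow-up read: bail out of the chain.
                    throw std::system_error(std::error_code{});
                }
            })
            .thenValue([](folly::Unit) {
                // Models the async_read stage; skipped when the previous stage threw.
                std::puts("reading from disk ...");
            })
            .thenTry([](folly::Try< folly::Unit > t) -> std::error_code {
                if (t.hasException()) {
                    try {
                        t.value(); // rethrows the stored exception
                    } catch (const std::system_error& e) {
                        return e.code(); // empty error_code means "nothing left to do"
                    } catch (...) {
                        return std::make_error_code(std::errc::io_error);
                    }
                }
                return std::error_code{};
            });

    std::printf("final error_code value: %d\n", std::move(fut).get().value());
    return 0;
}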

src/lib/homestore_backend/tests/homeobj_misc_tests.cpp

Lines changed: 4 additions & 3 deletions
@@ -233,8 +233,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
     for (uint64_t i = 1; i <= num_shards_per_pg; i++) {
         shard_ids.push_back(i);
     }
-    auto pg_entry = CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.expected_member_num, pg->pg_info_.chunk_size,
-                                                 blob_seq_num, num_shards_per_pg, &members, &shard_ids);
+    auto pg_entry =
+        CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.expected_member_num,
+                                     pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
     builder.Finish(pg_entry);
     auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
     auto ret = handler->process_pg_snapshot_data(*pg_meta);
@@ -264,7 +265,7 @@
     shard.total_capacity_bytes = 1024 * Mi;
     shard.lsn = snp_lsn;
 
-    auto v_chunk_id = _obj_inst->chunk_selector()->get_most_available_blk_chunk(shard.id, pg_id);
+    auto v_chunk_id = _obj_inst->chunk_selector()->pick_most_available_blk_chunk(shard.id, pg_id);
 
     auto shard_entry = CreateResyncShardMetaData(builder, shard.id, pg_id, static_cast< uint8_t >(shard.state),
                                                  shard.lsn, shard.created_time, shard.last_modified_time,
