Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeBlocksConan(ConanFile):
name = "homeblocks"
version = "1.0.16"
version = "1.0.17"
homepage = "https://github.com/eBay/HomeBlocks"
description = "Block Store built on HomeStore"
topics = ("ebay")
Expand Down
1 change: 0 additions & 1 deletion src/lib/volume/index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ class VolumeIndexValue : public homestore::BtreeIntervalValue {
// Get the next blk num and checksum
m_blkid_suffix += n;
auto curr_lba = ctx->start_lba + n;
LOGINFO("shift n={} blk_num={} curr_lba={}", n, m_blkid_suffix, curr_lba);
DEBUG_ASSERT(ctx->block_info->find(curr_lba) != ctx->block_info->end(), "Invalid index");
m_checksum = (*ctx->block_info)[curr_lba].checksum;
}
Expand Down
123 changes: 95 additions & 28 deletions src/lib/volume/tests/test_volume_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <string>

#include <boost/icl/interval_set.hpp>
#include <folly/init/Init.h>
#include <gtest/gtest.h>
#include <sisl/options/options.h>
Expand Down Expand Up @@ -56,6 +57,7 @@ class VolumeIOImpl {
public:
void create_volume() {
auto vinfo = gen_vol_info(m_volume_id_++);
m_vol_name = vinfo.name;
m_vol_id = vinfo.id;

auto vol_mgr = g_helper->inst()->volume_manager();
Expand All @@ -78,13 +80,39 @@ class VolumeIOImpl {
ASSERT_TRUE(m_vol_ptr != nullptr);
}

/// Chooses the LBA range for the next write and reserves it in m_inflight_ios so
/// that no two outstanding writes cover the same LBA.
///
/// @param start_lba In/out. When both start_lba and nblks are non-zero the caller
///                  has picked the range and it is only reserved; otherwise a
///                  random start in [0, max_blks) is generated.
/// @param nblks     In/out. Block count of the range. Generated in [1, 63] when
///                  the range is random. Trimmed so the range does not run past
///                  max_blks.
/// @param max_blks  Number of blocks in the volume, as computed by the caller.
///
/// A caller-supplied range that overlaps an in-flight write trips ASSERT_TRUE.
/// A random range is regenerated until it does not overlap.
void get_random_non_overlapping_lba(lba_t& start_lba, uint32_t& nblks, uint64_t max_blks) {
    // Trim the block count to the end of the volume *before* the range is reserved.
    // build_random_data() applies the same trim after this call, and the write
    // completion callbacks erase [req->lba, req->lba + req->nlbas - 1], i.e. the
    // trimmed range. Reserving the untrimmed range would leave the part beyond the
    // volume end in m_inflight_ios forever and make later ranges near the end of
    // the volume look like they overlap an in-flight write.
    const auto clamp_nblks = [max_blks](lba_t lba, uint32_t count) -> uint32_t {
        // A start at or beyond the volume end is left untouched here.
        if (static_cast< uint64_t >(lba) >= max_blks) { return count; }
        return static_cast< uint32_t >(
            std::min(static_cast< uint64_t >(count), max_blks - static_cast< uint64_t >(lba)));
    };

    if (start_lba != 0 && nblks != 0) {
        nblks = clamp_nblks(start_lba, nblks);
        lba_t end_lba = start_lba + nblks - 1;
        auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
        // For user provided lba and nblks, check that they are not already in flight.
        std::lock_guard lock(m_mutex);
        ASSERT_TRUE(m_inflight_ios.find(new_range) == m_inflight_ios.end());
        m_inflight_ios.insert(new_range);
        return;
    }

    do {
        // Generate LBAs which do not overlap with the in-flight IOs; otherwise
        // we can't decide which IO completed last and can't verify the data.
        start_lba = rand() % max_blks;
        nblks = clamp_nblks(start_lba, std::max(1, rand() % 64));
        lba_t end_lba = start_lba + nblks - 1;
        auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
        std::lock_guard lock(m_mutex);
        if (m_inflight_ios.find(new_range) == m_inflight_ios.end()) {
            m_inflight_ios.insert(new_range);
            break;
        }

    } while (true);
}

auto build_random_data(lba_t& start_lba, uint32_t& nblks) {
// Write upto 1-64 nblks * 4k = 256k size.
auto info = m_vol_ptr->info();
uint64_t page_size = info->page_size;
uint64_t max_blks = info->size_bytes / page_size;
start_lba = start_lba == 0 ? rand() % max_blks : start_lba;
nblks = nblks == 0 ? std::max(1, rand() % 64) : nblks;
get_random_non_overlapping_lba(start_lba, nblks, max_blks);
nblks = std::min(static_cast< uint64_t >(nblks), max_blks - static_cast< uint64_t >(start_lba));

auto data_size = nblks * page_size;
Expand All @@ -94,9 +122,13 @@ class VolumeIOImpl {
uint64_t data_pattern = ((long long)rand() << 32) | rand();
test_common::HBTestHelper::fill_data_buf(data_bytes, page_size, data_pattern);
data_bytes += page_size;
// Store the lba to pattern mapping
m_lba_data[lba] = data_pattern;
LOGINFO("Generate data lba={} pattern={}", lba, data_pattern);
{
// Store the lba to pattern mapping
std::lock_guard lock(m_mutex);
m_lba_data[lba] = data_pattern;
}

LOGDEBUG("Generate data vol={} lba={} pattern={}", m_vol_name, lba, data_pattern);
lba++;
}

Expand All @@ -112,8 +144,13 @@ class VolumeIOImpl {
auto vol_mgr = g_helper->inst()->volume_manager();
vol_mgr->write(m_vol_ptr, req)
.via(&folly::InlineExecutor::instance())
.thenValue([this, data, &waiter](auto&& result) {
.thenValue([this, data, req, &waiter](auto&& result) {
ASSERT_FALSE(result.hasError());
{
std::lock_guard lock(m_mutex);
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
}

waiter.one_complete();
});
});
Expand All @@ -125,22 +162,28 @@ class VolumeIOImpl {
auto data = build_random_data(start_lba, nblks);
vol_interface_req_ptr req(new vol_interface_req{data->bytes(), start_lba, nblks});
auto vol_mgr = g_helper->inst()->volume_manager();
vol_mgr->write(m_vol_ptr, req).via(&folly::InlineExecutor::instance()).thenValue([this, data](auto&& result) {
ASSERT_FALSE(result.hasError());
g_helper->runner().next_task();
});
vol_mgr->write(m_vol_ptr, req)
.via(&folly::InlineExecutor::instance())
.thenValue([this, req, data](auto&& result) {
ASSERT_FALSE(result.hasError());
{
std::lock_guard lock(m_mutex);
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
}
g_helper->runner().next_task();
});
}

void verify_data() {
void verify_all_data() {
for (auto& [lba, data_pattern] : m_lba_data) {
auto buffer = iomanager.iobuf_alloc(512, 4096);
vol_interface_req_ptr req(new vol_interface_req{buffer, lba, 1});

auto vol_mgr = g_helper->inst()->volume_manager();
vol_mgr->read(m_vol_ptr, req).get();
// LOGINFO("Data read={}", fmt::format("{}", spdlog::to_hex((buffer), (buffer) + (128))));
test_common::HBTestHelper::validate_data_buf(buffer, 4096, data_pattern);
LOGINFO("Verify data lba={} pattern={} {}", lba, data_pattern, *r_cast< uint64_t* >(buffer));
LOGDEBUG("Verify data vol={} lba={} pattern={} {}", m_vol_name, lba, data_pattern,
*r_cast< uint64_t* >(buffer));
iomanager.iobuf_free(buffer);
}
}
Expand All @@ -160,10 +203,14 @@ class VolumeIOImpl {
#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
#endif
std::mutex m_mutex;
std::string m_vol_name;
VolumePtr m_vol_ptr;
volume_id_t m_vol_id;
static inline uint32_t m_volume_id_{1};
// Mapping from lba to data pattern.
std::map< lba_t, uint64_t > m_lba_data;
boost::icl::interval_set< int > m_inflight_ios;
};

class VolumeIOTest : public ::testing::Test {
Expand Down Expand Up @@ -199,14 +246,21 @@ class VolumeIOTest : public ::testing::Test {
LOGINFO("IO completed");
}

void verify_data(shared< VolumeIOImpl > vol_impl = nullptr) {
void verify_all_data(shared< VolumeIOImpl > vol_impl = nullptr) {
if (vol_impl) {
vol_impl->verify_data();
vol_impl->verify_all_data();
return;
}

for (auto& vol_impl : m_vols_impl) {
vol_impl->verify_data();
vol_impl->verify_all_data();
}
}

void restart(int shutdown_delay) {
g_helper->restart(shutdown_delay);
for (auto& vol_impl : m_vols_impl) {
vol_impl->reset();
}
}

Expand All @@ -225,23 +279,22 @@ TEST_F(VolumeIOTest, SingleVolumeWriteData) {
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
for (uint32_t i = 0; i < num_iter; i++) {
generate_io_single(vol, start_lba, nblks);
verify_data(vol);
verify_all_data(vol);
}

// Verify data after restart.
g_helper->restart(10);
vol->reset();
restart(5);

LOGINFO("Verify data");
verify_data(vol);
verify_all_data(vol);

// Write and verify again on same LBA range to single volume multiple times.
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
for (uint32_t i = 0; i < num_iter; i++) {
generate_io_single(vol, start_lba, nblks);
}

verify_data(vol);
verify_all_data(vol);
LOGINFO("SingleVolumeWriteData test done.");
}

Expand All @@ -251,24 +304,38 @@ TEST_F(VolumeIOTest, MultipleVolumeWriteData) {
generate_io();

LOGINFO("Verify data");
verify_data();
verify_all_data();

restart(5);

LOGINFO("Verify data again");
verify_all_data();

LOGINFO("Write data randomly");
generate_io();

LOGINFO("Verify data");
verify_all_data();

LOGINFO("MultipleVolumeWriteData test done.");
}

int main(int argc, char* argv[]) {
int parsed_argc = argc;
::testing::InitGoogleTest(&parsed_argc, argv);
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
parsed_argc = 1;
auto f = ::folly::Init(&parsed_argc, &argv, true);
char** orig_argv = argv;

std::vector< std::string > args;
for (int i = 0; i < argc; ++i) {
args.emplace_back(argv[i]);
}

g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, argv);
::testing::InitGoogleTest(&parsed_argc, argv);
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
parsed_argc = 1;
auto f = ::folly::Init(&parsed_argc, &argv, true);

g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, orig_argv);
g_helper->setup();
auto ret = RUN_ALL_TESTS();
g_helper->teardown();
Expand Down
48 changes: 39 additions & 9 deletions src/lib/volume_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,27 +260,57 @@ VolumeManager::NullAsyncResult HomeBlocksImpl::unmap(const VolumePtr& vol, const
void HomeBlocksImpl::submit_io_batch() { RELEASE_ASSERT(false, "submit_io_batch Not implemented"); }

void HomeBlocksImpl::on_write(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const std::vector< homestore::MultiBlkId >& blkids,
const std::vector< homestore::MultiBlkId >& new_blkids,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add a detailed description of the on_commit API in listener.hpp? I know there is a detailed API signature on the HomeStore side; however, in the HomeBlocks write path we fill in the header/key with specific meaning, and documenting that there would make it much easier for others to follow this write path.

cintrusive< homestore::repl_req_ctx >& ctx) {
repl_result_ctx< VolumeManager::NullResult >* repl_ctx{nullptr};
if (ctx) { repl_ctx = boost::static_pointer_cast< repl_result_ctx< VolumeManager::NullResult > >(ctx).get(); }
auto msg_header = r_cast< HomeBlksMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));

// Avoid expensive lock during normal write flow.
// Key contains the list of checksums and old blkids. Before we ack the client
// request, we free the old blkids. Also, if it's recovery, we overwrite the index
// with the checksums and new blkids. We need to overwrite the index during recovery
// as not all index writes may have been flushed to disk before the crash.
VolumePtr vol_ptr{nullptr};
if (repl_ctx) {
vol_ptr = repl_ctx->vol_ptr_;
} else {
// For the recovery path there won't be a repl_ctx or vol_ptr.
auto journal_entry = r_cast< const VolJournalEntry* >(key.cbytes());
auto key_buffer = r_cast< const uint8_t* >(journal_entry + 1);

if (repl_ctx == nullptr) {
// For the recovery path, repl_ctx and vol_ptr won't be available.
auto lg = std::shared_lock(vol_lock_);
auto it = vol_map_.find(msg_header->volume_id);
RELEASE_ASSERT(it != vol_map_.end(), "Didnt find volume {}", boost::uuids::to_string(msg_header->volume_id));
vol_ptr = it->second;

// During log recovery overwrite new blkid and checksum to index.
std::unordered_map< lba_t, BlockInfo > blocks_info;
lba_t start_lba = journal_entry->start_lba;
for (auto& blkid : new_blkids) {
for (uint32_t i = 0; i < blkid.blk_count(); i++) {
auto new_bid = BlkId{blkid.blk_num() + i, 1 /* nblks */, blkid.chunk_num()};
auto csum = *r_cast< const homestore::csum_t* >(key_buffer);
blocks_info.emplace(start_lba + i, BlockInfo{new_bid, BlkId{}, csum});
key_buffer += sizeof(homestore::csum_t);
}

// We ignore the existing values we got in blocks_info from index as it will be
// same checksum, blkid we see in the journal entry.
lba_t end_lba = start_lba + blkid.blk_count() - 1;
auto status = write_to_index(vol_ptr, start_lba, end_lba, blocks_info);
RELEASE_ASSERT(status, "Index error during recovery");
start_lba = end_lba + 1;
}
} else {
// Avoid expensive lock during normal write flow.
vol_ptr = repl_ctx->vol_ptr_;
key_buffer += (journal_entry->nlbas * sizeof(homestore::csum_t));
}

// Free all the old blkids.
for (auto& blkid : blkids) {
vol_ptr->rd()->async_free_blks(lsn, blkid);
// Free all the old blkids. This happens for both normal writes
// and crash recovery.
for (uint32_t i = 0; i < journal_entry->num_old_blks; i++) {
BlkId old_blkid = *r_cast< const BlkId* >(key_buffer);
vol_ptr->rd()->async_free_blks(lsn, old_blkid);
key_buffer += sizeof(BlkId);
}

if (repl_ctx) { repl_ctx->promise_.setValue(folly::Unit()); }
Expand Down
Loading