Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeBlocksConan(ConanFile):
name = "homeblocks"
version = "1.0.22"
version = "1.0.24"
homepage = "https://github.com/eBay/HomeBlocks"
description = "Block Store built on HomeStore"
topics = ("ebay")
Expand Down
52 changes: 52 additions & 0 deletions src/lib/homeblks_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ extern std::shared_ptr< HomeBlocks > init_homeblocks(std::weak_ptr< HomeBlocksAp
auto inst = std::make_shared< HomeBlocksImpl >(std::move(application));
inst->init_homestore();
inst->init_cp();
inst->start_reaper_thread();
return inst;
}

Expand All @@ -57,6 +58,13 @@ HomeBlocksStats HomeBlocksImpl::get_stats() const {
}

HomeBlocksImpl::~HomeBlocksImpl() {
LOGI("Shutting down HomeBlocksImpl");

if (vol_gc_timer_hdl_ != iomgr::null_timer_handle) {
iomanager.cancel_timer(vol_gc_timer_hdl_);
vol_gc_timer_hdl_ = iomgr::null_timer_handle;
}

homestore::hs()->shutdown();
homestore::HomeStore::reset_instance();
iomanager.stop();
Expand Down Expand Up @@ -299,4 +307,48 @@ void HomeBlocksImpl::on_init_complete() {
}

void HomeBlocksImpl::init_cp() {}

uint64_t HomeBlocksImpl::gc_timer_secs() const {
if (SISL_OPTIONS.count("gc_timer_secs")) {
auto const n = SISL_OPTIONS["gc_timer_secs"].as< uint32_t >();
LOGINFO("Using gc_timer_secs option value: {}", n);
return n;
} else {
return HB_DYNAMIC_CONFIG(reaper_thread_timer_secs);
}
}

void HomeBlocksImpl::start_reaper_thread() {
auto const nsecs = gc_timer_secs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: nsecs/secs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

LOGI("Starting volume garbage collection timer with interval: {} seconds", nsecs);
vol_gc_timer_hdl_ = iomanager.schedule_global_timer(
nsecs * 1000 * 1000 * 1000, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_user,
[this](void*) { this->vol_gc(); }, true /* wait_to_schedule */);
}

void HomeBlocksImpl::vol_gc() {
LOGI("Running volume garbage collection");
// loop through every volume and call remove volume if volume's ref_cnt is zero;
std::vector< VolumePtr > vols_to_remove;
{
auto lg = std::shared_lock(vol_lock_);
for (auto& vol_pair : vol_map_) {
auto& vol = vol_pair.second;
LOGI("Checking volume with id: {}, is_destroying: {}, can_remove: {}, num_outstanding_reqs: {}",
vol->id_str(), vol->is_destroying(), vol->can_remove(), vol->num_outstanding_reqs());

if (vol->is_destroying() && vol->can_remove()) {
// 1. volume has been issued with removed command before
// 2. no one has already started removing it
// 3. volume is not in use anymore (ref_cnt == 0)
vols_to_remove.push_back(vol);
}
}
}

for (auto& vol : vols_to_remove) {
LOGI("Garbage Collecting removed volume with id: {}", vol->id_str());
remove_volume(vol->id());
}
}
} // namespace homeblocks
18 changes: 16 additions & 2 deletions src/lib/homeblks_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class HomeBlocksImpl : public HomeBlocks, public VolumeManager, public std::enab
std::weak_ptr< HomeBlocksApplication > _application;
folly::Executor::KeepAlive<> executor_;

///
/// Volume management
mutable std::shared_mutex vol_lock_;
std::map< volume_id_t, VolumePtr > vol_map_;

Expand All @@ -72,6 +72,9 @@ class HomeBlocksImpl : public HomeBlocks, public VolumeManager, public std::enab
superblk< homeblks_sb_t > sb_;
peer_id_t our_uuid_;

iomgr::io_fiber_t reaper_fiber_;
iomgr::timer_handle_t vol_gc_timer_hdl_{iomgr::null_timer_handle};

public:
explicit HomeBlocksImpl(std::weak_ptr< HomeBlocksApplication >&& application);

Expand Down Expand Up @@ -127,7 +130,7 @@ class HomeBlocksImpl : public HomeBlocks, public VolumeManager, public std::enab
void on_write(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const std::vector< homestore::MultiBlkId >& blkids, cintrusive< homestore::repl_req_ctx >& ctx);

// VolumeManager::Result< folly::Unit > verify_checksum(vol_read_ctx const& read_ctx);
void start_reaper_thread();

private:
// Should only be called for first-time-boot
Expand All @@ -143,6 +146,17 @@ class HomeBlocksImpl : public HomeBlocks, public VolumeManager, public std::enab
// recovery apis
void on_hb_meta_blk_found(sisl::byte_view const& buf, void* cookie);
void on_vol_meta_blk_found(sisl::byte_view const& buf, void* cookie);

void vol_gc();

uint64_t gc_timer_secs() const;

#ifdef _PRERELEASE
// For testing purpose only
// If delay flip is not set, false will be returned;
// If delay flip is set, it will delay the IOs for a given VolumePtr
bool delay_fake_io(VolumePtr vol);
#endif
};

class HBIndexSvcCB : public homestore::IndexServiceCallbacks {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/volume/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ target_link_libraries(test_volume_io
-rdynamic
)

add_test(NAME VolumeTest COMMAND test_volume)
add_test(NAME VolumeIOTest COMMAND test_volume_io)
add_test(NAME VolumeTest COMMAND test_volume --gc_timer_secs=3)
add_test(NAME VolumeIOTest COMMAND test_volume_io)
31 changes: 30 additions & 1 deletion src/lib/volume/tests/test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,31 @@ class HBTestHelper {
}
}

#ifdef _PRERELEASE
void set_flip_point(const std::string flip_name) {
flip::FlipCondition null_cond;
flip::FlipFrequency freq;
freq.set_count(2);
freq.set_percent(100);
m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
LOGI("Flip {} set", flip_name);
}

void set_delay_flip(const std::string flip_name, uint64_t delay_usec, uint32_t count = 1, uint32_t percent = 100) {
flip::FlipCondition null_cond;
flip::FlipFrequency freq;
freq.set_count(count);
freq.set_percent(percent);
m_fc.inject_delay_flip(flip_name, {null_cond}, freq, delay_usec);
LOGDEBUG("Flip {} set", flip_name);
}

void remove_flip(const std::string flip_name) {
m_fc.remove_flip(flip_name);
LOGDEBUG("Flip {} removed", flip_name);
}
#endif

private:
void init_devices(bool is_file, uint64_t dev_size = 0) {
if (is_file) {
Expand Down Expand Up @@ -275,6 +300,10 @@ class HBTestHelper {
peer_id_t svc_id_;
Runner io_runner_;
Waiter waiter_;

#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
#endif
};

} // namespace test_common
} // namespace test_common
82 changes: 62 additions & 20 deletions src/lib/volume/tests/test_volume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@

SISL_LOGGING_INIT(HOMEBLOCKS_LOG_MODS)
SISL_OPTION_GROUP(test_volume_setup,
(num_vols, "", "num_vols", "number of volumes", ::cxxopts::value< uint32_t >()->default_value("2"),
"number"));
(num_vols, "", "num_vols", "number of volumes", ::cxxopts::value< uint32_t >()->default_value("2"),
"number"),
(gc_timer_secs, "", "gc_timer_secs", "gc timer in seconds",
::cxxopts::value< uint32_t >()->default_value("5"), "seconds"));

SISL_OPTIONS_ENABLE(logging, test_common_setup, test_volume_setup, homeblocks)
SISL_LOGGING_DECL(test_volume)

Expand All @@ -47,23 +50,63 @@ class VolumeTest : public ::testing::Test {
vol_info.id = hb_utils::gen_random_uuid();
return vol_info;
}
};

#ifdef _PRERELEASE
void set_flip_point(const std::string flip_name) {
flip::FlipCondition null_cond;
flip::FlipFrequency freq;
freq.set_count(2);
freq.set_percent(100);
m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
LOGI("Flip {} set", flip_name);
TEST_F(VolumeTest, CreateDestroyVolumeWithOutstandingIO) {
std::vector< volume_id_t > vol_ids;
{
auto hb = g_helper->inst();
auto vol_mgr = hb->volume_manager();

uint32_t delay_sec = 6;
g_helper->set_delay_flip("vol_fake_io_delay_simulation", delay_sec * 1000 * 1000 /*delay_usec*/, 2, 100);

auto num_vols = 1ul;

for (uint32_t i = 0; i < num_vols; ++i) {
auto vinfo = gen_vol_info(i);
auto id = vinfo.id;
vol_ids.emplace_back(id);
auto ret = vol_mgr->create_volume(std::move(vinfo)).get();
ASSERT_TRUE(ret);

auto vol_ptr = vol_mgr->lookup_volume(id);
// verify the volume is there
ASSERT_TRUE(vol_ptr != nullptr);

// fake a write that will be delayed;
vol_mgr->write(vol_ptr, nullptr);

// fake a read that will be delayed;
vol_mgr->read(vol_ptr, nullptr);
}

auto const s = hb->get_stats();
auto const dtype = hb->data_drive_type();
LOGINFO("Stats: {}, drive_type: {}", s.to_string(), dtype);

for (uint32_t i = 0; i < num_vols; ++i) {
auto id = vol_ids[i];
auto ret = vol_mgr->remove_volume(id).get();
ASSERT_TRUE(ret);
while (true) {
auto delay_secs = 1;
LOGINFO("Remove Volume {} triggered, waiting for {} seconds for IO to complete",
boost::uuids::to_string(id), delay_secs);
// sleep for a while
std::this_thread::sleep_for(std::chrono::milliseconds(delay_secs * 1000));
auto vol_ptr = vol_mgr->lookup_volume(id);
if (!vol_ptr) { break; }
}
}
}
#endif

private:
#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
g_helper->remove_flip("vol_fake_io_delay_simulation");

g_helper->restart(2);
}
#endif
};

TEST_F(VolumeTest, CreateDestroyVolume) {
std::vector< volume_id_t > vol_ids;
Expand Down Expand Up @@ -101,7 +144,7 @@ TEST_F(VolumeTest, CreateDestroyVolume) {
}
}

g_helper->restart(5);
g_helper->restart(2);
}

TEST_F(VolumeTest, CreateVolumeThenRecover) {
Expand All @@ -125,7 +168,7 @@ TEST_F(VolumeTest, CreateVolumeThenRecover) {
}
}

g_helper->restart(5);
g_helper->restart(2);

// verify the volumes are still there
{
Expand All @@ -141,9 +184,8 @@ TEST_F(VolumeTest, CreateVolumeThenRecover) {
}

TEST_F(VolumeTest, DestroyVolumeCrashRecovery) {

#ifdef _PRERELEASE
set_flip_point("vol_destroy_crash_simulation");
g_helper->set_flip_point("vol_destroy_crash_simulation");
#endif
std::vector< volume_id_t > vol_ids;
{
Expand Down Expand Up @@ -171,7 +213,7 @@ TEST_F(VolumeTest, DestroyVolumeCrashRecovery) {
}
}

g_helper->restart(5);
g_helper->restart(2);
}

int main(int argc, char* argv[]) {
Expand All @@ -193,4 +235,4 @@ int main(int argc, char* argv[]) {
g_helper->teardown();

return ret;
}
}
7 changes: 3 additions & 4 deletions src/lib/volume/volume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ bool Volume::init(bool is_recovery) {
}

void Volume::destroy() {
// 0. Set destroying state in superblock;
state_change(vol_state::DESTROYING);
LOGI("Start destroying volume: {}, uuid: {}", vol_info_->name, boost::uuids::to_string(id()));
destroy_started_ = true;

// 1. destroy the repl dev;
if (rd_) {
LOGI("Destroying repl dev for volume: {}, uuid: {}", vol_info_->name, boost::uuids::to_string(id()));
LOGI("Destroying repl dev for volume: {}", vol_info_->name);
homestore::hs()->repl_service().remove_repl_dev(id()).get();
rd_ = nullptr;
}
Expand Down Expand Up @@ -232,7 +232,6 @@ VolumeManager::NullAsyncResult Volume::write(const vol_interface_req_ptr& vol_re

VolumeManager::Result< folly::Unit > Volume::write_to_index(lba_t start_lba, lba_t end_lba,
std::unordered_map< lba_t, BlockInfo >& blocks_info) {

// Use filter callback to get the old blkid.
homestore::put_filter_cb_t filter_cb = [&blocks_info](BtreeKey const& key, BtreeValue const& existing_value,
BtreeValue const& value) {
Expand Down
18 changes: 14 additions & 4 deletions src/lib/volume/volume.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ class Volume : public std::enable_shared_from_this< Volume > {

VolumeManager::NullAsyncResult read(const vol_interface_req_ptr& req);

bool can_remove() { return !destroy_started_ && outstanding_reqs_.test_eq(0); }

void inc_ref(uint64_t n = 1) { outstanding_reqs_.increment(n); }
void dec_ref(uint64_t n = 1) { outstanding_reqs_.decrement(n); }
uint64_t num_outstanding_reqs() const { return outstanding_reqs_.get(); }

private:
//
// this API will be called to initialize volume in both volume creation and volume recovery;
Expand All @@ -178,10 +184,14 @@ class Volume : public std::enable_shared_from_this< Volume > {
VolumeManager::Result< folly::Unit > read_from_index(const vol_interface_req_ptr& req, index_kv_list_t& index_kvs);

private:
VolumeInfoPtr vol_info_;
ReplDevPtr rd_;
VolIdxTablePtr indx_tbl_;
superblk< vol_sb_t > sb_;
VolumeInfoPtr vol_info_; // volume info
ReplDevPtr rd_; // replication device for this volume, which provides read/write APIs to the volume;
VolIdxTablePtr indx_tbl_; // index table for this volume
superblk< vol_sb_t > sb_; // meta data of the volume

sisl::atomic_counter< uint64_t > outstanding_reqs_{0}; // number of outstanding requests
bool destroy_started_{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use enum instead vol_state::destroying ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is actually different states, so when we are rebooting, the state will be vol_state::destroying, but destroy has never been kicked off yet after reboot, that is why I introduced this field.

false}; // indicates if volume destroy has started, avoid destroy to be executed more than once.
};

struct vol_repl_ctx : public homestore::repl_req_ctx {
Expand Down
Loading
Loading