Skip to content

Commit 5b859da

Browse files
committed
Add recovery changes for write path.
Add recovery changes for the write path. Support non-overlapping I/O ranges in the unit tests. Add entries to the index in the recovery path.
1 parent ca1d0fa commit 5b859da

File tree

4 files changed

+135
-39
lines changed

4 files changed

+135
-39
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomeBlocksConan(ConanFile):
1111
name = "homeblocks"
12-
version = "1.0.16"
12+
version = "1.0.17"
1313
homepage = "https://github.com/eBay/HomeBlocks"
1414
description = "Block Store built on HomeStore"
1515
topics = ("ebay")

src/lib/volume/index.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,6 @@ class VolumeIndexValue : public homestore::BtreeIntervalValue {
249249
// Get the next blk num and checksum
250250
m_blkid_suffix += n;
251251
auto curr_lba = ctx->start_lba + n;
252-
LOGINFO("shift n={} blk_num={} curr_lba={}", n, m_blkid_suffix, curr_lba);
253252
DEBUG_ASSERT(ctx->block_info->find(curr_lba) != ctx->block_info->end(), "Invalid index");
254253
m_checksum = (*ctx->block_info)[curr_lba].checksum;
255254
}

src/lib/volume/tests/test_volume_io.cpp

Lines changed: 95 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <string>
1717

18+
#include <boost/icl/interval_set.hpp>
1819
#include <folly/init/Init.h>
1920
#include <gtest/gtest.h>
2021
#include <sisl/options/options.h>
@@ -56,6 +57,7 @@ class VolumeIOImpl {
5657
public:
5758
void create_volume() {
5859
auto vinfo = gen_vol_info(m_volume_id_++);
60+
m_vol_name = vinfo.name;
5961
m_vol_id = vinfo.id;
6062

6163
auto vol_mgr = g_helper->inst()->volume_manager();
@@ -78,13 +80,39 @@ class VolumeIOImpl {
7880
ASSERT_TRUE(m_vol_ptr != nullptr);
7981
}
8082

83+
// Reserve an LBA range for a new write that does not overlap any write still in flight,
// and record it in m_inflight_ios. Overlapping in-flight writes must be avoided because
// the test could not tell which one completed last and so could not verify the data.
//
// start_lba, nblks: in/out. A non-zero value is treated as caller provided and is kept;
//                   a zero value is replaced by a random pick (start in [0, max_blks),
//                   nblks in [1, 63]). On return they describe the reserved range.
// max_blks:         number of blocks in the volume; the range is trimmed to end inside it.
//
// When both values are caller provided the range is asserted to be free instead of retried.
//
// NOTE(review): m_inflight_ios is keyed on int, so lbas are narrowed when the interval is
// built - confirm volumes used by this test never exceed INT_MAX blocks.
void get_random_non_overlapping_lba(lba_t& start_lba, uint32_t& nblks, uint64_t max_blks) {
    // Trim a block count so the range stops at the end of the volume. The caller applies the
    // same trim after this function returns and the completion handlers erase the trimmed
    // range, so the range registered here has to be the trimmed one as well; otherwise the
    // tail beyond max_blks would stay in m_inflight_ios forever.
    auto trim_to_volume = [max_blks](lba_t lba, uint32_t blks) -> uint32_t {
        auto const lba64 = static_cast< uint64_t >(lba);
        if (lba64 >= max_blks) { return blks; } // Out of range start: leave it for the caller to handle.
        return static_cast< uint32_t >(std::min(static_cast< uint64_t >(blks), max_blks - lba64));
    };

    bool const fixed_lba = (start_lba != 0);
    bool const fixed_nblks = (nblks != 0);

    if (fixed_lba && fixed_nblks) {
        nblks = trim_to_volume(start_lba, nblks);
        lba_t end_lba = start_lba + nblks - 1;
        auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
        // For user provided lba and nblks, check if they are already in flight.
        std::lock_guard lock(m_mutex);
        ASSERT_TRUE(m_inflight_ios.find(new_range) == m_inflight_ios.end());
        m_inflight_ios.insert(new_range);
        return;
    }

    // rand() % max_blks below is undefined for an empty volume.
    ASSERT_TRUE(max_blks != 0);

    for (;;) {
        // Only randomize what the caller left unspecified; a provided start lba or block
        // count is honored on every attempt.
        lba_t cand_lba = fixed_lba ? start_lba : static_cast< lba_t >(rand() % max_blks);
        uint32_t cand_nblks = fixed_nblks ? nblks : static_cast< uint32_t >(std::max(1, rand() % 64));
        cand_nblks = trim_to_volume(cand_lba, cand_nblks);

        lba_t end_lba = cand_lba + cand_nblks - 1;
        auto new_range = boost::icl::interval< int >::closed(cand_lba, end_lba);

        // The lock is taken per attempt so completions can erase their ranges between retries.
        std::lock_guard lock(m_mutex);
        if (m_inflight_ios.find(new_range) == m_inflight_ios.end()) {
            m_inflight_ios.insert(new_range);
            start_lba = cand_lba;
            nblks = cand_nblks;
            break;
        }
    }
}
109+
81110
auto build_random_data(lba_t& start_lba, uint32_t& nblks) {
82111
// Write upto 1-64 nblks * 4k = 256k size.
83112
auto info = m_vol_ptr->info();
84113
uint64_t page_size = info->page_size;
85114
uint64_t max_blks = info->size_bytes / page_size;
86-
start_lba = start_lba == 0 ? rand() % max_blks : start_lba;
87-
nblks = nblks == 0 ? std::max(1, rand() % 64) : nblks;
115+
get_random_non_overlapping_lba(start_lba, nblks, max_blks);
88116
nblks = std::min(static_cast< uint64_t >(nblks), max_blks - static_cast< uint64_t >(start_lba));
89117

90118
auto data_size = nblks * page_size;
@@ -94,9 +122,13 @@ class VolumeIOImpl {
94122
uint64_t data_pattern = ((long long)rand() << 32) | rand();
95123
test_common::HBTestHelper::fill_data_buf(data_bytes, page_size, data_pattern);
96124
data_bytes += page_size;
97-
// Store the lba to pattern mapping
98-
m_lba_data[lba] = data_pattern;
99-
LOGINFO("Generate data lba={} pattern={}", lba, data_pattern);
125+
{
126+
// Store the lba to pattern mapping
127+
std::lock_guard lock(m_mutex);
128+
m_lba_data[lba] = data_pattern;
129+
}
130+
131+
LOGDEBUG("Generate data vol={} lba={} pattern={}", m_vol_name, lba, data_pattern);
100132
lba++;
101133
}
102134

@@ -112,8 +144,13 @@ class VolumeIOImpl {
112144
auto vol_mgr = g_helper->inst()->volume_manager();
113145
vol_mgr->write(m_vol_ptr, req)
114146
.via(&folly::InlineExecutor::instance())
115-
.thenValue([this, data, &waiter](auto&& result) {
147+
.thenValue([this, data, req, &waiter](auto&& result) {
116148
ASSERT_FALSE(result.hasError());
149+
{
150+
std::lock_guard lock(m_mutex);
151+
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
152+
}
153+
117154
waiter.one_complete();
118155
});
119156
});
@@ -125,22 +162,28 @@ class VolumeIOImpl {
125162
auto data = build_random_data(start_lba, nblks);
126163
vol_interface_req_ptr req(new vol_interface_req{data->bytes(), start_lba, nblks});
127164
auto vol_mgr = g_helper->inst()->volume_manager();
128-
vol_mgr->write(m_vol_ptr, req).via(&folly::InlineExecutor::instance()).thenValue([this, data](auto&& result) {
129-
ASSERT_FALSE(result.hasError());
130-
g_helper->runner().next_task();
131-
});
165+
vol_mgr->write(m_vol_ptr, req)
166+
.via(&folly::InlineExecutor::instance())
167+
.thenValue([this, req, data](auto&& result) {
168+
ASSERT_FALSE(result.hasError());
169+
{
170+
std::lock_guard lock(m_mutex);
171+
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
172+
}
173+
g_helper->runner().next_task();
174+
});
132175
}
133176

134-
void verify_data() {
177+
void verify_all_data() {
135178
for (auto& [lba, data_pattern] : m_lba_data) {
136179
auto buffer = iomanager.iobuf_alloc(512, 4096);
137180
vol_interface_req_ptr req(new vol_interface_req{buffer, lba, 1});
138181

139182
auto vol_mgr = g_helper->inst()->volume_manager();
140183
vol_mgr->read(m_vol_ptr, req).get();
141-
// LOGINFO("Data read={}", fmt::format("{}", spdlog::to_hex((buffer), (buffer) + (128))));
142184
test_common::HBTestHelper::validate_data_buf(buffer, 4096, data_pattern);
143-
LOGINFO("Verify data lba={} pattern={} {}", lba, data_pattern, *r_cast< uint64_t* >(buffer));
185+
LOGDEBUG("Verify data vol={} lba={} pattern={} {}", m_vol_name, lba, data_pattern,
186+
*r_cast< uint64_t* >(buffer));
144187
iomanager.iobuf_free(buffer);
145188
}
146189
}
@@ -160,10 +203,14 @@ class VolumeIOImpl {
160203
#ifdef _PRERELEASE
161204
flip::FlipClient m_fc{iomgr_flip::instance()};
162205
#endif
206+
std::mutex m_mutex;
207+
std::string m_vol_name;
163208
VolumePtr m_vol_ptr;
164209
volume_id_t m_vol_id;
165210
static inline uint32_t m_volume_id_{1};
211+
// Mapping from lba to data pattern.
166212
std::map< lba_t, uint64_t > m_lba_data;
213+
boost::icl::interval_set< int > m_inflight_ios;
167214
};
168215

169216
class VolumeIOTest : public ::testing::Test {
@@ -199,14 +246,21 @@ class VolumeIOTest : public ::testing::Test {
199246
LOGINFO("IO completed");
200247
}
201248

202-
void verify_data(shared< VolumeIOImpl > vol_impl = nullptr) {
249+
// Verify the written data of a single volume when vol_impl is given, otherwise of
// every volume tracked in m_vols_impl.
void verify_all_data(shared< VolumeIOImpl > vol_impl = nullptr) {
    if (!vol_impl) {
        // No specific volume requested: walk all of them.
        for (auto& each_vol : m_vols_impl) {
            each_vol->verify_all_data();
        }
        return;
    }

    vol_impl->verify_all_data();
}
259+
260+
void restart(int shutdown_delay) {
261+
g_helper->restart(shutdown_delay);
262+
for (auto& vol_impl : m_vols_impl) {
263+
vol_impl->reset();
210264
}
211265
}
212266

@@ -225,23 +279,22 @@ TEST_F(VolumeIOTest, SingleVolumeWriteData) {
225279
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
226280
for (uint32_t i = 0; i < num_iter; i++) {
227281
generate_io_single(vol, start_lba, nblks);
228-
verify_data(vol);
282+
verify_all_data(vol);
229283
}
230284

231285
// Verify data after restart.
232-
g_helper->restart(10);
233-
vol->reset();
286+
restart(5);
234287

235288
LOGINFO("Verify data");
236-
verify_data(vol);
289+
verify_all_data(vol);
237290

238291
// Write and verify again on same LBA range to single volume multiple times.
239292
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
240293
for (uint32_t i = 0; i < num_iter; i++) {
241294
generate_io_single(vol, start_lba, nblks);
242295
}
243296

244-
verify_data(vol);
297+
verify_all_data(vol);
245298
LOGINFO("SingleVolumeWriteData test done.");
246299
}
247300

@@ -251,24 +304,38 @@ TEST_F(VolumeIOTest, MultipleVolumeWriteData) {
251304
generate_io();
252305

253306
LOGINFO("Verify data");
254-
verify_data();
307+
verify_all_data();
308+
309+
restart(5);
310+
311+
LOGINFO("Verify data again");
312+
verify_all_data();
313+
314+
LOGINFO("Write data randomly");
315+
generate_io();
316+
317+
LOGINFO("Verify data");
318+
verify_all_data();
319+
255320
LOGINFO("MultipleVolumeWriteData test done.");
256321
}
257322

258323
int main(int argc, char* argv[]) {
259324
int parsed_argc = argc;
260-
::testing::InitGoogleTest(&parsed_argc, argv);
261-
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
262-
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
263-
parsed_argc = 1;
264-
auto f = ::folly::Init(&parsed_argc, &argv, true);
325+
char** orig_argv = argv;
265326

266327
std::vector< std::string > args;
267328
for (int i = 0; i < argc; ++i) {
268329
args.emplace_back(argv[i]);
269330
}
270331

271-
g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, argv);
332+
::testing::InitGoogleTest(&parsed_argc, argv);
333+
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
334+
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
335+
parsed_argc = 1;
336+
auto f = ::folly::Init(&parsed_argc, &argv, true);
337+
338+
g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, orig_argv);
272339
g_helper->setup();
273340
auto ret = RUN_ALL_TESTS();
274341
g_helper->teardown();

src/lib/volume_mgr.cpp

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,27 +260,57 @@ VolumeManager::NullAsyncResult HomeBlocksImpl::unmap(const VolumePtr& vol, const
260260
void HomeBlocksImpl::submit_io_batch() { RELEASE_ASSERT(false, "submit_io_batch Not implemented"); }
261261

262262
void HomeBlocksImpl::on_write(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
263-
const std::vector< homestore::MultiBlkId >& blkids,
263+
const std::vector< homestore::MultiBlkId >& new_blkids,
264264
cintrusive< homestore::repl_req_ctx >& ctx) {
265265
repl_result_ctx< VolumeManager::NullResult >* repl_ctx{nullptr};
266266
if (ctx) { repl_ctx = boost::static_pointer_cast< repl_result_ctx< VolumeManager::NullResult > >(ctx).get(); }
267267
auto msg_header = r_cast< HomeBlksMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
268268

269-
// Avoid expensive lock during normal write flow.
269+
// Key contains the list of checksums and old blkids. Before we ack the client
270+
// request, we free the old blkids. Also, if it is recovery, we overwrite the index
271+
// with checksum and new blkid's. We need to overwrite index during recovery as all the
272+
// index writes may not be flushed to disk during crash.
270273
VolumePtr vol_ptr{nullptr};
271-
if (repl_ctx) {
272-
vol_ptr = repl_ctx->vol_ptr_;
273-
} else {
274-
// For recovery path there wont repl_ctx and vol_ptr.
274+
auto journal_entry = r_cast< const VolJournalEntry* >(key.cbytes());
275+
auto key_buffer = r_cast< const uint8_t* >(journal_entry + 1);
276+
277+
if (repl_ctx == nullptr) {
278+
// For the recovery path, repl_ctx and vol_ptr won't be available.
275279
auto lg = std::shared_lock(vol_lock_);
276280
auto it = vol_map_.find(msg_header->volume_id);
277281
RELEASE_ASSERT(it != vol_map_.end(), "Didnt find volume {}", boost::uuids::to_string(msg_header->volume_id));
278282
vol_ptr = it->second;
283+
284+
// During log recovery overwrite new blkid and checksum to index.
285+
std::unordered_map< lba_t, BlockInfo > blocks_info;
286+
lba_t start_lba = journal_entry->start_lba;
287+
for (auto& blkid : new_blkids) {
288+
for (uint32_t i = 0; i < blkid.blk_count(); i++) {
289+
auto new_bid = BlkId{blkid.blk_num() + i, 1 /* nblks */, blkid.chunk_num()};
290+
auto csum = *r_cast< const homestore::csum_t* >(key_buffer);
291+
blocks_info.emplace(start_lba + i, BlockInfo{new_bid, BlkId{}, csum});
292+
key_buffer += sizeof(homestore::csum_t);
293+
}
294+
295+
// We ignore the existing values we got in blocks_info from index as it will be
296+
// same checksum, blkid we see in the journal entry.
297+
lba_t end_lba = start_lba + blkid.blk_count() - 1;
298+
auto status = write_to_index(vol_ptr, start_lba, end_lba, blocks_info);
299+
RELEASE_ASSERT(status, "Index error during recovery");
300+
start_lba = end_lba + 1;
301+
}
302+
} else {
303+
// Avoid expensive lock during normal write flow.
304+
vol_ptr = repl_ctx->vol_ptr_;
305+
key_buffer += (journal_entry->nlbas * sizeof(homestore::csum_t));
279306
}
280307

281-
// Free all the old blkids.
282-
for (auto& blkid : blkids) {
283-
vol_ptr->rd()->async_free_blks(lsn, blkid);
308+
// Free all the old blkids. This happens for both normal writes
309+
// and crash recovery.
310+
for (uint32_t i = 0; i < journal_entry->num_old_blks; i++) {
311+
BlkId old_blkid = *r_cast< const BlkId* >(key_buffer);
312+
vol_ptr->rd()->async_free_blks(lsn, old_blkid);
313+
key_buffer += sizeof(BlkId);
284314
}
285315

286316
if (repl_ctx) { repl_ctx->promise_.setValue(folly::Unit()); }

0 commit comments

Comments
 (0)