Skip to content

Commit 4628b6a

Browse files
authored
Merge pull request #95 from sanebay/recovery_test
Add recovery changes for write path.
2 parents 3297adb + 5b859da commit 4628b6a

File tree

4 files changed

+135
-39
lines changed

4 files changed

+135
-39
lines changed

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomeBlocksConan(ConanFile):
1111
name = "homeblocks"
12-
version = "1.0.16"
12+
version = "1.0.17"
1313
homepage = "https://github.com/eBay/HomeBlocks"
1414
description = "Block Store built on HomeStore"
1515
topics = ("ebay")

src/lib/volume/index.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,6 @@ class VolumeIndexValue : public homestore::BtreeIntervalValue {
249249
// Get the next blk num and checksum
250250
m_blkid_suffix += n;
251251
auto curr_lba = ctx->start_lba + n;
252-
LOGINFO("shift n={} blk_num={} curr_lba={}", n, m_blkid_suffix, curr_lba);
253252
DEBUG_ASSERT(ctx->block_info->find(curr_lba) != ctx->block_info->end(), "Invalid index");
254253
m_checksum = (*ctx->block_info)[curr_lba].checksum;
255254
}

src/lib/volume/tests/test_volume_io.cpp

Lines changed: 95 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <string>
1717

18+
#include <boost/icl/interval_set.hpp>
1819
#include <folly/init/Init.h>
1920
#include <gtest/gtest.h>
2021
#include <sisl/options/options.h>
@@ -56,6 +57,7 @@ class VolumeIOImpl {
5657
public:
5758
void create_volume() {
5859
auto vinfo = gen_vol_info(m_volume_id_++);
60+
m_vol_name = vinfo.name;
5961
m_vol_id = vinfo.id;
6062

6163
auto vol_mgr = g_helper->inst()->volume_manager();
@@ -78,13 +80,39 @@ class VolumeIOImpl {
7880
ASSERT_TRUE(m_vol_ptr != nullptr);
7981
}
8082

83+
void get_random_non_overlapping_lba(lba_t& start_lba, uint32_t& nblks, uint64_t max_blks) {
84+
if (start_lba != 0 && nblks != 0) {
85+
lba_t end_lba = start_lba + nblks - 1;
86+
auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
87+
// For user provided lba and nblks, check if they are already in flight.
88+
std::lock_guard lock(m_mutex);
89+
ASSERT_TRUE(m_inflight_ios.find(new_range) == m_inflight_ios.end());
90+
m_inflight_ios.insert(new_range);
91+
return;
92+
}
93+
94+
do {
95+
// Generate lba which are not overlapped with the inflight ios, otherwise
96+
// we can't decide which io completed last and can't verify the data.
97+
start_lba = rand() % max_blks;
98+
nblks = std::max(1, rand() % 64);
99+
lba_t end_lba = start_lba + nblks - 1;
100+
auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
101+
std::lock_guard lock(m_mutex);
102+
if (m_inflight_ios.find(new_range) == m_inflight_ios.end()) {
103+
m_inflight_ios.insert(new_range);
104+
break;
105+
}
106+
107+
} while (true);
108+
}
109+
81110
auto build_random_data(lba_t& start_lba, uint32_t& nblks) {
82111
// Write up to 1-64 nblks * 4k = 256k size.
83112
auto info = m_vol_ptr->info();
84113
uint64_t page_size = info->page_size;
85114
uint64_t max_blks = info->size_bytes / page_size;
86-
start_lba = start_lba == 0 ? rand() % max_blks : start_lba;
87-
nblks = nblks == 0 ? std::max(1, rand() % 64) : nblks;
115+
get_random_non_overlapping_lba(start_lba, nblks, max_blks);
88116
nblks = std::min(static_cast< uint64_t >(nblks), max_blks - static_cast< uint64_t >(start_lba));
89117

90118
auto data_size = nblks * page_size;
@@ -94,9 +122,13 @@ class VolumeIOImpl {
94122
uint64_t data_pattern = ((long long)rand() << 32) | rand();
95123
test_common::HBTestHelper::fill_data_buf(data_bytes, page_size, data_pattern);
96124
data_bytes += page_size;
97-
// Store the lba to pattern mapping
98-
m_lba_data[lba] = data_pattern;
99-
LOGINFO("Generate data lba={} pattern={}", lba, data_pattern);
125+
{
126+
// Store the lba to pattern mapping
127+
std::lock_guard lock(m_mutex);
128+
m_lba_data[lba] = data_pattern;
129+
}
130+
131+
LOGDEBUG("Generate data vol={} lba={} pattern={}", m_vol_name, lba, data_pattern);
100132
lba++;
101133
}
102134

@@ -112,8 +144,13 @@ class VolumeIOImpl {
112144
auto vol_mgr = g_helper->inst()->volume_manager();
113145
vol_mgr->write(m_vol_ptr, req)
114146
.via(&folly::InlineExecutor::instance())
115-
.thenValue([this, data, &waiter](auto&& result) {
147+
.thenValue([this, data, req, &waiter](auto&& result) {
116148
ASSERT_FALSE(result.hasError());
149+
{
150+
std::lock_guard lock(m_mutex);
151+
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
152+
}
153+
117154
waiter.one_complete();
118155
});
119156
});
@@ -125,22 +162,28 @@ class VolumeIOImpl {
125162
auto data = build_random_data(start_lba, nblks);
126163
vol_interface_req_ptr req(new vol_interface_req{data->bytes(), start_lba, nblks});
127164
auto vol_mgr = g_helper->inst()->volume_manager();
128-
vol_mgr->write(m_vol_ptr, req).via(&folly::InlineExecutor::instance()).thenValue([this, data](auto&& result) {
129-
ASSERT_FALSE(result.hasError());
130-
g_helper->runner().next_task();
131-
});
165+
vol_mgr->write(m_vol_ptr, req)
166+
.via(&folly::InlineExecutor::instance())
167+
.thenValue([this, req, data](auto&& result) {
168+
ASSERT_FALSE(result.hasError());
169+
{
170+
std::lock_guard lock(m_mutex);
171+
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
172+
}
173+
g_helper->runner().next_task();
174+
});
132175
}
133176

134-
void verify_data() {
177+
void verify_all_data() {
135178
for (auto& [lba, data_pattern] : m_lba_data) {
136179
auto buffer = iomanager.iobuf_alloc(512, 4096);
137180
vol_interface_req_ptr req(new vol_interface_req{buffer, lba, 1});
138181

139182
auto vol_mgr = g_helper->inst()->volume_manager();
140183
vol_mgr->read(m_vol_ptr, req).get();
141-
// LOGINFO("Data read={}", fmt::format("{}", spdlog::to_hex((buffer), (buffer) + (128))));
142184
test_common::HBTestHelper::validate_data_buf(buffer, 4096, data_pattern);
143-
LOGINFO("Verify data lba={} pattern={} {}", lba, data_pattern, *r_cast< uint64_t* >(buffer));
185+
LOGDEBUG("Verify data vol={} lba={} pattern={} {}", m_vol_name, lba, data_pattern,
186+
*r_cast< uint64_t* >(buffer));
144187
iomanager.iobuf_free(buffer);
145188
}
146189
}
@@ -160,10 +203,14 @@ class VolumeIOImpl {
160203
#ifdef _PRERELEASE
161204
flip::FlipClient m_fc{iomgr_flip::instance()};
162205
#endif
206+
std::mutex m_mutex;
207+
std::string m_vol_name;
163208
VolumePtr m_vol_ptr;
164209
volume_id_t m_vol_id;
165210
static inline uint32_t m_volume_id_{1};
211+
// Mapping from lba to data pattern.
166212
std::map< lba_t, uint64_t > m_lba_data;
213+
boost::icl::interval_set< int > m_inflight_ios;
167214
};
168215

169216
class VolumeIOTest : public ::testing::Test {
@@ -199,14 +246,21 @@ class VolumeIOTest : public ::testing::Test {
199246
LOGINFO("IO completed");
200247
}
201248

202-
void verify_data(shared< VolumeIOImpl > vol_impl = nullptr) {
249+
void verify_all_data(shared< VolumeIOImpl > vol_impl = nullptr) {
203250
if (vol_impl) {
204-
vol_impl->verify_data();
251+
vol_impl->verify_all_data();
205252
return;
206253
}
207254

208255
for (auto& vol_impl : m_vols_impl) {
209-
vol_impl->verify_data();
256+
vol_impl->verify_all_data();
257+
}
258+
}
259+
260+
void restart(int shutdown_delay) {
261+
g_helper->restart(shutdown_delay);
262+
for (auto& vol_impl : m_vols_impl) {
263+
vol_impl->reset();
210264
}
211265
}
212266

@@ -225,23 +279,22 @@ TEST_F(VolumeIOTest, SingleVolumeWriteData) {
225279
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
226280
for (uint32_t i = 0; i < num_iter; i++) {
227281
generate_io_single(vol, start_lba, nblks);
228-
verify_data(vol);
282+
verify_all_data(vol);
229283
}
230284

231285
// Verify data after restart.
232-
g_helper->restart(10);
233-
vol->reset();
286+
restart(5);
234287

235288
LOGINFO("Verify data");
236-
verify_data(vol);
289+
verify_all_data(vol);
237290

238291
// Write and verify again on same LBA range to single volume multiple times.
239292
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
240293
for (uint32_t i = 0; i < num_iter; i++) {
241294
generate_io_single(vol, start_lba, nblks);
242295
}
243296

244-
verify_data(vol);
297+
verify_all_data(vol);
245298
LOGINFO("SingleVolumeWriteData test done.");
246299
}
247300

@@ -251,24 +304,38 @@ TEST_F(VolumeIOTest, MultipleVolumeWriteData) {
251304
generate_io();
252305

253306
LOGINFO("Verify data");
254-
verify_data();
307+
verify_all_data();
308+
309+
restart(5);
310+
311+
LOGINFO("Verify data again");
312+
verify_all_data();
313+
314+
LOGINFO("Write data randomly");
315+
generate_io();
316+
317+
LOGINFO("Verify data");
318+
verify_all_data();
319+
255320
LOGINFO("MultipleVolumeWriteData test done.");
256321
}
257322

258323
int main(int argc, char* argv[]) {
259324
int parsed_argc = argc;
260-
::testing::InitGoogleTest(&parsed_argc, argv);
261-
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
262-
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
263-
parsed_argc = 1;
264-
auto f = ::folly::Init(&parsed_argc, &argv, true);
325+
char** orig_argv = argv;
265326

266327
std::vector< std::string > args;
267328
for (int i = 0; i < argc; ++i) {
268329
args.emplace_back(argv[i]);
269330
}
270331

271-
g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, argv);
332+
::testing::InitGoogleTest(&parsed_argc, argv);
333+
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
334+
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
335+
parsed_argc = 1;
336+
auto f = ::folly::Init(&parsed_argc, &argv, true);
337+
338+
g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, orig_argv);
272339
g_helper->setup();
273340
auto ret = RUN_ALL_TESTS();
274341
g_helper->teardown();

src/lib/volume_mgr.cpp

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,27 +260,57 @@ VolumeManager::NullAsyncResult HomeBlocksImpl::unmap(const VolumePtr& vol, const
260260
void HomeBlocksImpl::submit_io_batch() { RELEASE_ASSERT(false, "submit_io_batch Not implemented"); }
261261

262262
void HomeBlocksImpl::on_write(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
263-
const std::vector< homestore::MultiBlkId >& blkids,
263+
const std::vector< homestore::MultiBlkId >& new_blkids,
264264
cintrusive< homestore::repl_req_ctx >& ctx) {
265265
repl_result_ctx< VolumeManager::NullResult >* repl_ctx{nullptr};
266266
if (ctx) { repl_ctx = boost::static_pointer_cast< repl_result_ctx< VolumeManager::NullResult > >(ctx).get(); }
267267
auto msg_header = r_cast< HomeBlksMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
268268

269-
// Avoid expensive lock during normal write flow.
269+
// Key contains the list of checksums and old blkids. Before we ack the client
270+
// request, we free the old blkids. Also, if it's recovery, we overwrite the index
271+
// with checksum and new blkids. We need to overwrite the index during recovery as all the
272+
// index writes may not have been flushed to disk before a crash.
270273
VolumePtr vol_ptr{nullptr};
271-
if (repl_ctx) {
272-
vol_ptr = repl_ctx->vol_ptr_;
273-
} else {
274-
// For recovery path there wont repl_ctx and vol_ptr.
274+
auto journal_entry = r_cast< const VolJournalEntry* >(key.cbytes());
275+
auto key_buffer = r_cast< const uint8_t* >(journal_entry + 1);
276+
277+
if (repl_ctx == nullptr) {
278+
// For the recovery path, repl_ctx and vol_ptr won't be available.
275279
auto lg = std::shared_lock(vol_lock_);
276280
auto it = vol_map_.find(msg_header->volume_id);
277281
RELEASE_ASSERT(it != vol_map_.end(), "Didnt find volume {}", boost::uuids::to_string(msg_header->volume_id));
278282
vol_ptr = it->second;
283+
284+
// During log recovery overwrite new blkid and checksum to index.
285+
std::unordered_map< lba_t, BlockInfo > blocks_info;
286+
lba_t start_lba = journal_entry->start_lba;
287+
for (auto& blkid : new_blkids) {
288+
for (uint32_t i = 0; i < blkid.blk_count(); i++) {
289+
auto new_bid = BlkId{blkid.blk_num() + i, 1 /* nblks */, blkid.chunk_num()};
290+
auto csum = *r_cast< const homestore::csum_t* >(key_buffer);
291+
blocks_info.emplace(start_lba + i, BlockInfo{new_bid, BlkId{}, csum});
292+
key_buffer += sizeof(homestore::csum_t);
293+
}
294+
295+
// We ignore the existing values we got in blocks_info from the index, as they will be
296+
// the same checksum and blkid that we see in the journal entry.
297+
lba_t end_lba = start_lba + blkid.blk_count() - 1;
298+
auto status = write_to_index(vol_ptr, start_lba, end_lba, blocks_info);
299+
RELEASE_ASSERT(status, "Index error during recovery");
300+
start_lba = end_lba + 1;
301+
}
302+
} else {
303+
// Avoid expensive lock during normal write flow.
304+
vol_ptr = repl_ctx->vol_ptr_;
305+
key_buffer += (journal_entry->nlbas * sizeof(homestore::csum_t));
279306
}
280307

281-
// Free all the old blkids.
282-
for (auto& blkid : blkids) {
283-
vol_ptr->rd()->async_free_blks(lsn, blkid);
308+
// Free all the old blkids. This happens for both normal writes
309+
// and crash recovery.
310+
for (uint32_t i = 0; i < journal_entry->num_old_blks; i++) {
311+
BlkId old_blkid = *r_cast< const BlkId* >(key_buffer);
312+
vol_ptr->rd()->async_free_blks(lsn, old_blkid);
313+
key_buffer += sizeof(BlkId);
284314
}
285315

286316
if (repl_ctx) { repl_ctx->promise_.setValue(folly::Unit()); }

0 commit comments

Comments
 (0)