Skip to content

Commit 9adb0c2

Browse files
committed
Add recovery changes for write path.
Add recovery changes for write path. Support non overlapping io ranges in UT. Add to index in the recovery path.
1 parent ca1d0fa commit 9adb0c2

File tree

3 files changed

+130
-35
lines changed

3 files changed

+130
-35
lines changed

src/lib/volume/index.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,6 @@ class VolumeIndexValue : public homestore::BtreeIntervalValue {
249249
// Get the next blk num and checksum
250250
m_blkid_suffix += n;
251251
auto curr_lba = ctx->start_lba + n;
252-
LOGINFO("shift n={} blk_num={} curr_lba={}", n, m_blkid_suffix, curr_lba);
253252
DEBUG_ASSERT(ctx->block_info->find(curr_lba) != ctx->block_info->end(), "Invalid index");
254253
m_checksum = (*ctx->block_info)[curr_lba].checksum;
255254
}

src/lib/volume/tests/test_volume_io.cpp

Lines changed: 95 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <string>
1717

18+
#include <boost/icl/interval_set.hpp>
1819
#include <folly/init/Init.h>
1920
#include <gtest/gtest.h>
2021
#include <sisl/options/options.h>
@@ -56,6 +57,7 @@ class VolumeIOImpl {
5657
public:
5758
void create_volume() {
5859
auto vinfo = gen_vol_info(m_volume_id_++);
60+
m_vol_name = vinfo.name;
5961
m_vol_id = vinfo.id;
6062

6163
auto vol_mgr = g_helper->inst()->volume_manager();
@@ -78,13 +80,39 @@ class VolumeIOImpl {
7880
ASSERT_TRUE(m_vol_ptr != nullptr);
7981
}
8082

83+
void get_random_non_overlapping_lba(lba_t& start_lba, uint32_t& nblks, uint64_t max_blks) {
84+
if (start_lba != 0 && nblks != 0) {
85+
lba_t end_lba = start_lba + nblks - 1;
86+
auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
87+
// For user provided lba and nblks, check if they are already in flight.
88+
std::lock_guard lock(m_mutex);
89+
ASSERT_TRUE(m_inflight_ios.find(new_range) == m_inflight_ios.end());
90+
m_inflight_ios.insert(new_range);
91+
return;
92+
}
93+
94+
do {
95+
// Generate lba which are not overlapped with the inflight ios, otherwise
96+
// we cant decide which io completed last and cant verify the data.
97+
start_lba = rand() % max_blks;
98+
nblks = std::max(1, rand() % 64);
99+
lba_t end_lba = start_lba + nblks - 1;
100+
auto new_range = boost::icl::interval< int >::closed(start_lba, end_lba);
101+
std::lock_guard lock(m_mutex);
102+
if (m_inflight_ios.find(new_range) == m_inflight_ios.end()) {
103+
m_inflight_ios.insert(new_range);
104+
break;
105+
}
106+
107+
} while (true);
108+
}
109+
81110
auto build_random_data(lba_t& start_lba, uint32_t& nblks) {
82111
// Write upto 1-64 nblks * 4k = 256k size.
83112
auto info = m_vol_ptr->info();
84113
uint64_t page_size = info->page_size;
85114
uint64_t max_blks = info->size_bytes / page_size;
86-
start_lba = start_lba == 0 ? rand() % max_blks : start_lba;
87-
nblks = nblks == 0 ? std::max(1, rand() % 64) : nblks;
115+
get_random_non_overlapping_lba(start_lba, nblks, max_blks);
88116
nblks = std::min(static_cast< uint64_t >(nblks), max_blks - static_cast< uint64_t >(start_lba));
89117

90118
auto data_size = nblks * page_size;
@@ -94,9 +122,13 @@ class VolumeIOImpl {
94122
uint64_t data_pattern = ((long long)rand() << 32) | rand();
95123
test_common::HBTestHelper::fill_data_buf(data_bytes, page_size, data_pattern);
96124
data_bytes += page_size;
97-
// Store the lba to pattern mapping
98-
m_lba_data[lba] = data_pattern;
99-
LOGINFO("Generate data lba={} pattern={}", lba, data_pattern);
125+
{
126+
// Store the lba to pattern mapping
127+
std::lock_guard lock(m_mutex);
128+
m_lba_data[lba] = data_pattern;
129+
}
130+
131+
LOGDEBUG("Generate data vol={} lba={} pattern={}", m_vol_name, lba, data_pattern);
100132
lba++;
101133
}
102134

@@ -112,8 +144,13 @@ class VolumeIOImpl {
112144
auto vol_mgr = g_helper->inst()->volume_manager();
113145
vol_mgr->write(m_vol_ptr, req)
114146
.via(&folly::InlineExecutor::instance())
115-
.thenValue([this, data, &waiter](auto&& result) {
147+
.thenValue([this, data, req, &waiter](auto&& result) {
116148
ASSERT_FALSE(result.hasError());
149+
{
150+
std::lock_guard lock(m_mutex);
151+
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
152+
}
153+
117154
waiter.one_complete();
118155
});
119156
});
@@ -125,22 +162,28 @@ class VolumeIOImpl {
125162
auto data = build_random_data(start_lba, nblks);
126163
vol_interface_req_ptr req(new vol_interface_req{data->bytes(), start_lba, nblks});
127164
auto vol_mgr = g_helper->inst()->volume_manager();
128-
vol_mgr->write(m_vol_ptr, req).via(&folly::InlineExecutor::instance()).thenValue([this, data](auto&& result) {
129-
ASSERT_FALSE(result.hasError());
130-
g_helper->runner().next_task();
131-
});
165+
vol_mgr->write(m_vol_ptr, req)
166+
.via(&folly::InlineExecutor::instance())
167+
.thenValue([this, req, data](auto&& result) {
168+
ASSERT_FALSE(result.hasError());
169+
{
170+
std::lock_guard lock(m_mutex);
171+
m_inflight_ios.erase(boost::icl::interval< int >::closed(req->lba, req->lba + req->nlbas - 1));
172+
}
173+
g_helper->runner().next_task();
174+
});
132175
}
133176

134-
void verify_data() {
177+
void verify_all_data() {
135178
for (auto& [lba, data_pattern] : m_lba_data) {
136179
auto buffer = iomanager.iobuf_alloc(512, 4096);
137180
vol_interface_req_ptr req(new vol_interface_req{buffer, lba, 1});
138181

139182
auto vol_mgr = g_helper->inst()->volume_manager();
140183
vol_mgr->read(m_vol_ptr, req).get();
141-
// LOGINFO("Data read={}", fmt::format("{}", spdlog::to_hex((buffer), (buffer) + (128))));
142184
test_common::HBTestHelper::validate_data_buf(buffer, 4096, data_pattern);
143-
LOGINFO("Verify data lba={} pattern={} {}", lba, data_pattern, *r_cast< uint64_t* >(buffer));
185+
LOGDEBUG("Verify data vol={} lba={} pattern={} {}", m_vol_name, lba, data_pattern,
186+
*r_cast< uint64_t* >(buffer));
144187
iomanager.iobuf_free(buffer);
145188
}
146189
}
@@ -160,10 +203,13 @@ class VolumeIOImpl {
160203
#ifdef _PRERELEASE
161204
flip::FlipClient m_fc{iomgr_flip::instance()};
162205
#endif
206+
std::mutex m_mutex;
207+
std::string m_vol_name;
163208
VolumePtr m_vol_ptr;
164209
volume_id_t m_vol_id;
165210
static inline uint32_t m_volume_id_{1};
166211
std::map< lba_t, uint64_t > m_lba_data;
212+
boost::icl::interval_set< int > m_inflight_ios;
167213
};
168214

169215
class VolumeIOTest : public ::testing::Test {
@@ -199,14 +245,21 @@ class VolumeIOTest : public ::testing::Test {
199245
LOGINFO("IO completed");
200246
}
201247

202-
void verify_data(shared< VolumeIOImpl > vol_impl = nullptr) {
248+
void verify_all_data(shared< VolumeIOImpl > vol_impl = nullptr) {
203249
if (vol_impl) {
204-
vol_impl->verify_data();
250+
vol_impl->verify_all_data();
205251
return;
206252
}
207253

208254
for (auto& vol_impl : m_vols_impl) {
209-
vol_impl->verify_data();
255+
vol_impl->verify_all_data();
256+
}
257+
}
258+
259+
void restart(int shutdown_delay) {
260+
g_helper->restart(shutdown_delay);
261+
for (auto& vol_impl : m_vols_impl) {
262+
vol_impl->reset();
210263
}
211264
}
212265

@@ -219,29 +272,28 @@ class VolumeIOTest : public ::testing::Test {
219272
TEST_F(VolumeIOTest, SingleVolumeWriteData) {
220273
// Write and verify fixed LBA range to single volume multiple times.
221274
auto vol = volume_list().back();
222-
uint32_t nblks = 100;
275+
uint32_t nblks = 10;
223276
lba_t start_lba = 1;
224277
uint32_t num_iter = 1;
225278
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
226279
for (uint32_t i = 0; i < num_iter; i++) {
227280
generate_io_single(vol, start_lba, nblks);
228-
verify_data(vol);
281+
verify_all_data(vol);
229282
}
230283

231284
// Verify data after restart.
232-
g_helper->restart(10);
233-
vol->reset();
285+
restart(5);
234286

235287
LOGINFO("Verify data");
236-
verify_data(vol);
288+
verify_all_data(vol);
237289

238290
// Write and verify again on same LBA range to single volume multiple times.
239291
LOGINFO("Write and verify data with num_iter={} start={} nblks={}", num_iter, start_lba, nblks);
240292
for (uint32_t i = 0; i < num_iter; i++) {
241293
generate_io_single(vol, start_lba, nblks);
242294
}
243295

244-
verify_data(vol);
296+
verify_all_data(vol);
245297
LOGINFO("SingleVolumeWriteData test done.");
246298
}
247299

@@ -251,24 +303,38 @@ TEST_F(VolumeIOTest, MultipleVolumeWriteData) {
251303
generate_io();
252304

253305
LOGINFO("Verify data");
254-
verify_data();
306+
verify_all_data();
307+
308+
restart(5);
309+
310+
LOGINFO("Verify data again");
311+
verify_all_data();
312+
313+
LOGINFO("Write data randomly");
314+
generate_io();
315+
316+
LOGINFO("Verify data");
317+
verify_all_data();
318+
255319
LOGINFO("MultipleVolumeWriteData test done.");
256320
}
257321

258322
int main(int argc, char* argv[]) {
259323
int parsed_argc = argc;
260-
::testing::InitGoogleTest(&parsed_argc, argv);
261-
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
262-
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
263-
parsed_argc = 1;
264-
auto f = ::folly::Init(&parsed_argc, &argv, true);
324+
char** orig_argv = argv;
265325

266326
std::vector< std::string > args;
267327
for (int i = 0; i < argc; ++i) {
268328
args.emplace_back(argv[i]);
269329
}
270330

271-
g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, argv);
331+
::testing::InitGoogleTest(&parsed_argc, argv);
332+
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_common_setup, test_volume_io_setup, homeblocks);
333+
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");
334+
parsed_argc = 1;
335+
auto f = ::folly::Init(&parsed_argc, &argv, true);
336+
337+
g_helper = std::make_unique< test_common::HBTestHelper >("test_volume_io", args, orig_argv);
272338
g_helper->setup();
273339
auto ret = RUN_ALL_TESTS();
274340
g_helper->teardown();

src/lib/volume_mgr.cpp

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,27 +260,57 @@ VolumeManager::NullAsyncResult HomeBlocksImpl::unmap(const VolumePtr& vol, const
260260
void HomeBlocksImpl::submit_io_batch() { RELEASE_ASSERT(false, "submit_io_batch Not implemented"); }
261261

262262
void HomeBlocksImpl::on_write(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
263-
const std::vector< homestore::MultiBlkId >& blkids,
263+
const std::vector< homestore::MultiBlkId >& new_blkids,
264264
cintrusive< homestore::repl_req_ctx >& ctx) {
265265
repl_result_ctx< VolumeManager::NullResult >* repl_ctx{nullptr};
266266
if (ctx) { repl_ctx = boost::static_pointer_cast< repl_result_ctx< VolumeManager::NullResult > >(ctx).get(); }
267267
auto msg_header = r_cast< HomeBlksMessageHeader* >(const_cast< uint8_t* >(header.cbytes()));
268268

269-
// Avoid expensive lock during normal write flow.
270269
VolumePtr vol_ptr{nullptr};
271270
if (repl_ctx) {
271+
// Avoid expensive lock during normal write flow.
272272
vol_ptr = repl_ctx->vol_ptr_;
273273
} else {
274-
// For recovery path there wont repl_ctx and vol_ptr.
274+
// For recovery path repl_ctx and vol_ptr wont be available.
275275
auto lg = std::shared_lock(vol_lock_);
276276
auto it = vol_map_.find(msg_header->volume_id);
277277
RELEASE_ASSERT(it != vol_map_.end(), "Didnt find volume {}", boost::uuids::to_string(msg_header->volume_id));
278278
vol_ptr = it->second;
279279
}
280280

281+
// Key contains the list of checksums and old blkids. Before we ack the client
282+
// request, we free the old blkid's. Also if its recovery we overwrite the index
283+
// with checksum and new blkid's. We need to overwrite index during recovery as all the
284+
// index writes may not be flushed to disk during crash.
285+
auto journal_entry = r_cast< const VolJournalEntry* >(key.cbytes());
286+
auto key_buffer = r_cast< const uint8_t* >(journal_entry + 1);
287+
288+
if (repl_ctx == nullptr) {
289+
// During log recovery overwrite new blkid and checksum to index.
290+
std::unordered_map< lba_t, BlockInfo > blocks_info;
291+
lba_t start_lba = journal_entry->start_lba;
292+
for (auto& blkid : new_blkids) {
293+
for (uint32_t i = 0; i < blkid.blk_count(); i++) {
294+
auto new_bid = BlkId{blkid.blk_num() + i, 1 /* nblks */, blkid.chunk_num()};
295+
auto csum = *r_cast< const homestore::csum_t* >(key_buffer);
296+
blocks_info.emplace(start_lba + i, BlockInfo{new_bid, BlkId{}, csum});
297+
key_buffer += sizeof(homestore::csum_t);
298+
}
299+
300+
lba_t end_lba = start_lba + blkid.blk_count() - 1;
301+
auto status = write_to_index(vol_ptr, start_lba, end_lba, blocks_info);
302+
RELEASE_ASSERT(status, "Index error during recovery");
303+
start_lba = end_lba + 1;
304+
}
305+
} else {
306+
key_buffer += (journal_entry->nlbas * sizeof(homestore::csum_t));
307+
}
308+
281309
// Free all the old blkids.
282-
for (auto& blkid : blkids) {
283-
vol_ptr->rd()->async_free_blks(lsn, blkid);
310+
for (uint32_t i = 0; i < journal_entry->num_old_blks; i++) {
311+
BlkId old_blkid = *r_cast< const BlkId* >(key_buffer);
312+
vol_ptr->rd()->async_free_blks(lsn, old_blkid);
313+
key_buffer += sizeof(BlkId);
284314
}
285315

286316
if (repl_ctx) { repl_ctx->promise_.setValue(folly::Unit()); }

0 commit comments

Comments
 (0)