Skip to content

Commit 5d7fbb7

Browse files
authored
[sys-4816] handle rocksdb upgrade to 7.10 in backwards compatible way (#277)
A new per-file entity called epoch number was introduced in v7.10, which is used to sort L0 files (previously sorted by largest_seqno). We need to make sure this number matches between leader and follower. This is quite tricky during leaf deploy. * The leader is on the new version while the follower is on the old version. This is actually fine: the epoch number will simply be skipped on the follower. * The follower is on the new version while the leader is on the old version. This is the tricky case, and this entire PR exists to deal with it. The hack we use is to make the follower ignore the epoch number from the leader and instead calculate the epoch number on the fly. There are two cases: * A flush, which generates L0 files. The epoch number is allocated based on the next_epoch_number of each CF. The L0 files are sorted based on largest seqno. * A compaction, which merges files in lower levels into higher levels. Here, epoch number = min epoch number of the input files. This is mostly fine, except when the db is reopened. RocksDB doesn't track next_epoch_number in the manifest file. When the db is reopened, it recalculates next_epoch_number based on the max epoch number of the existing live files, so it's possible for next_epoch_number to go backwards after a reopen. The following two cases are possible: * The leader reopens its db, causing next_epoch_number on the leader to go backwards, so the follower needs to rewind it. * The follower reopens its db, causing next_epoch_number on the follower to go backwards, so the follower needs to advance it. A new replication option, AR_RESET_IF_EPOCH_MISMATCH, is added to help with this issue. Rewind and advance are handled carefully to make sure they don't break the existing file ordering. The change in this PR is quite hacky, but fortunately we can remove most of it once RocksDB is fully upgraded
1 parent 2ad9544 commit 5d7fbb7

File tree

7 files changed

+271
-31
lines changed

7 files changed

+271
-31
lines changed

cloud/replication_test.cc

Lines changed: 120 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ class ReplicationTest : public testing::Test {
224224
DB* currentLeader() const {
225225
return leader_db_.get();
226226
}
227+
228+
DBImpl* leaderFull() const {
229+
return static_cast_with_check<DBImpl>(currentLeader());
230+
}
231+
227232
DB* currentFollower() const {
228233
return follower_db_.get();
229234
}
@@ -265,12 +270,39 @@ class ReplicationTest : public testing::Test {
265270
return keys;
266271
}
267272

273+
// verify that the current log structured merge tree of two CFs to be the same
274+
void verifyLSMTEqual(ColumnFamilyHandle* h1, ColumnFamilyHandle* h2) {
275+
auto cf1 = static_cast_with_check<ColumnFamilyHandleImpl>(h1)->cfd(),
276+
cf2 = static_cast_with_check<ColumnFamilyHandleImpl>(h2)->cfd();
277+
ASSERT_EQ(cf1->NumberLevels(), cf2->NumberLevels())
278+
<< h1->GetName() << ", " << h2->GetName();
279+
280+
for (int level = 0; level < cf1->NumberLevels(); level++) {
281+
auto files1 = cf1->current()->storage_info()->LevelFiles(level),
282+
files2 = cf2->current()->storage_info()->LevelFiles(level);
283+
ASSERT_EQ(files1.size(), files2.size())
284+
<< "mismatched number of files at level: " << level
285+
<< " between cf: " << cf1->GetName()
286+
<< " and cf: " << cf2->GetName();
287+
for (size_t i = 0; i < files1.size(); i++) {
288+
auto f1 = files1[i], f2 = files2[i];
289+
ASSERT_EQ(f1->fd.file_size, f2->fd.file_size);
290+
ASSERT_EQ(f1->fd.smallest_seqno, f2->fd.smallest_seqno);
291+
ASSERT_EQ(f1->fd.largest_seqno, f2->fd.largest_seqno);
292+
ASSERT_EQ(f1->epoch_number, f2->epoch_number);
293+
ASSERT_EQ(f1->file_checksum, f2->file_checksum);
294+
ASSERT_EQ(f1->unique_id, f2->unique_id);
295+
}
296+
}
297+
}
298+
268299
void verifyEqual() {
269300
ASSERT_EQ(leader_cfs_.size(), follower_cfs_.size());
270301
auto leader = leader_db_.get(), follower = follower_db_.get();
271302
for (auto& [name, cf1]: leader_cfs_) {
272303
auto cf2 = followerCF(name);
273304
verifyNextLogNumAndReplSeqConsistency(name);
305+
verifyLSMTEqual(cf1.get(), cf2);
274306

275307
auto itrLeader = std::unique_ptr<Iterator>(
276308
leader->NewIterator(ReadOptions(), cf1.get()));
@@ -290,6 +322,7 @@ class ReplicationTest : public testing::Test {
290322

291323
protected:
292324
std::shared_ptr<Logger> info_log_;
325+
bool replicate_epoch_number_{true};
293326
void resetFollowerSequence(int new_seq) {
294327
followerSequence_ = new_seq;
295328
}
@@ -420,6 +453,12 @@ size_t ReplicationTest::catchUpFollower(
420453
MutexLock lock(&log_records_mutex_);
421454
DB::ApplyReplicationLogRecordInfo info;
422455
size_t ret = 0;
456+
unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
457+
if (replicate_epoch_number_) {
458+
flags |= DB::AR_REPLICATE_EPOCH_NUM;
459+
} else {
460+
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
461+
}
423462
for (; followerSequence_ < (int)log_records_.size(); ++followerSequence_) {
424463
if (num_records && ret >= *num_records) {
425464
break;
@@ -430,8 +469,9 @@ size_t ReplicationTest::catchUpFollower(
430469
[this](Slice) {
431470
return ColumnFamilyOptions(follower_db_->GetOptions());
432471
},
433-
allow_new_manifest_writes, &info, DB::AR_EVICT_OBSOLETE_FILES);
472+
allow_new_manifest_writes, &info, flags);
434473
assert(s.ok());
474+
assert(info.mismatched_epoch_num == 0);
435475
++ret;
436476
}
437477
if (info.has_new_manifest_writes) {
@@ -1098,7 +1138,18 @@ TEST_F(ReplicationTest, EvictObsoleteFiles) {
10981138
static_cast_with_check<DBImpl>(follower)->TEST_table_cache()->GetUsage());
10991139
}
11001140

1101-
TEST_F(ReplicationTest, Stress) {
1141+
class ReplicationTestWithParam : public ReplicationTest,
1142+
public testing::WithParamInterface<bool> {
1143+
public:
1144+
ReplicationTestWithParam()
1145+
: ReplicationTest() {}
1146+
1147+
void SetUp() override {
1148+
replicate_epoch_number_ = GetParam();
1149+
}
1150+
};
1151+
1152+
TEST_P(ReplicationTestWithParam, Stress) {
11021153
std::string val;
11031154
auto leader = openLeader();
11041155
openFollower();
@@ -1114,49 +1165,74 @@ TEST_F(ReplicationTest, Stress) {
11141165
createColumnFamily(cf(i));
11151166
}
11161167

1117-
auto do_writes = [&](int n) {
1118-
auto rand = Random::GetTLSInstance();
1119-
while (n > 0) {
1120-
auto cfi = rand->Uniform(kColumnFamilyCount);
1121-
rocksdb::WriteBatch wb;
1122-
for (size_t i = 0; i < 3; ++i) {
1123-
--n;
1124-
wb.Put(leaderCF(cf(cfi)), std::to_string(rand->Uniform(kMaxKey)),
1125-
std::to_string(rand->Next()));
1168+
auto do_writes = [&]() {
1169+
auto writes_per_thread = [&](int n) {
1170+
auto rand = Random::GetTLSInstance();
1171+
while (n > 0) {
1172+
auto cfi = rand->Uniform(kColumnFamilyCount);
1173+
rocksdb::WriteBatch wb;
1174+
for (size_t i = 0; i < 3; ++i) {
1175+
--n;
1176+
wb.Put(leaderCF(cf(cfi)), std::to_string(rand->Uniform(kMaxKey)),
1177+
std::to_string(rand->Next()));
1178+
}
1179+
ASSERT_OK(leader->Write(wo(), &wb));
11261180
}
1127-
ASSERT_OK(leader->Write(wo(), &wb));
1181+
};
1182+
1183+
std::vector<std::thread> threads;
1184+
for (size_t i = 0; i < kThreadCount; ++i) {
1185+
threads.emplace_back([&]() { writes_per_thread(kWritesPerThread); });
1186+
}
1187+
for (auto& t : threads) {
1188+
t.join();
11281189
}
1190+
1191+
ASSERT_OK(
1192+
leaderFull()->TEST_WaitForBackgroundWork());
11291193
};
11301194

1131-
std::vector<std::thread> threads;
1132-
for (size_t i = 0; i < kThreadCount; ++i) {
1133-
threads.emplace_back([&]() { do_writes(kWritesPerThread); });
1134-
}
1135-
for (auto& t : threads) {
1136-
t.join();
1137-
}
1138-
ASSERT_OK(
1139-
static_cast_with_check<DBImpl>(leader)->TEST_WaitForBackgroundWork());
1195+
auto verifyNextEpochNumber = [&]() {
1196+
for (int i = 0; i < kColumnFamilyCount; i++) {
1197+
auto cf1 = leaderCFD(cf(i)), cf2 = followerCFD(cf(i));
1198+
ASSERT_EQ(cf1->GetNextEpochNumber(), cf2->GetNextEpochNumber());
1199+
}
1200+
};
11401201

1141-
catchUpFollower();
1202+
do_writes();
11421203

1204+
catchUpFollower();
11431205
verifyEqual();
1206+
verifyNextEpochNumber();
1207+
1208+
ROCKS_LOG_INFO(info_log_, "reopen leader");
11441209

11451210
// Reopen leader
11461211
closeLeader();
11471212
leader = openLeader();
1148-
ASSERT_OK(leader->Flush(FlushOptions()));
1149-
1213+
// memtable might not be empty after reopening leader, since we recover
1214+
// replication log when opening it.
1215+
ASSERT_OK(leader->Flush({}));
1216+
ASSERT_OK(leaderFull()->TEST_WaitForBackgroundWork());
1217+
catchUpFollower();
11501218
verifyEqual();
11511219

1220+
do_writes();
1221+
1222+
ROCKS_LOG_INFO(info_log_, "reopen follower");
1223+
11521224
// Reopen follower
11531225
closeFollower();
11541226
openFollower();
11551227
catchUpFollower();
11561228

11571229
verifyEqual();
1230+
verifyNextEpochNumber();
11581231
}
11591232

1233+
INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
1234+
::testing::Values(false, true));
1235+
11601236
TEST_F(ReplicationTest, DeleteRange) {
11611237
auto leader = openLeader();
11621238
openFollower();
@@ -1201,6 +1277,26 @@ TEST_F(ReplicationTest, DeleteRange) {
12011277
verifyEqual();
12021278
}
12031279

1280+
TEST_F(ReplicationTest, EpochNumberSimple) {
1281+
auto options = leaderOptions();
1282+
options.disable_auto_compactions = true;
1283+
auto leader = openLeader();
1284+
openFollower();
1285+
1286+
ASSERT_OK(leader->Put(wo(), "k1", "v1"));
1287+
ASSERT_OK(leader->Flush({}));
1288+
catchUpFollower();
1289+
1290+
ASSERT_OK(leader->Put(wo(), "k1", "v2"));
1291+
ASSERT_OK(leader->Flush({}));
1292+
auto leaderFull = static_cast_with_check<DBImpl>(leader);
1293+
ASSERT_OK(leaderFull->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
1294+
1295+
catchUpFollower();
1296+
1297+
verifyEqual();
1298+
}
1299+
12041300
} // namespace ROCKSDB_NAMESPACE
12051301

12061302
// A black-box test for the cloud wrapper around rocksdb

db/compaction/compaction_job.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1590,11 +1590,12 @@ Status CompactionJob::FinishCompactionOutputFile(
15901590
outputs.UpdateTableProperties();
15911591
ROCKS_LOG_INFO(db_options_.info_log,
15921592
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
1593-
" keys, %" PRIu64 " bytes%s, temperature: %s",
1593+
" keys, %" PRIu64 " bytes%s, temperature: %s, epoch number: %" PRIu64,
15941594
cfd->GetName().c_str(), job_id_, output_number,
15951595
current_entries, meta->fd.file_size,
15961596
meta->marked_for_compaction ? " (need compaction)" : "",
1597-
temperature_to_string[meta->temperature].c_str());
1597+
temperature_to_string[meta->temperature].c_str(),
1598+
meta->epoch_number);
15981599
}
15991600
std::string fname;
16001601
FileDescriptor output_fd;

db/db_impl/db_impl.cc

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1217,6 +1217,7 @@ std::string DescribeVersionEdit(const VersionEdit& e, ColumnFamilyData* cfd) {
12171217
}
12181218
first = false;
12191219
oss << f.second.fd.GetNumber();
1220+
oss << ":" << f.second.epoch_number;
12201221
}
12211222
oss << "] ";
12221223
}
@@ -1424,9 +1425,132 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
14241425
autovector<VersionEdit*> el;
14251426
el.push_back(&e);
14261427
edit_lists.push_back(std::move(el));
1427-
14281428
ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
14291429
DescribeVersionEdit(e, cfd).c_str());
1430+
auto& newFiles = e.GetNewFiles();
1431+
bool epoch_recovery_succeeded = true;
1432+
std::ostringstream err_oss;
1433+
if (!(flags & AR_REPLICATE_EPOCH_NUM)) {
1434+
// Epoch number calculation on the fly.
1435+
// There are two cases in which we need to calculate epoch number
1436+
// when applying `kManifestWrite`
1437+
// 1. flush which generates L0 files. epoch number is allocated
1438+
// based on `next_epoch_number` of each CF. The L0 files are sorted
1439+
// based on `largest seqno`.
1440+
// 2. compaction which merges files in lower levels to higher
1441+
// levels. epoch number = min epoch number of input files.
1442+
const auto& deletedFiles = e.GetDeletedFiles();
1443+
if (deletedFiles.empty() && !newFiles.empty()) {
1444+
// case 1: flush into L0 files. New files must be level 0
1445+
1446+
for (auto& p : newFiles) {
1447+
if (p.first != 0) {
1448+
epoch_recovery_succeeded = false;
1449+
err_oss << "newly flushed file: " << p.first << " is not at L0";
1450+
break;
1451+
}
1452+
}
1453+
1454+
// sort added files by largest seqno
1455+
std::vector<FileMetaData*> added_files;
1456+
for(auto& p: newFiles) {
1457+
added_files.push_back(&p.second);
1458+
}
1459+
1460+
NewestFirstBySeqNo cmp;
1461+
std::sort(added_files.begin(), added_files.end(), cmp);
1462+
auto first_file = added_files[0];
1463+
// Rewind/advance next_epoch_number. This is necessary if epoch_number
1464+
// mismatches due to db reopen.
1465+
if (first_file->epoch_number != kUnknownEpochNumber &&
1466+
first_file->epoch_number != cfd->GetNextEpochNumber() &&
1467+
(flags & AR_RESET_IF_EPOCH_MISMATCH)) {
1468+
auto max_epoch_number =
1469+
cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
1470+
if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
1471+
(first_file->epoch_number == max_epoch_number + 1)) {
1472+
ROCKS_LOG_INFO(immutable_db_options_.info_log,
1473+
"[%s] rewind next_epoch_number from: %" PRIu64
1474+
" to %" PRIu64,
1475+
cfd->GetName().c_str(),
1476+
cfd->GetNextEpochNumber(),
1477+
max_epoch_number + 1);
1478+
cfd->SetNextEpochNumber(max_epoch_number + 1);
1479+
} else if (first_file->epoch_number >
1480+
cfd->GetNextEpochNumber() &&
1481+
(cfd->GetNextEpochNumber() ==
1482+
max_epoch_number + 1)) {
1483+
ROCKS_LOG_INFO(immutable_db_options_.info_log,
1484+
"[%s] advance next_epoch_number from: %" PRIu64
1485+
" to %" PRIu64,
1486+
cfd->GetName().c_str(),
1487+
cfd->GetNextEpochNumber(),
1488+
first_file->epoch_number);
1489+
cfd->SetNextEpochNumber(first_file->epoch_number);
1490+
} else {
1491+
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1492+
"[%s] unexpected epoch number: %" PRIu64
1493+
" for file: %" PRIu64
1494+
" ; max epoch number: %" PRIu64,
1495+
cfd->GetName().c_str(),
1496+
first_file->epoch_number,
1497+
first_file->fd.GetNumber(),
1498+
max_epoch_number);
1499+
s = Status::Corruption("unexpected epoch number for added file");
1500+
break;
1501+
}
1502+
}
1503+
1504+
for (auto meta: added_files) {
1505+
auto old_epoch_number = meta->epoch_number;
1506+
meta->epoch_number = cfd->NewEpochNumber();
1507+
if (old_epoch_number != meta->epoch_number) {
1508+
info->mismatched_epoch_num += 1;
1509+
}
1510+
}
1511+
} else if (!deletedFiles.empty() && !newFiles.empty()) {
1512+
// case 2: compaction
1513+
uint64_t min_input_epoch_number =
1514+
std::numeric_limits<uint64_t>::max();
1515+
const auto& storage_info = cfd->current()->storage_info();
1516+
for (auto [level, file_number] : deletedFiles) {
1517+
auto meta = storage_info->GetFileMetaDataByNumber(file_number);
1518+
if (!meta) {
1519+
err_oss << "deleted file: " << file_number
1520+
<< " at level: " << level << " not found";
1521+
break;
1522+
}
1523+
min_input_epoch_number =
1524+
std::min(meta->epoch_number, min_input_epoch_number);
1525+
}
1526+
1527+
for (auto& p: newFiles) {
1528+
auto old_epoch_number = p.second.epoch_number;
1529+
p.second.epoch_number = min_input_epoch_number;
1530+
if (old_epoch_number != p.second.epoch_number) {
1531+
info->mismatched_epoch_num += 1;
1532+
}
1533+
}
1534+
}
1535+
} else if (newFiles.size() > 0) {
1536+
// Maintain next epoch number on follower
1537+
auto next_epoch_number = cfd->GetNextEpochNumber();
1538+
for (auto& p : newFiles) {
1539+
auto epoch_number = p.second.epoch_number;
1540+
// advance next epoch number. next_epoch_number never goes
1541+
// backwards
1542+
if (epoch_number != kUnknownEpochNumber &&
1543+
(epoch_number >= next_epoch_number)) {
1544+
next_epoch_number = epoch_number + 1;
1545+
}
1546+
}
1547+
cfd->SetNextEpochNumber(next_epoch_number);
1548+
}
1549+
1550+
if (!epoch_recovery_succeeded) {
1551+
s = Status::Corruption(err_oss.str());
1552+
break;
1553+
}
14301554
}
14311555
if (!s.ok()) {
14321556
break;

db/flush_job.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -895,9 +895,9 @@ Status FlushJob::WriteLevel0Table() {
895895
NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
896896
static_cast<int>(memtables.size()), &arena));
897897
ROCKS_LOG_INFO(db_options_.info_log,
898-
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
898+
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started. Epoch number: %" PRIu64,
899899
cfd_->GetName().c_str(), job_context_->job_id,
900-
meta_.fd.GetNumber());
900+
meta_.fd.GetNumber(), meta_.epoch_number);
901901

902902
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
903903
&output_compression_);

0 commit comments

Comments
 (0)