@@ -1430,48 +1430,18 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
14301430 if (!s.ok ()) {
14311431 break ;
14321432 }
1433- if (flags & AR_REPLICATE_EPOCH_NUM) {
1434- // replicate epoch number on follower
14351433
1436- s = CheckNextEpochNumberConsistency (e, cfd);
1437- if (!s.ok ()) {
1438- break ;
1439- }
1440-
1441- auto & newFiles = e.GetNewFiles ();
1442- auto & deletedFiles = e.GetDeletedFiles ();
1443-
1444- if (flags & AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION) {
1445- if (deletedFiles.empty () && !newFiles.empty ()) {
1446- // Set next epoch number properly before epoch number consistency check.
1447- // This is necessary if next_epoch_number changes during db reopen.
1448- cfd->SetNextEpochNumber (newFiles.begin ()->second .epoch_number );
1449- }
1434+ s = CheckNextEpochNumberConsistency (e, cfd);
1435+ if (!s.ok ()) {
1436+ break ;
1437+ }
14501438
1451- // Do a consistency check by comparing the replicated epoch number
1452- // against the inferred epoch number. No need to
1453- // `reset_next_epoch_number` here since we have already done it.
1454- s = InferEpochNumber (&e, cfd, info,
1455- false /* reset_next_epoch_number */ );
1456- if (s.ok () && info->mismatched_epoch_num > 0 ) {
1457- s = Status::Poison (" epoch number consistency check fails" );
1458- }
1459- if (!s.ok ()) {
1460- break ;
1461- }
1462- }
1439+ auto & newFiles = e.GetNewFiles ();
1440+ auto & deletedFiles = e.GetDeletedFiles ();
14631441
1464- // Maintain next epoch number on follower
1465- if (deletedFiles.empty () && !newFiles.empty ()) {
1466- cfd->SetNextEpochNumber (newFiles.rbegin ()->second .epoch_number + 1 );
1467- }
1468- } else {
1469- // infer epoch number on follower
1470- s = InferEpochNumber (&e, cfd, info,
1471- flags & AR_RESET_IF_EPOCH_MISMATCH);
1472- if (!s.ok ()) {
1473- break ;
1474- }
1442+ // Maintain next epoch number on follower
1443+ if (deletedFiles.empty () && !newFiles.empty ()) {
1444+ cfd->SetNextEpochNumber (newFiles.rbegin ()->second .epoch_number + 1 );
14751445 }
14761446 }
14771447 if (!s.ok ()) {
@@ -1548,119 +1518,7 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
15481518 return s;
15491519}
15501520
1551- Status DBImpl::InferEpochNumber (VersionEdit* e, ColumnFamilyData* cfd,
1552- ApplyReplicationLogRecordInfo* info,
1553- bool reset_next_epoch_number) {
1554- auto & newFiles = e->GetNewFiles ();
1555- // Epoch number calculation on the fly.
1556- // There are two cases in which we need to calculate epoch number
1557- // when applying `kManifestWrite`
1558- // 1. flush which generates L0 files. epoch number is allocated
1559- // based on `next_epoch_number` of each CF. The L0 files are sorted
1560- // based on `largest seqno`.
1561- // 2. compaction which merges files in lower levels to higher
1562- // levels. epoch number = min epoch number of input files.
1563- const auto & deletedFiles = e->GetDeletedFiles ();
1564- if (deletedFiles.empty () && !newFiles.empty ()) {
1565- // case 1: flush into L0 files. New files must be level 0
1566-
1567- for (auto & p : newFiles) {
1568- if (p.first != 0 ) {
1569- ROCKS_LOG_ERROR (
1570- immutable_db_options_.info_log ,
1571- " [%s] newly flushed file: %" PRIu64 " < is not at L0 but Level: %d" ,
1572- cfd->GetName ().c_str (), p.second .fd .GetNumber (), p.first );
1573- return Status::Corruption (" Newly flushed file is not at L0" );
1574- }
1575- }
1576-
1577- // sort added files by largest seqno
1578- std::vector<FileMetaData*> added_files;
1579- for (auto & p : newFiles) {
1580- added_files.push_back (&p.second );
1581- }
1582-
1583- NewestFirstBySeqNo cmp;
1584- std::sort (added_files.begin (), added_files.end (), cmp);
1585- auto first_file = added_files[0 ];
1586- // Rewind/advance next_epoch_number. This is necessary if next_epoch_number
1587- // mismatches due to db reopen.
1588- if (first_file->epoch_number != kUnknownEpochNumber &&
1589- first_file->epoch_number != cfd->GetNextEpochNumber () &&
1590- reset_next_epoch_number) {
1591- auto max_epoch_number =
1592- cfd->current ()->storage_info ()->GetMaxEpochNumberOfFiles ();
1593- if (first_file->epoch_number < cfd->GetNextEpochNumber () &&
1594- (first_file->epoch_number == max_epoch_number + 1 )) {
1595- ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1596- " [%s] rewind next_epoch_number from: %" PRIu64
1597- " to %" PRIu64,
1598- cfd->GetName ().c_str (), cfd->GetNextEpochNumber (),
1599- max_epoch_number + 1 );
1600- cfd->SetNextEpochNumber (max_epoch_number + 1 );
1601- } else if (first_file->epoch_number > cfd->GetNextEpochNumber () &&
1602- (cfd->GetNextEpochNumber () == max_epoch_number + 1 )) {
1603- ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1604- " [%s] advance next_epoch_number from: %" PRIu64
1605- " to %" PRIu64,
1606- cfd->GetName ().c_str (), cfd->GetNextEpochNumber (),
1607- first_file->epoch_number );
1608- cfd->SetNextEpochNumber (first_file->epoch_number );
1609- } else {
1610- // Not safe to rewind/advance `next_epoch_number`. This can happen
1611- // when we do epoch recovery during db open (i.e., nodes run
1612- // with different rocksdb versions and nodes upgrading from old version
1613- // to new version need to recover epoch). Poison is the best we can do
1614- return Status::Poison (" Poison due to diverged next epoch number" );
1615- }
1616- }
1617-
1618- for (auto meta : added_files) {
1619- auto replicated_epoch_number = meta->epoch_number ;
1620- auto inferred_epoch_number = cfd->NewEpochNumber ();
1621- if (replicated_epoch_number != inferred_epoch_number) {
1622- ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1623- " [%s] mismatched epoch for file: %" PRIu64
1624- " ; incoming: %" PRIu64 " , calculated: %" PRIu64,
1625- cfd->GetName ().c_str (), meta->fd .GetNumber (),
1626- replicated_epoch_number, inferred_epoch_number);
1627- info->mismatched_epoch_num += 1 ;
1628- meta->epoch_number = inferred_epoch_number;
1629- }
1630- }
1631- } else if (!deletedFiles.empty () && !newFiles.empty ()) {
1632- // case 2: compaction
1633- uint64_t min_input_epoch_number = std::numeric_limits<uint64_t >::max ();
1634- const auto & storage_info = cfd->current ()->storage_info ();
1635- for (auto [level, file_number] : deletedFiles) {
1636- auto meta = storage_info->GetFileMetaDataByNumber (file_number);
1637- if (!meta) {
1638- ROCKS_LOG_ERROR (immutable_db_options_.info_log ,
1639- " [%s] deleted file: %" PRIu64 " at level: %d not found" ,
1640- cfd->GetName ().c_str (), file_number, level);
1641- return Status::Corruption (" Deleted file not found" );
1642- }
1643- min_input_epoch_number =
1644- std::min (meta->epoch_number , min_input_epoch_number);
1645- }
1646-
1647- for (auto & p : newFiles) {
1648- auto replicated_epoch_number = p.second .epoch_number ;
1649- if (replicated_epoch_number != min_input_epoch_number) {
1650- ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1651- " [%s] mismatched epoch for file: %" PRIu64
1652- " ; incoming: %" PRIu64 " , calculated: %" PRIu64,
1653- cfd->GetName ().c_str (), p.second .fd .GetNumber (),
1654- replicated_epoch_number, min_input_epoch_number);
1655- info->mismatched_epoch_num += 1 ;
1656- p.second .epoch_number = min_input_epoch_number;
1657- }
1658- }
1659- }
1660- return Status::OK ();
1661- }
1662-
1663- Status DBImpl::CheckNextEpochNumberConsistency (VersionEdit& e, ColumnFamilyData* cfd) {
1521+ Status DBImpl::CheckNextEpochNumberConsistency (const VersionEdit& e, ColumnFamilyData* cfd) {
16641522 auto & newFiles = e.GetNewFiles ();
16651523 auto & deletedFiles = e.GetDeletedFiles ();
16661524
0 commit comments