@@ -1427,129 +1427,50 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
14271427 edit_lists.push_back (std::move (el));
14281428 ROCKS_LOG_INFO (immutable_db_options_.info_log , " %s" ,
14291429 DescribeVersionEdit (e, cfd).c_str ());
1430- auto & newFiles = e.GetNewFiles ();
1431- bool epoch_recovery_succeeded = true ;
1432- std::ostringstream err_oss;
1433- if (!(flags & AR_REPLICATE_EPOCH_NUM)) {
1434- // Epoch number calculation on the fly.
1435- // There are two cases in which we need to calculate epoch number
1436- // when applying `kManifestWrite`
1437- // 1. flush which generates L0 files. epoch number is allocated
1438- // based on `next_epoch_number` of each CF. The L0 files are sorted
1439- // based on `largest seqno`.
1440- // 2. compaction which merges files in lower levels to higher
1441- // levels. epoch number = min epoch number of input files.
1442- const auto & deletedFiles = e.GetDeletedFiles ();
1443- if (deletedFiles.empty () && !newFiles.empty ()) {
1444- // case 1: flush into L0 files. New files must be level 0
1445-
1446- for (auto & p : newFiles) {
1447- if (p.first != 0 ) {
1448- epoch_recovery_succeeded = false ;
1449- err_oss << " newly flushed file: " << p.first << " is not at L0" ;
1450- break ;
1451- }
1452- }
1430+ if (!s.ok ()) {
1431+ break ;
1432+ }
1433+ if (flags & AR_REPLICATE_EPOCH_NUM) {
1434+ // replicate epoch number on follower
14531435
1454- // sort added files by largest seqno
1455- std::vector<FileMetaData*> added_files;
1456- for (auto & p: newFiles) {
1457- added_files.push_back (&p.second );
1458- }
1436+ s = CheckNextEpochNumberConsistency (e, cfd);
1437+ if (!s.ok ()) {
1438+ break ;
1439+ }
14591440
1460- NewestFirstBySeqNo cmp;
1461- std::sort (added_files.begin (), added_files.end (), cmp);
1462- auto first_file = added_files[0 ];
1463- // Rewind/advance next_epoch_number. This is necessary if epoch_number
1464- // mismtaches due to db reopen.
1465- if (first_file->epoch_number != kUnknownEpochNumber &&
1466- first_file->epoch_number != cfd->GetNextEpochNumber () &&
1467- (flags & AR_RESET_IF_EPOCH_MISMATCH)) {
1468- auto max_epoch_number =
1469- cfd->current ()->storage_info ()->GetMaxEpochNumberOfFiles ();
1470- if (first_file->epoch_number < cfd->GetNextEpochNumber () &&
1471- (first_file->epoch_number == max_epoch_number + 1 )) {
1472- ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1473- " [%s] rewind next_epoch_number from: %" PRIu64
1474- " to %" PRIu64,
1475- cfd->GetName ().c_str (),
1476- cfd->GetNextEpochNumber (),
1477- max_epoch_number + 1 );
1478- cfd->SetNextEpochNumber (max_epoch_number + 1 );
1479- } else if (first_file->epoch_number >
1480- cfd->GetNextEpochNumber () &&
1481- (cfd->GetNextEpochNumber () ==
1482- max_epoch_number + 1 )) {
1483- ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1484- " [%s] advance next_epoch_number from: %" PRIu64
1485- " to %" PRIu64,
1486- cfd->GetName ().c_str (),
1487- cfd->GetNextEpochNumber (),
1488- first_file->epoch_number );
1489- cfd->SetNextEpochNumber (first_file->epoch_number );
1490- } else {
1491- ROCKS_LOG_ERROR (immutable_db_options_.info_log ,
1492- " [%s] unexpected epoch number: %" PRIu64
1493- " for file: %" PRIu64
1494- " ; max epoch number: %" PRIu64,
1495- cfd->GetName ().c_str (),
1496- first_file->epoch_number ,
1497- first_file->fd .GetNumber (),
1498- max_epoch_number);
1499- s = Status::Corruption (" unexpected epoch number for added file" );
1500- break ;
1501- }
1502- }
1441+ auto & newFiles = e.GetNewFiles ();
1442+ auto & deletedFiles = e.GetDeletedFiles ();
15031443
1504- for (auto meta: added_files) {
1505- auto old_epoch_number = meta->epoch_number ;
1506- meta->epoch_number = cfd->NewEpochNumber ();
1507- if (old_epoch_number != meta->epoch_number ) {
1508- info->mismatched_epoch_num += 1 ;
1509- }
1510- }
1511- } else if (!deletedFiles.empty () && !newFiles.empty ()) {
1512- // case 2: compaction
1513- uint64_t min_input_epoch_number =
1514- std::numeric_limits<uint64_t >::max ();
1515- const auto & storage_info = cfd->current ()->storage_info ();
1516- for (auto [level, file_number] : deletedFiles) {
1517- auto meta = storage_info->GetFileMetaDataByNumber (file_number);
1518- if (!meta) {
1519- err_oss << " deleted file: " << file_number
1520- << " at level: " << level << " not found" ;
1521- break ;
1522- }
1523- min_input_epoch_number =
1524- std::min (meta->epoch_number , min_input_epoch_number);
1444+ if (flags & AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION) {
1445+ if (deletedFiles.empty () && !newFiles.empty ()) {
1446+ // Set next epoch number properly before epoch number consistency check.
1447+ // This is necessary if next_epoch_number changes during db reopen.
1448+ cfd->SetNextEpochNumber (newFiles.begin ()->second .epoch_number );
15251449 }
15261450
1527- for ( auto & p: newFiles) {
1528- auto old_epoch_number = p. second . epoch_number ;
1529- p. second . epoch_number = min_input_epoch_number;
1530- if (old_epoch_number != p. second . epoch_number ) {
1531- info->mismatched_epoch_num += 1 ;
1532- }
1451+ // do consistency check by comparing the replicated epoch number against
1452+ // inferred epoch number
1453+ s = InferEpochNumber (&e, cfd, info,
1454+ false /* reset_next_epoch_number */ );
1455+ if (s. ok () && info->mismatched_epoch_num > 0 ) {
1456+ s = Status::Corruption ( " epoch number consistency check fails " );
15331457 }
1534- }
1535- } else if (newFiles.size () > 0 ) {
1536- // Maintain next epoch number on follower
1537- auto next_epoch_number = cfd->GetNextEpochNumber ();
1538- for (auto & p : newFiles) {
1539- auto epoch_number = p.second .epoch_number ;
1540- // advance next epoch number. next_epoch_number never goes
1541- // backwards
1542- if (epoch_number != kUnknownEpochNumber &&
1543- (epoch_number >= next_epoch_number)) {
1544- next_epoch_number = epoch_number + 1 ;
1458+ if (!s.ok ()) {
1459+ break ;
15451460 }
15461461 }
1547- cfd->SetNextEpochNumber (next_epoch_number);
1548- }
15491462
1550- if (!epoch_recovery_succeeded) {
1551- s = Status::Corruption (err_oss.str ());
1552- break ;
1463+ // Maintain next epoch number on follower
1464+ if (deletedFiles.empty () && !newFiles.empty ()) {
1465+ cfd->SetNextEpochNumber (newFiles.rbegin ()->second .epoch_number + 1 );
1466+ }
1467+ } else {
1468+ // infer epoch number on follower
1469+ s = InferEpochNumber (&e, cfd, info,
1470+ flags & AR_RESET_IF_EPOCH_MISMATCH);
1471+ if (!s.ok ()) {
1472+ break ;
1473+ }
15531474 }
15541475 }
15551476 if (!s.ok ()) {
@@ -1626,6 +1547,171 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
16261547 return s;
16271548}
16281549
1550+ Status DBImpl::InferEpochNumber (VersionEdit* e, ColumnFamilyData* cfd,
1551+ ApplyReplicationLogRecordInfo* info,
1552+ bool reset_next_epoch_number) {
1553+ auto & newFiles = e->GetNewFiles ();
1554+ // Epoch number calculation on the fly.
1555+ // There are two cases in which we need to calculate epoch number
1556+ // when applying `kManifestWrite`
1557+ // 1. flush which generates L0 files. epoch number is allocated
1558+ // based on `next_epoch_number` of each CF. The L0 files are sorted
1559+ // based on `largest seqno`.
1560+ // 2. compaction which merges files in lower levels to higher
1561+ // levels. epoch number = min epoch number of input files.
1562+ const auto & deletedFiles = e->GetDeletedFiles ();
1563+ if (deletedFiles.empty () && !newFiles.empty ()) {
1564+ // case 1: flush into L0 files. New files must be level 0
1565+
1566+ for (auto & p : newFiles) {
1567+ if (p.first != 0 ) {
1568+ ROCKS_LOG_ERROR (
1569+ immutable_db_options_.info_log ,
1570+ " [%s] newly flushed file: %" PRIu64 " < is not at L0 but Level: %d" ,
1571+ cfd->GetName ().c_str (), p.second .fd .GetNumber (), p.first );
1572+ return Status::Corruption (" Newly flushed file is not at L0" );
1573+ }
1574+ }
1575+
1576+ // sort added files by largest seqno
1577+ std::vector<FileMetaData*> added_files;
1578+ for (auto & p : newFiles) {
1579+ added_files.push_back (&p.second );
1580+ }
1581+
1582+ NewestFirstBySeqNo cmp;
1583+ std::sort (added_files.begin (), added_files.end (), cmp);
1584+ auto first_file = added_files[0 ];
1585+ // Rewind/advance next_epoch_number. This is necessary if next_epoch_number
1586+ // mismtaches due to db reopen.
1587+ if (first_file->epoch_number != kUnknownEpochNumber &&
1588+ first_file->epoch_number != cfd->GetNextEpochNumber () &&
1589+ reset_next_epoch_number) {
1590+ auto max_epoch_number =
1591+ cfd->current ()->storage_info ()->GetMaxEpochNumberOfFiles ();
1592+ if (first_file->epoch_number < cfd->GetNextEpochNumber () &&
1593+ (first_file->epoch_number == max_epoch_number + 1 )) {
1594+ ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1595+ " [%s] rewind next_epoch_number from: %" PRIu64
1596+ " to %" PRIu64,
1597+ cfd->GetName ().c_str (), cfd->GetNextEpochNumber (),
1598+ max_epoch_number + 1 );
1599+ cfd->SetNextEpochNumber (max_epoch_number + 1 );
1600+ } else if (first_file->epoch_number > cfd->GetNextEpochNumber () &&
1601+ (cfd->GetNextEpochNumber () == max_epoch_number + 1 )) {
1602+ ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1603+ " [%s] advance next_epoch_number from: %" PRIu64
1604+ " to %" PRIu64,
1605+ cfd->GetName ().c_str (), cfd->GetNextEpochNumber (),
1606+ first_file->epoch_number );
1607+ cfd->SetNextEpochNumber (first_file->epoch_number );
1608+ } else {
1609+ // Not safe to rewind/advance `next_epoch_number`. This can happen
1610+ // when we do epoch recovery during db open (i.e., nodes run
1611+ // with different rocksdb versions and nodes upgrading from old version
1612+ // to new version need to recover epoch). Poison is the best we can do
1613+ return Status::Poison (" Poison due to diverged next epoch number" );
1614+ }
1615+ }
1616+
1617+ for (auto meta : added_files) {
1618+ auto replicated_epoch_number = meta->epoch_number ;
1619+ auto inferred_epoch_number = cfd->NewEpochNumber ();
1620+ if (replicated_epoch_number != inferred_epoch_number) {
1621+ ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1622+ " [%s] mismatched epoch for file: %" PRIu64
1623+ " ; incoming: %" PRIu64 " , calculated: %" PRIu64,
1624+ cfd->GetName ().c_str (), meta->fd .GetNumber (),
1625+ replicated_epoch_number, inferred_epoch_number);
1626+ info->mismatched_epoch_num += 1 ;
1627+ meta->epoch_number = inferred_epoch_number;
1628+ }
1629+ }
1630+ } else if (!deletedFiles.empty () && !newFiles.empty ()) {
1631+ // case 2: compaction
1632+ uint64_t min_input_epoch_number = std::numeric_limits<uint64_t >::max ();
1633+ const auto & storage_info = cfd->current ()->storage_info ();
1634+ for (auto [level, file_number] : deletedFiles) {
1635+ auto meta = storage_info->GetFileMetaDataByNumber (file_number);
1636+ if (!meta) {
1637+ ROCKS_LOG_ERROR (immutable_db_options_.info_log ,
1638+ " [%s] deleted file: %" PRIu64 " at level: %d not found" ,
1639+ cfd->GetName ().c_str (), file_number, level);
1640+ return Status::Corruption (" Deleted file not found" );
1641+ }
1642+ min_input_epoch_number =
1643+ std::min (meta->epoch_number , min_input_epoch_number);
1644+ }
1645+
1646+ for (auto & p : newFiles) {
1647+ auto replicated_epoch_number = p.second .epoch_number ;
1648+ if (replicated_epoch_number != min_input_epoch_number) {
1649+ ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1650+ " [%s] mismatched epoch for file: %" PRIu64
1651+ " ; incoming: %" PRIu64 " , calculated: %" PRIu64,
1652+ cfd->GetName ().c_str (), p.second .fd .GetNumber (),
1653+ replicated_epoch_number, min_input_epoch_number);
1654+ info->mismatched_epoch_num += 1 ;
1655+ p.second .epoch_number = min_input_epoch_number;
1656+ }
1657+ }
1658+ }
1659+ return Status::OK ();
1660+ }
1661+
1662+ Status DBImpl::CheckNextEpochNumberConsistency (VersionEdit& e, ColumnFamilyData* cfd) {
1663+ auto & newFiles = e.GetNewFiles ();
1664+ auto & deletedFiles = e.GetDeletedFiles ();
1665+
1666+ if (deletedFiles.empty () && !newFiles.empty ()) {
1667+ // Case 1: new files generated after flushing.
1668+ // New files should be sorted by epoch number
1669+ for (size_t i = 0 ; i + 1 < newFiles.size (); i++) {
1670+ if (newFiles[i].second .epoch_number >= newFiles[i+1 ].second .epoch_number ) {
1671+ ROCKS_LOG_INFO (
1672+ immutable_db_options_.info_log ,
1673+ " [%s] unexpected epoch number ordering for file: %" PRIu64
1674+ " : %" PRIu64 " and file: %" PRIu64 " : %" PRIu64,
1675+ cfd->GetName ().c_str (), newFiles[i].second .fd .GetNumber (),
1676+ newFiles[i].second .epoch_number ,
1677+ newFiles[i + 1 ].second .fd .GetNumber (),
1678+ newFiles[i + 1 ].second .epoch_number );
1679+ return Status::Corruption (" New L0 files not sorted by epoch number" );
1680+ }
1681+ }
1682+
1683+ if (newFiles.begin ()->second .epoch_number < cfd->GetNextEpochNumber ()) {
1684+ // If we need to rewind next epoch number during epoch replication, let's
1685+ // make sure it doesn't break epoch number consistency
1686+ auto max_epoch_number = cfd->current ()->storage_info ()->GetMaxEpochNumberOfFiles ();
1687+ if (newFiles.begin ()->second .epoch_number <= max_epoch_number) {
1688+ ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1689+ " [%s] Out of order epoch number for file: %" PRIu64
1690+ " :%" PRIu64 " ; max epoch number: %" PRIu64,
1691+ cfd->GetName ().c_str (),
1692+ newFiles.begin ()->second .fd .GetNumber (),
1693+ newFiles.begin ()->second .epoch_number , max_epoch_number);
1694+ return Status::Corruption (" Out of order epoch number for flush" );
1695+ }
1696+ } // Otherwise, advance next epoch number
1697+ } else if (!newFiles.empty ()) {
1698+ // Case 2: compaction.
1699+ // New files should all have the same epoch number and it's smaller than next_epoch_number
1700+ auto next_epoch_number = cfd->GetNextEpochNumber ();
1701+ for (auto & f: newFiles) {
1702+ if (f.second .epoch_number >= next_epoch_number) {
1703+ ROCKS_LOG_INFO (immutable_db_options_.info_log ,
1704+ " [%s] Out of order epoch number for file: %" PRIu64
1705+ " :%" PRIu64 " ; next epoch number: %" PRIu64,
1706+ cfd->GetName ().c_str (), f.second .fd .GetNumber (),
1707+ f.second .epoch_number , next_epoch_number);
1708+ return Status::Corruption (" Out of order epoch number for compaction" );
1709+ }
1710+ }
1711+ }
1712+ return Status::OK ();
1713+ }
1714+
16291715Status DBImpl::GetReplicationRecordDebugString (
16301716 const ReplicationLogRecord& record, std::string* out) const {
16311717 std::ostringstream oss;
0 commit comments