@@ -103,7 +103,7 @@ void VersionedPageEntries<Trait>::createNewEntry(const PageVersion & ver, const
103103 assert (last_iter->second .isEntry ());
104104 // It is ok to replace the entry with same sequence and newer epoch, but not valid
105105 // to replace the entry with newer sequence.
106- if (unlikely (last_iter->second .being_ref_count . getLatestRefCount () != 1 && last_iter->first .sequence < ver.sequence ))
106+ if (unlikely (last_iter->second .being_ref_count != 1 && last_iter->first .sequence < ver.sequence ))
107107 {
108108 throw Exception (
109109 fmt::format (" Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]" ,
@@ -151,7 +151,7 @@ typename VersionedPageEntries<Trait>::PageId VersionedPageEntries<Trait>::create
151151 assert (last_iter->second .isEntry ());
152152 // It is ok to replace the entry with same sequence and newer epoch, but not valid
153153 // to replace the entry with newer sequence.
154- if (unlikely (last_iter->second .being_ref_count . getLatestRefCount () != 1 && last_iter->first .sequence < ver.sequence ))
154+ if (unlikely (last_iter->second .being_ref_count != 1 && last_iter->first .sequence < ver.sequence ))
155155 {
156156 throw Exception (
157157 fmt::format (" Try to replace normal entry with an newer seq [ver={}] [prev_ver={}] [last_entry={}]" ,
@@ -218,6 +218,7 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
218218 is_deleted = false ;
219219 create_ver = ver;
220220 delete_ver = PageVersion (0 );
221+ being_ref_count = 1 ;
221222 RUNTIME_CHECK (entries.empty ());
222223 entries.emplace (create_ver, EntryOrDelete::newNormalEntry (entry));
223224 // return the new created holder to caller to set the page_id
@@ -235,6 +236,7 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
235236 is_deleted = false ;
236237 create_ver = ver;
237238 delete_ver = PageVersion (0 );
239+ being_ref_count = 1 ;
238240 entries.emplace (create_ver, EntryOrDelete::newNormalEntry (entry));
239241 // return the new created holder to caller to set the page_id
240242 external_holder = std::make_shared<typename Trait::PageId>();
@@ -400,15 +402,15 @@ std::shared_ptr<typename VersionedPageEntries<Trait>::PageId> VersionedPageEntri
400402 type = EditRecordType::VAR_EXTERNAL;
401403 is_deleted = false ;
402404 create_ver = rec.version ;
403- being_ref_count. restoreFrom (rec. version , rec.being_ref_count ) ;
404- entries.emplace (rec.version , EntryOrDelete::newFromRestored (rec.entry , rec.version , 1 /* meaningless */ ));
405+ being_ref_count = rec.being_ref_count ;
406+ entries.emplace (rec.version , EntryOrDelete::newFromRestored (rec.entry , rec.being_ref_count ));
405407 external_holder = std::make_shared<typename Trait::PageId>(rec.page_id );
406408 return external_holder;
407409 }
408410 case EditRecordType::VAR_ENTRY:
409411 {
410412 type = EditRecordType::VAR_ENTRY;
411- entries.emplace (rec.version , EntryOrDelete::newFromRestored (rec.entry , rec.version , rec. being_ref_count ));
413+ entries.emplace (rec.version , EntryOrDelete::newFromRestored (rec.entry , rec.being_ref_count ));
412414 return nullptr ;
413415 }
414416 default :
@@ -612,12 +614,12 @@ bool VersionedPageEntries<Trait>::isVisible(UInt64 seq) const
612614}
613615
614616template <typename Trait>
615- Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & target_ver, const PageVersion & ref_ver )
617+ Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & ver )
616618{
617619 auto page_lock = acquireLock ();
618620 if (type == EditRecordType::VAR_ENTRY)
619621 {
620- if (auto iter = MapUtils::findMutLess (entries, PageVersion (target_ver .sequence + 1 ));
622+ if (auto iter = MapUtils::findMutLess (entries, PageVersion (ver .sequence + 1 ));
621623 iter != entries.end ())
622624 {
623625 // ignore all "delete"
@@ -630,27 +632,23 @@ Int64 VersionedPageEntries<Trait>::incrRefCount(const PageVersion & target_ver,
630632 // Then `iter` points to an entry or to `entries.begin()`; return if an entry is found
631633 if (iter->second .isEntry ())
632634 {
633- auto ref_count_value = iter->second .being_ref_count .getLatestRefCount ();
634- if (unlikely (met_delete && ref_count_value == 1 ))
635+ if (unlikely (met_delete && iter->second .being_ref_count == 1 ))
635636 {
636- throw Exception (fmt::format (" Try to add ref to a completely deleted entry [entry={}] [ver={}]" , iter->second , target_ver ), ErrorCodes::LOGICAL_ERROR);
637+ throw Exception (fmt::format (" Try to add ref to a completely deleted entry [entry={}] [ver={}]" , iter->second , ver ), ErrorCodes::LOGICAL_ERROR);
637638 }
638- iter->second .being_ref_count .incrRefCount (ref_ver, 1 );
639- return ref_count_value + 1 ;
639+ return ++iter->second .being_ref_count ;
640640 }
641641 } // fallthrough to FAIL
642642 }
643643 else if (type == EditRecordType::VAR_EXTERNAL)
644644 {
645- if (create_ver <= target_ver )
645+ if (create_ver <= ver )
646646 {
647647 // We may add reference to an external id even if it is logically deleted.
648- auto ref_count_value = being_ref_count.getLatestRefCount ();
649- being_ref_count.incrRefCount (ref_ver, 1 );
650- return ref_count_value + 1 ;
648+ return ++being_ref_count;
651649 }
652650 }
653- throw Exception (fmt::format (" The entry to be added ref count is not found [ver={}] [state={}]" , target_ver , toDebugString ()), ErrorCodes::LOGICAL_ERROR);
651+ throw Exception (fmt::format (" The entry to be added ref count is not found [ver={}] [state={}]" , ver , toDebugString ()), ErrorCodes::LOGICAL_ERROR);
654652}
655653
656654template <typename Trait>
@@ -711,7 +709,7 @@ bool VersionedPageEntries<Trait>::cleanOutdatedEntries(
711709{
712710 if (type == EditRecordType::VAR_EXTERNAL)
713711 {
714- return (being_ref_count. getLatestRefCount () == 1 && is_deleted && delete_ver.sequence <= lowest_seq);
712+ return (being_ref_count == 1 && is_deleted && delete_ver.sequence <= lowest_seq);
715713 }
716714 else if (type == EditRecordType::VAR_REF)
717715 {
@@ -789,7 +787,7 @@ bool VersionedPageEntries<Trait>::cleanOutdatedEntries(
789787 {
790788 if (last_entry_is_delete)
791789 {
792- if (iter->second .being_ref_count . getLatestRefCount () == 1 )
790+ if (iter->second .being_ref_count == 1 )
793791 {
794792 if (entries_removed)
795793 {
@@ -831,8 +829,12 @@ bool VersionedPageEntries<Trait>::derefAndClean(
831829 auto page_lock = acquireLock ();
832830 if (type == EditRecordType::VAR_EXTERNAL)
833831 {
834- being_ref_count.decrRefCountInSnap (lowest_seq, deref_count);
835- return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count.getLatestRefCount () == 1 );
832+ if (being_ref_count <= deref_count)
833+ {
834+ throw Exception (fmt::format (" Decreasing ref count error [page_id={}] [ver={}] [deref_count={}]" , page_id, deref_ver, deref_count));
835+ }
836+ being_ref_count -= deref_count;
837+ return (is_deleted && delete_ver.sequence <= lowest_seq && being_ref_count == 1 );
836838 }
837839 else if (type == EditRecordType::VAR_ENTRY)
838840 {
@@ -855,7 +857,11 @@ bool VersionedPageEntries<Trait>::derefAndClean(
855857 throw Exception (fmt::format (" Can not find entry for decreasing ref count till the begin [page_id={}] [ver={}] [deref_count={}]" , page_id, deref_ver, deref_count));
856858 }
857859 assert (iter->second .isEntry ());
858- iter->second .being_ref_count .decrRefCountInSnap (lowest_seq, deref_count);
860+ if (iter->second .being_ref_count <= deref_count)
861+ {
862+ throw Exception (fmt::format (" Decreasing ref count error [page_id={}] [ver={}] [deref_count={}] [entry={}]" , page_id, deref_ver, deref_count, iter->second ));
863+ }
864+ iter->second .being_ref_count -= deref_count;
859865
860866 if (lowest_seq == 0 )
861867 return false ;
@@ -892,7 +898,7 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
892898 return ;
893899 auto iter = entries.find (create_ver);
894900 RUNTIME_CHECK (iter != entries.end ());
895- edit.varExternal (page_id, create_ver, iter->second .entry , being_ref_count. getRefCountInSnap (seq) );
901+ edit.varExternal (page_id, create_ver, iter->second .entry , being_ref_count);
896902 if (is_deleted && delete_ver.sequence <= seq)
897903 {
898904 edit.varDel (page_id, delete_ver);
@@ -910,7 +916,7 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
910916 if (last_iter->second .isEntry ())
911917 {
912918 const auto & entry = last_iter->second ;
913- edit.varEntry (page_id, /* ver*/ last_iter->first , entry.entry , entry.being_ref_count . getRefCountInSnap (seq) );
919+ edit.varEntry (page_id, /* ver*/ last_iter->first , entry.entry , entry.being_ref_count );
914920 return ;
915921 }
916922 else if (last_iter->second .isDelete ())
@@ -924,12 +930,11 @@ void VersionedPageEntries<Trait>::collapseTo(const UInt64 seq, const PageId & pa
924930 auto prev_iter = --last_iter; // Note that `last_iter` should not be used anymore
925931 if (prev_iter->second .isEntry ())
926932 {
927- auto ref_count_value = prev_iter->second .being_ref_count .getRefCountInSnap (seq);
928- if (ref_count_value == 1 )
933+ if (prev_iter->second .being_ref_count == 1 )
929934 return ;
930935 // It is still referenced by another id, so persist both the entry and the delete
931936 const auto & entry = prev_iter->second ;
932- edit.varEntry (page_id, prev_iter->first , entry.entry , ref_count_value );
937+ edit.varEntry (page_id, prev_iter->first , entry.entry , entry. being_ref_count );
933938 edit.varDel (page_id, last_version);
934939 }
935940 }
@@ -1437,7 +1442,7 @@ void PageDirectory<Trait>::applyRefEditRecord(
14371442 // Add the ref-count of being-ref entry
14381443 if (auto resolved_iter = mvcc_table_directory.find (resolved_id); resolved_iter != mvcc_table_directory.end ())
14391444 {
1440- resolved_iter->second ->incrRefCount (resolved_ver, version );
1445+ resolved_iter->second ->incrRefCount (resolved_ver);
14411446 }
14421447 else
14431448 {
@@ -1559,7 +1564,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
15591564 watch.restart ();
15601565 SCOPE_EXIT ({ GET_METRIC (tiflash_storage_page_write_duration_seconds, type_commit).Observe (watch.elapsedSeconds ()); });
15611566
1562- SYNC_FOR (" before_PageDirectory::apply_to_memory" );
15631567 std::unordered_set<String> applied_data_files;
15641568 {
15651569 std::unique_lock table_lock (table_rw_mutex);
@@ -1831,34 +1835,59 @@ PageDirectory<Trait>::getEntriesByBlobIdsForDifferentPageTypes(const typename Pa
18311835}
18321836
18331837template <typename Trait>
1834- bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter, bool force)
1838+ bool PageDirectory<Trait>::tryDumpSnapshot(const ReadLimiterPtr & read_limiter, const WriteLimiterPtr & write_limiter, bool force)
18351839{
1836- auto identifier = fmt::format (" {}.dump" , wal->name ());
1837- auto snap = createSnapshot (identifier);
1838-
18391840 // Only compact the logs when the files snapshot is valid; otherwise return false without doing any work
1840- auto files_snap = wal->tryGetFilesSnapshot (
1841- max_persisted_log_files,
1842- snap->sequence ,
1843- details::getMaxSequenceForRecord<Trait>,
1844- force);
1841+ auto files_snap = wal->tryGetFilesSnapshot (max_persisted_log_files, force);
18451842 if (!files_snap.isValid ())
18461843 return false ;
18471844
1845+ // To prevent writes from affecting the snapshot dump (and vice versa), the old log files
1846+ // are read from disk and a temporary PageDirectory is built from them for dumping.
1847+ // Writes would otherwise affect the dump mainly because `createSnapshot()` cannot give
1848+ // us a read-only view of `being_ref_count`.
18481849 assert (!files_snap.persisted_log_files .empty ()); // should not be empty
1850+ auto log_num = files_snap.persisted_log_files .rbegin ()->log_num ;
1851+ auto identifier = fmt::format (" {}.dump_{}" , wal->name (), log_num);
18491852
18501853 Stopwatch watch;
1851- auto edit = dumpSnapshotToEdit (snap);
1852- files_snap.num_records = edit.size ();
1853- files_snap.dump_elapsed_ms = watch.elapsedMilliseconds ();
1854+ auto snapshot_reader = wal->createReaderForFiles (identifier, files_snap.persisted_log_files , read_limiter);
1855+ // `collapsed_dir` is only used to dump the snapshot as an edit. It must never call functions
1856+ // like `apply` that persist new logs to disk, so `nullptr` is passed as `wal` to the factory.
1857+ auto collapsed_dir = [&]() {
1858+ // The factory type depends on `Trait`; both branches restore the directory from
1859+ // `snapshot_reader` in the same way and pass `nullptr` as the `wal` argument.
1860+ static_assert (std::is_same_v<Trait, u128 ::PageDirectoryTrait> || std::is_same_v<Trait, universal::PageDirectoryTrait>,
1861+ " unknown impl" );
1862+ if constexpr (std::is_same_v<Trait, u128 ::PageDirectoryTrait>)
1863+ {
1864+ u128 ::PageDirectoryFactory factory;
1865+ return factory.createFromReader (
1866+ identifier,
1867+ std::move (snapshot_reader),
1868+ /* wal */ nullptr );
1869+ }
1870+ else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
1871+ {
1872+ universal::PageDirectoryFactory factory;
1873+ return factory.createFromReader (
1874+ identifier,
1875+ std::move (snapshot_reader),
1876+ /* wal */ nullptr );
1877+ }
1878+ }();
1879+ // `edit_from_disk` is dumped from `collapsed_dir`, which was restored from the log files in `files_snap`
1880+ auto edit_from_disk = collapsed_dir->dumpSnapshotToEdit ();
1881+ files_snap.num_records = edit_from_disk.size ();
1882+ files_snap.read_elapsed_ms = watch.elapsedMilliseconds ();
18541883 if constexpr (std::is_same_v<Trait, u128 ::PageDirectoryTrait>)
18551884 {
1856- bool done_any_io = wal->saveSnapshot (std::move (files_snap), Trait::Serializer::serializeTo (edit ), write_limiter);
1885+ bool done_any_io = wal->saveSnapshot (std::move (files_snap), Trait::Serializer::serializeTo (edit_from_disk ), write_limiter);
18571886 return done_any_io;
18581887 }
18591888 else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
18601889 {
1861- bool done_any_io = wal->saveSnapshot (std::move (files_snap), Trait::Serializer::serializeInCompressedFormTo (edit ), write_limiter);
1890+ bool done_any_io = wal->saveSnapshot (std::move (files_snap), Trait::Serializer::serializeInCompressedFormTo (edit_from_disk ), write_limiter);
18621891 return done_any_io;
18631892 }
18641893}
@@ -1939,8 +1968,6 @@ typename PageDirectory<Trait>::PageEntries PageDirectory<Trait>::gcInMemEntries(
19391968 }
19401969 }
19411970
1942- SYNC_FOR (" after_PageDirectory::doGC_getLowestSeq" );
1943-
19441971 PageEntriesV3 all_del_entries;
19451972 typename MVCCMapType::iterator iter;
19461973 {
0 commit comments