1212using EventStore . Core . Index ;
1313using EventStore . Core . LogAbstraction ;
1414using EventStore . Core . Services . Storage . ReaderIndex ;
15+ using EventStore . Core . Services . UserManagement ;
1516using EventStore . Core . TransactionLog . Chunks . TFChunk ;
1617using EventStore . Core . TransactionLog . LogRecords ;
1718using EventStore . Core . TransactionLog . Scavenging ;
@@ -33,12 +34,15 @@ public class TFChunkScavenger<TStreamId> : TFChunkScavenger {
3334 private readonly ITableIndex < TStreamId > _tableIndex ;
3435 private readonly IReadIndex < TStreamId > _readIndex ;
3536 private readonly IMetastreamLookup < TStreamId > _metastreams ;
37+ private readonly ITransactionFileTracker _tfTracker ;
3638 private readonly long _maxChunkDataSize ;
3739 private readonly bool _unsafeIgnoreHardDeletes ;
3840 private readonly int _threads ;
3941
4042 public TFChunkScavenger ( ILogger logger , TFChunkDb db , ITFChunkScavengerLog scavengerLog , ITableIndex < TStreamId > tableIndex ,
41- IReadIndex < TStreamId > readIndex , IMetastreamLookup < TStreamId > metastreams , long ? maxChunkDataSize = null ,
43+ IReadIndex < TStreamId > readIndex , IMetastreamLookup < TStreamId > metastreams ,
44+ ITransactionFileTrackerFactory tfTrackers ,
45+ long ? maxChunkDataSize = null ,
4246 bool unsafeIgnoreHardDeletes = false , int threads = 1 ) {
4347 Ensure . NotNull ( logger , nameof ( logger ) ) ;
4448 Ensure . NotNull ( db , "db" ) ;
@@ -62,6 +66,7 @@ public TFChunkScavenger(ILogger logger, TFChunkDb db, ITFChunkScavengerLog scave
6266 _tableIndex = tableIndex ;
6367 _readIndex = readIndex ;
6468 _metastreams = metastreams ;
69+ _tfTracker = tfTrackers . GetOrAdd ( SystemAccounts . SystemScavengeName ) ;
6570 _maxChunkDataSize = maxChunkDataSize ?? db . Config . ChunkSize ;
6671 _unsafeIgnoreHardDeletes = unsafeIgnoreHardDeletes ;
6772 _threads = threads ;
@@ -158,6 +163,7 @@ private void ScavengeInternal(bool alwaysKeepScavenged, bool mergeChunks, int st
158163 maxChunkDataSize : _maxChunkDataSize ,
159164 scavengerLog : _scavengerLog ,
160165 throttle : new Throttle ( _logger , TimeSpan . Zero , TimeSpan . Zero , 100 ) ,
166+ tracker : _tfTracker ,
161167 ct : ct ) ;
162168 }
163169
@@ -204,7 +210,7 @@ private void ScavengeChunk(bool alwaysKeepScavenged, TFChunk.TFChunk oldChunk,
204210 }
205211
206212 try {
207- TraverseChunkBasic ( oldChunk , ct ,
213+ TraverseChunkBasic ( oldChunk , ct , _tfTracker ,
208214 result => {
209215 threadLocalCache . Records . Add ( result ) ;
210216
@@ -337,6 +343,7 @@ public static void MergePhase(
337343 long maxChunkDataSize ,
338344 ITFChunkScavengerLog scavengerLog ,
339345 Throttle throttle ,
346+ ITransactionFileTracker tracker ,
340347 CancellationToken ct ) {
341348
342349 bool mergedSomething ;
@@ -362,6 +369,7 @@ public static void MergePhase(
362369 db : db ,
363370 scavengerLog : scavengerLog ,
364371 oldChunks : chunksToMerge ,
372+ tracker : tracker ,
365373 ct : ct ) ) {
366374
367375 mergedSomething = true ;
@@ -382,6 +390,7 @@ public static void MergePhase(
382390 db : db ,
383391 scavengerLog : scavengerLog ,
384392 oldChunks : chunksToMerge ,
393+ tracker : tracker ,
385394 ct : ct ) ) {
386395
387396 mergedSomething = true ;
@@ -398,6 +407,7 @@ private static bool MergeChunks(
398407 TFChunkDb db ,
399408 ITFChunkScavengerLog scavengerLog ,
400409 IList < TFChunk . TFChunk > oldChunks ,
410+ ITransactionFileTracker tracker ,
401411 CancellationToken ct ) {
402412
403413 if ( oldChunks . IsEmpty ( ) ) throw new ArgumentException ( "Provided list of chunks to merge is empty." ) ;
@@ -444,7 +454,7 @@ private static bool MergeChunks(
444454 var positionMapping = new List < PosMap > ( ) ;
445455 foreach ( var oldChunk in oldChunks ) {
446456 var lastFlushedPage = - 1 ;
447- TraverseChunkBasic ( oldChunk , ct ,
457+ TraverseChunkBasic ( oldChunk , ct , tracker ,
448458 result => {
449459
450460 positionMapping . Add ( WriteRecord ( newChunk , result . LogRecord ) ) ;
@@ -613,7 +623,7 @@ private bool ShouldKeepPrepare(
613623 return true ;
614624 }
615625
616- var lastEventNumber = _readIndex . GetStreamLastEventNumber ( prepare . EventStreamId , ITransactionFileTracker . NoOp ) ;
626+ var lastEventNumber = _readIndex . GetStreamLastEventNumber ( prepare . EventStreamId , _tfTracker ) ;
617627 if ( lastEventNumber == EventNumber . DeletedStream ) {
618628 // The stream is hard deleted but this is not the tombstone.
619629 // When all prepares and commit of transaction belong to single chunk and the stream is deleted,
@@ -671,7 +681,7 @@ private bool ShouldKeepPrepare(
671681 return true ;
672682 }
673683
674- var meta = _readIndex . GetStreamMetadata ( prepare . EventStreamId , ITransactionFileTracker . NoOp ) ;
684+ var meta = _readIndex . GetStreamMetadata ( prepare . EventStreamId , _tfTracker ) ;
675685 bool canRemove = ( meta . MaxCount . HasValue && eventNumber < lastEventNumber - meta . MaxCount . Value + 1 )
676686 || ( meta . TruncateBefore . HasValue && eventNumber < meta . TruncateBefore . Value )
677687 || ( meta . MaxAge . HasValue && prepare . TimeStamp < DateTime . UtcNow - meta . MaxAge . Value ) ;
@@ -686,7 +696,7 @@ private bool ShouldKeepPrepare(
686696 }
687697
688698 private bool DiscardBecauseDuplicate ( IPrepareLogRecord < TStreamId > prepare , long eventNumber ) {
689- var result = _readIndex . ReadEvent ( IndexReader . UnspecifiedStreamName , prepare . EventStreamId , eventNumber , ITransactionFileTracker . NoOp ) ;
699+ var result = _readIndex . ReadEvent ( IndexReader . UnspecifiedStreamName , prepare . EventStreamId , eventNumber , _tfTracker ) ;
690700 if ( result . Result == ReadEventResult . Success && result . Record . LogPosition != prepare . LogPosition ) {
691701 // prepare isn't the record we get for an index read at its own stream/version.
692702 // therefore it is a duplicate that cannot be read from the index, discard it.
@@ -701,13 +711,13 @@ private bool IsSoftDeletedTempStreamWithinSameChunk(TStreamId eventStreamId, lon
701711 TStreamId msh ;
702712 if ( _metastreams . IsMetaStream ( eventStreamId ) ) {
703713 var originalStreamId = _metastreams . OriginalStreamOf ( eventStreamId ) ;
704- var meta = _readIndex . GetStreamMetadata ( originalStreamId , ITransactionFileTracker . NoOp ) ;
714+ var meta = _readIndex . GetStreamMetadata ( originalStreamId , _tfTracker ) ;
705715 if ( meta . TruncateBefore != EventNumber . DeletedStream || meta . TempStream != true )
706716 return false ;
707717 sh = originalStreamId ;
708718 msh = eventStreamId ;
709719 } else {
710- var meta = _readIndex . GetStreamMetadata ( eventStreamId , ITransactionFileTracker . NoOp ) ;
720+ var meta = _readIndex . GetStreamMetadata ( eventStreamId , _tfTracker ) ;
711721 if ( meta . TruncateBefore != EventNumber . DeletedStream || meta . TempStream != true )
712722 return false ;
713723 sh = eventStreamId ;
@@ -727,14 +737,15 @@ private bool IsSoftDeletedTempStreamWithinSameChunk(TStreamId eventStreamId, lon
727737 }
728738
729739 private static void TraverseChunkBasic ( TFChunk . TFChunk chunk , CancellationToken ct ,
740+ ITransactionFileTracker tracker ,
730741 Action < CandidateRecord > process ) {
731- var result = chunk . TryReadFirst ( ITransactionFileTracker . NoOp ) ; //qq
742+ var result = chunk . TryReadFirst ( tracker ) ;
732743 while ( result . Success ) {
733744 process ( new CandidateRecord ( result . LogRecord , result . RecordLength ) ) ;
734745
735746 ct . ThrowIfCancellationRequested ( ) ;
736747
737- result = chunk . TryReadClosestForward ( result . NextPosition , ITransactionFileTracker . NoOp ) ; //qq
748+ result = chunk . TryReadClosestForward ( result . NextPosition , tracker ) ;
738749 }
739750 }
740751
0 commit comments