Skip to content

Commit 54ee607

Browse files
more plumbing (new scavenge done)
1 parent 8636ac7 commit 54ee607

File tree

9 files changed

+63
-35
lines changed

9 files changed

+63
-35
lines changed

src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,15 @@ private async Task<DbResult> RunInternalAsync(
247247
}
248248

249249
var hasher = new CompositeHasher<TStreamId>(lowHasher, highHasher);
250+
var tracker = ITransactionFileTracker.NoOp;
250251

251252
var tableIndex = new TableIndex<TStreamId>(
252253
directory: indexPath,
253254
lowHasher: lowHasher,
254255
highHasher: highHasher,
255256
emptyStreamId: logFormat.EmptyStreamId,
256257
memTableFactory: () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 200),
257-
tfReaderFactory: _ => new TFReaderLease(readerPool, ITransactionFileTracker.NoOp),
258+
tfReaderFactory: _ => new TFReaderLease(readerPool, tracker),
258259
ptableVersion: PTableVersions.IndexV4,
259260
maxAutoMergeIndexLevel: int.MaxValue,
260261
pTableMaxReaderCount: ESConsts.PTableInitialReaderCount,
@@ -317,9 +318,10 @@ private async Task<DbResult> RunInternalAsync(
317318
metastreamLookup,
318319
logFormat.StreamIdConverter,
319320
dbResult.Db.Config.ReplicationCheckpoint,
321+
tracker,
320322
dbConfig.ChunkSize);
321323

322-
var indexReader = new IndexReaderForAccumulator<TStreamId>(readIndex);
324+
var indexReader = new IndexReaderForAccumulator<TStreamId>(readIndex, tracker);
323325

324326
var accumulatorMetastreamLookup = new AdHocMetastreamLookupInterceptor<TStreamId>(
325327
metastreamLookup,
@@ -332,8 +334,9 @@ private async Task<DbResult> RunInternalAsync(
332334
var calculatorIndexReader = new AdHocIndexReaderInterceptor<TStreamId>(
333335
new IndexReaderForCalculator<TStreamId>(
334336
readIndex,
335-
tracker => new TFReaderLease(readerPool, tracker),
336-
scavengeState.LookupUniqueHashUser),
337+
() => new TFReaderLease(readerPool, tracker),
338+
scavengeState.LookupUniqueHashUser,
339+
tracker),
337340
(f, handle, from, maxCount, x) => {
338341
if (_calculatingCancellationTrigger != null)
339342
if ((handle.Kind == StreamHandle.Kind.Hash && handle.StreamHash == hasher.Hash(_calculatingCancellationTrigger)) ||
@@ -403,7 +406,8 @@ private async Task<DbResult> RunInternalAsync(
403406
new ChunkManagerForExecutor<TStreamId>(
404407
logger,
405408
dbResult.Db.Manager,
406-
dbConfig),
409+
dbConfig,
410+
tracker),
407411
Tracer),
408412
chunkSize: dbConfig.ChunkSize,
409413
unsafeIgnoreHardDeletes: _unsafeIgnoreHardDeletes,
@@ -414,13 +418,13 @@ private async Task<DbResult> RunInternalAsync(
414418
IChunkMerger chunkMerger = new ChunkMerger(
415419
logger: logger,
416420
mergeChunks: _mergeChunks,
417-
new OldScavengeChunkMergerBackend(logger, dbResult.Db),
421+
new OldScavengeChunkMergerBackend(logger, dbResult.Db, tracker),
418422
throttle: throttle);
419423

420424
IIndexExecutor<TStreamId> indexExecutor = new IndexExecutor<TStreamId>(
421425
logger: logger,
422426
indexScavenger: cancellationWrappedIndexScavenger,
423-
streamLookup: new ChunkReaderForIndexExecutor<TStreamId>(tracker => new TFReaderLease(readerPool, tracker)),
427+
streamLookup: new ChunkReaderForIndexExecutor<TStreamId>(() => new TFReaderLease(readerPool, tracker)),
424428
unsafeIgnoreHardDeletes: _unsafeIgnoreHardDeletes,
425429
restPeriod: restPeriod,
426430
throttle: throttle);

src/EventStore.Core/ClusterVNode.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
using Microsoft.Data.Sqlite;
6767
using Mono.Unix.Native;
6868
using ILogger = Serilog.ILogger;
69+
using EventStore.Core.Services.UserManagement;
6970

7071
namespace EventStore.Core {
7172
public abstract class ClusterVNode {
@@ -1278,6 +1279,8 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
12781279
},
12791280
dispose: backend => backend.Dispose());
12801281

1282+
var tracker = trackers.TransactionFileTrackers.GetOrAdd(SystemAccounts.SystemScavengeName);
1283+
12811284
var state = new ScavengeState<TStreamId>(
12821285
logger,
12831286
longHasher,
@@ -1294,17 +1297,19 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
12941297
logFormat.Metastreams,
12951298
logFormat.StreamIdConverter,
12961299
Db.Config.ReplicationCheckpoint,
1300+
tracker,
12971301
TFConsts.ChunkSize),
1298-
index: new IndexReaderForAccumulator<TStreamId>(readIndex),
1302+
index: new IndexReaderForAccumulator<TStreamId>(readIndex, tracker),
12991303
cancellationCheckPeriod: cancellationCheckPeriod,
13001304
throttle: throttle);
13011305

13021306
var calculator = new Calculator<TStreamId>(
13031307
logger: logger,
13041308
new IndexReaderForCalculator<TStreamId>(
13051309
readIndex,
1306-
tracker => new TFReaderLease(readerPool, tracker),
1307-
state.LookupUniqueHashUser),
1310+
() => new TFReaderLease(readerPool, tracker),
1311+
state.LookupUniqueHashUser,
1312+
tracker),
13081313
chunkSize: TFConsts.ChunkSize,
13091314
cancellationCheckPeriod: cancellationCheckPeriod,
13101315
buffer: calculatorBuffer,
@@ -1313,7 +1318,7 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
13131318
var chunkExecutor = new ChunkExecutor<TStreamId, ILogRecord>(
13141319
logger,
13151320
logFormat.Metastreams,
1316-
new ChunkManagerForExecutor<TStreamId>(logger, Db.Manager, Db.Config),
1321+
new ChunkManagerForExecutor<TStreamId>(logger, Db.Manager, Db.Config, tracker),
13171322
chunkSize: Db.Config.ChunkSize,
13181323
unsafeIgnoreHardDeletes: options.Database.UnsafeIgnoreHardDelete,
13191324
cancellationCheckPeriod: cancellationCheckPeriod,
@@ -1323,13 +1328,13 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
13231328
var chunkMerger = new ChunkMerger(
13241329
logger: logger,
13251330
mergeChunks: !options.Database.DisableScavengeMerging,
1326-
backend: new OldScavengeChunkMergerBackend(logger, db: Db),
1331+
backend: new OldScavengeChunkMergerBackend(logger, db: Db, tracker: tracker),
13271332
throttle: throttle);
13281333

13291334
var indexExecutor = new IndexExecutor<TStreamId>(
13301335
logger,
13311336
new IndexScavenger(tableIndex),
1332-
new ChunkReaderForIndexExecutor<TStreamId>(tracker => new TFReaderLease(readerPool, tracker)),
1337+
new ChunkReaderForIndexExecutor<TStreamId>(() => new TFReaderLease(readerPool, tracker)),
13331338
unsafeIgnoreHardDeletes: options.Database.UnsafeIgnoreHardDelete,
13341339
restPeriod: 32_768,
13351340
throttle: throttle);

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkManagerForExecutor.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ public class ChunkManagerForExecutor<TStreamId> : IChunkManagerForChunkExecutor<
99
private readonly ILogger _logger;
1010
private readonly TFChunkManager _manager;
1111
private readonly TFChunkDbConfig _dbConfig;
12+
private readonly ITransactionFileTracker _tracker;
1213

13-
public ChunkManagerForExecutor(ILogger logger, TFChunkManager manager, TFChunkDbConfig dbConfig) {
14+
public ChunkManagerForExecutor(ILogger logger, TFChunkManager manager, TFChunkDbConfig dbConfig,
15+
ITransactionFileTracker tracker) {
1416
_logger = logger;
1517
_manager = manager;
1618
_dbConfig = dbConfig;
19+
_tracker = tracker;
1720
}
1821

1922
public IChunkWriterForExecutor<TStreamId, ILogRecord> CreateChunkWriter(
@@ -24,7 +27,7 @@ public IChunkWriterForExecutor<TStreamId, ILogRecord> CreateChunkWriter(
2427

2528
public IChunkReaderForExecutor<TStreamId, ILogRecord> GetChunkReaderFor(long position) {
2629
var tfChunk = _manager.GetChunkFor(position);
27-
return new ChunkReaderForExecutor<TStreamId>(tfChunk);
30+
return new ChunkReaderForExecutor<TStreamId>(tfChunk, _tracker);
2831
}
2932

3033
public void SwitchChunk(

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkReaderForAccumulator.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class ChunkReaderForAccumulator<TStreamId> : IChunkReaderForAccumulator<T
1414
private readonly IMetastreamLookup<TStreamId> _metaStreamLookup;
1515
private readonly IStreamIdConverter<TStreamId> _streamIdConverter;
1616
private readonly ICheckpoint _replicationChk;
17+
private readonly ITransactionFileTracker _tracker;
1718
private readonly int _chunkSize;
1819

1920
private readonly Func<int, byte[]> _getBuffer;
@@ -24,12 +25,14 @@ public ChunkReaderForAccumulator(
2425
IMetastreamLookup<TStreamId> metastreamLookup,
2526
IStreamIdConverter<TStreamId> streamIdConverter,
2627
ICheckpoint replicationChk,
28+
ITransactionFileTracker tracker,
2729
int chunkSize) {
2830

2931
_manager = manager;
3032
_metaStreamLookup = metastreamLookup;
3133
_streamIdConverter = streamIdConverter;
3234
_replicationChk = replicationChk;
35+
_tracker = tracker;
3336
_chunkSize = chunkSize;
3437

3538
var reusableRecordBuffer = new ReusableBuffer(8192);
@@ -62,7 +65,7 @@ public IEnumerable<AccumulatorRecordType> ReadChunkInto(
6265

6366
var localPos = chunk.ChunkHeader.GetLocalLogPosition(nextPos);
6467

65-
var result = chunk.TryReadClosestForwardRaw(localPos, _getBuffer, ITransactionFileTracker.NoOp); //qq plumb through all occurrences of noop
68+
var result = chunk.TryReadClosestForwardRaw(localPos, _getBuffer, _tracker);
6669

6770
if (!result.Success) {
6871
// there is no need to release the reusable buffer here since result.Success is false

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkReaderForExecutor.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
namespace EventStore.Core.TransactionLog.Scavenging {
77
public class ChunkReaderForExecutor<TStreamId> : IChunkReaderForExecutor<TStreamId, ILogRecord> {
88
private readonly TFChunk _chunk;
9+
private readonly ITransactionFileTracker _tracker;
910

10-
public ChunkReaderForExecutor(TFChunk chunk) {
11+
public ChunkReaderForExecutor(TFChunk chunk,
12+
ITransactionFileTracker tracker) {
1113
_chunk = chunk;
14+
_tracker = tracker;
1215
}
1316

1417
public string Name => _chunk.ToString();
@@ -30,7 +33,7 @@ public IEnumerable<bool> ReadInto(
3033
RecordForExecutor<TStreamId, ILogRecord>.NonPrepare nonPrepare,
3134
RecordForExecutor<TStreamId, ILogRecord>.Prepare prepare) {
3235

33-
var result = _chunk.TryReadFirst(ITransactionFileTracker.NoOp); //qq
36+
var result = _chunk.TryReadFirst(_tracker);
3437
while (result.Success) {
3538
var record = result.LogRecord;
3639
if (record.RecordType != LogRecordType.Prepare) {
@@ -51,7 +54,7 @@ public IEnumerable<bool> ReadInto(
5154
yield return true;
5255
}
5356

54-
result = _chunk.TryReadClosestForward(result.NextPosition, ITransactionFileTracker.NoOp); //qq
57+
result = _chunk.TryReadClosestForward(result.NextPosition, _tracker);
5558
}
5659
}
5760
}

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkReaderForIndexExecutor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44
namespace EventStore.Core.TransactionLog.Scavenging {
55
public class ChunkReaderForIndexExecutor<TStreamId> : IChunkReaderForIndexExecutor<TStreamId> {
6-
private readonly Func<ITransactionFileTracker, TFReaderLease> _tfReaderFactory;
6+
private readonly Func<TFReaderLease> _tfReaderFactory;
77

8-
public ChunkReaderForIndexExecutor(Func<ITransactionFileTracker, TFReaderLease> tfReaderFactory) {
8+
public ChunkReaderForIndexExecutor(Func<TFReaderLease> tfReaderFactory) {
99
_tfReaderFactory = tfReaderFactory;
1010
}
1111

1212
public bool TryGetStreamId(long position, out TStreamId streamId) {
13-
using (var reader = _tfReaderFactory(ITransactionFileTracker.NoOp)) { //qq
13+
using (var reader = _tfReaderFactory()) {
1414
var result = reader.TryReadAt(position, couldBeScavenged: true);
1515
if (!result.Success) {
1616
streamId = default;

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/IndexReaderForAccumulator.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
namespace EventStore.Core.TransactionLog.Scavenging {
55
public class IndexReaderForAccumulator<TStreamId> : IIndexReaderForAccumulator<TStreamId> {
66
private readonly IReadIndex<TStreamId> _readIndex;
7+
private readonly ITransactionFileTracker _tracker;
78

8-
public IndexReaderForAccumulator(IReadIndex<TStreamId> readIndex) {
9+
public IndexReaderForAccumulator(IReadIndex<TStreamId> readIndex, ITransactionFileTracker tracker) {
910
_readIndex = readIndex;
11+
_tracker = tracker;
1012
}
1113

1214
// reads a stream forward but only returns event info not the full event.
@@ -30,7 +32,8 @@ public IndexReadEventInfoResult ReadEventInfoForward(
3032
handle.StreamId,
3133
fromEventNumber,
3234
maxCount,
33-
scavengePoint.Position, ITransactionFileTracker.NoOp);
35+
scavengePoint.Position,
36+
_tracker);
3437
default:
3538
throw new ArgumentOutOfRangeException(nameof(handle), handle, null);
3639
}
@@ -52,14 +55,16 @@ public IndexReadEventInfoResult ReadEventInfoBackward(
5255
_ => streamId,
5356
fromEventNumber,
5457
maxCount,
55-
scavengePoint.Position, ITransactionFileTracker.NoOp);
58+
scavengePoint.Position,
59+
_tracker);
5660
case StreamHandle.Kind.Id:
5761
// uses log to check for hash collisions
5862
return _readIndex.ReadEventInfoBackward_KnownCollisions(
5963
handle.StreamId,
6064
fromEventNumber,
6165
maxCount,
62-
scavengePoint.Position, ITransactionFileTracker.NoOp);
66+
scavengePoint.Position,
67+
_tracker);
6368
default:
6469
throw new ArgumentOutOfRangeException(nameof(handle), handle, null);
6570
}

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/IndexReaderForCalculator.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@
55
namespace EventStore.Core.TransactionLog.Scavenging {
66
public class IndexReaderForCalculator<TStreamId> : IIndexReaderForCalculator<TStreamId> {
77
private readonly IReadIndex<TStreamId> _readIndex;
8-
private readonly Func<ITransactionFileTracker, TFReaderLease> _tfReaderFactory;
8+
private readonly Func<TFReaderLease> _tfReaderFactory;
99
private readonly Func<ulong, TStreamId> _lookupUniqueHashUser;
10+
private readonly ITransactionFileTracker _tracker;
1011

1112
public IndexReaderForCalculator(
1213
IReadIndex<TStreamId> readIndex,
13-
Func<ITransactionFileTracker, TFReaderLease> tfReaderFactory,
14-
Func<ulong, TStreamId> lookupUniqueHashUser) {
14+
Func<TFReaderLease> tfReaderFactory,
15+
Func<ulong, TStreamId> lookupUniqueHashUser,
16+
ITransactionFileTracker tracker) {
1517

1618
_readIndex = readIndex;
1719
_tfReaderFactory = tfReaderFactory;
1820
_lookupUniqueHashUser = lookupUniqueHashUser;
21+
_tracker = tracker;
1922
}
2023

2124
public long GetLastEventNumber(
@@ -28,12 +31,12 @@ public long GetLastEventNumber(
2831
return _readIndex.GetStreamLastEventNumber_NoCollisions(
2932
handle.StreamHash,
3033
_lookupUniqueHashUser,
31-
scavengePoint.Position, ITransactionFileTracker.NoOp);
34+
scavengePoint.Position, _tracker);
3235
case StreamHandle.Kind.Id:
3336
// uses the index and the log to fetch the last event number
3437
return _readIndex.GetStreamLastEventNumber_KnownCollisions(
3538
handle.StreamId,
36-
scavengePoint.Position, ITransactionFileTracker.NoOp);
39+
scavengePoint.Position, _tracker);
3740
default:
3841
throw new ArgumentOutOfRangeException(nameof(handle), handle, null);
3942
}
@@ -59,14 +62,14 @@ public IndexReadEventInfoResult ReadEventInfoForward(
5962
handle.StreamId,
6063
fromEventNumber,
6164
maxCount,
62-
scavengePoint.Position, ITransactionFileTracker.NoOp);
65+
scavengePoint.Position, _tracker);
6366
default:
6467
throw new ArgumentOutOfRangeException(nameof(handle), handle, null);
6568
}
6669
}
6770

6871
public bool IsTombstone(long logPosition) {
69-
using (var reader = _tfReaderFactory(ITransactionFileTracker.NoOp)) { //qq
72+
using (var reader = _tfReaderFactory()) {
7073
var result = reader.TryReadAt(logPosition, couldBeScavenged: true);
7174

7275
if (!result.Success)

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/OldScavengeChunkMergerBackend.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ namespace EventStore.Core.TransactionLog.Scavenging {
66
public class OldScavengeChunkMergerBackend : IChunkMergerBackend {
77
private readonly ILogger _logger;
88
private readonly TFChunkDb _db;
9+
private readonly ITransactionFileTracker _tracker;
910

10-
public OldScavengeChunkMergerBackend(ILogger logger, TFChunkDb db) {
11+
public OldScavengeChunkMergerBackend(ILogger logger, TFChunkDb db, ITransactionFileTracker tracker) {
1112
_logger = logger;
1213
_db = db;
14+
_tracker = tracker;
1315
}
1416

1517
public void MergeChunks(
@@ -29,7 +31,7 @@ public void MergeChunks(
2931
maxChunkDataSize: _db.Config.ChunkSize,
3032
scavengerLog: scavengerLogger,
3133
throttle: throttle,
32-
tracker: ITransactionFileTracker.NoOp,
34+
tracker: _tracker,
3335
ct: cancellationToken);
3436
}
3537
}

0 commit comments

Comments
 (0)