Skip to content

Commit a188b4e

Browse files
more plumbing (new scavenge done)
1 parent 8636ac7 commit a188b4e

File tree

6 files changed

+30
-14
lines changed

6 files changed

+30
-14
lines changed

src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,15 @@ private async Task<DbResult> RunInternalAsync(
247247
}
248248

249249
var hasher = new CompositeHasher<TStreamId>(lowHasher, highHasher);
250+
var tracker = ITransactionFileTracker.NoOp;
250251

251252
var tableIndex = new TableIndex<TStreamId>(
252253
directory: indexPath,
253254
lowHasher: lowHasher,
254255
highHasher: highHasher,
255256
emptyStreamId: logFormat.EmptyStreamId,
256257
memTableFactory: () => new HashListMemTable(PTableVersions.IndexV4, maxSize: 200),
257-
tfReaderFactory: _ => new TFReaderLease(readerPool, ITransactionFileTracker.NoOp),
258+
tfReaderFactory: _ => new TFReaderLease(readerPool, tracker),
258259
ptableVersion: PTableVersions.IndexV4,
259260
maxAutoMergeIndexLevel: int.MaxValue,
260261
pTableMaxReaderCount: ESConsts.PTableInitialReaderCount,
@@ -317,6 +318,7 @@ private async Task<DbResult> RunInternalAsync(
317318
metastreamLookup,
318319
logFormat.StreamIdConverter,
319320
dbResult.Db.Config.ReplicationCheckpoint,
321+
tracker,
320322
dbConfig.ChunkSize);
321323

322324
var indexReader = new IndexReaderForAccumulator<TStreamId>(readIndex);
@@ -403,7 +405,8 @@ private async Task<DbResult> RunInternalAsync(
403405
new ChunkManagerForExecutor<TStreamId>(
404406
logger,
405407
dbResult.Db.Manager,
406-
dbConfig),
408+
dbConfig,
409+
tracker),
407410
Tracer),
408411
chunkSize: dbConfig.ChunkSize,
409412
unsafeIgnoreHardDeletes: _unsafeIgnoreHardDeletes,
@@ -420,7 +423,7 @@ private async Task<DbResult> RunInternalAsync(
420423
IIndexExecutor<TStreamId> indexExecutor = new IndexExecutor<TStreamId>(
421424
logger: logger,
422425
indexScavenger: cancellationWrappedIndexScavenger,
423-
streamLookup: new ChunkReaderForIndexExecutor<TStreamId>(tracker => new TFReaderLease(readerPool, tracker)),
426+
streamLookup: new ChunkReaderForIndexExecutor<TStreamId>(() => new TFReaderLease(readerPool, tracker)),
424427
unsafeIgnoreHardDeletes: _unsafeIgnoreHardDeletes,
425428
restPeriod: restPeriod,
426429
throttle: throttle);

src/EventStore.Core/ClusterVNode.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
using Microsoft.Data.Sqlite;
6767
using Mono.Unix.Native;
6868
using ILogger = Serilog.ILogger;
69+
using EventStore.Core.Services.UserManagement;
6970

7071
namespace EventStore.Core {
7172
public abstract class ClusterVNode {
@@ -1278,6 +1279,8 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
12781279
},
12791280
dispose: backend => backend.Dispose());
12801281

1282+
var tracker = trackers.TransactionFileTrackers.GetOrAdd(SystemAccounts.SystemScavengeName);
1283+
12811284
var state = new ScavengeState<TStreamId>(
12821285
logger,
12831286
longHasher,
@@ -1294,6 +1297,7 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
12941297
logFormat.Metastreams,
12951298
logFormat.StreamIdConverter,
12961299
Db.Config.ReplicationCheckpoint,
1300+
tracker,
12971301
TFConsts.ChunkSize),
12981302
index: new IndexReaderForAccumulator<TStreamId>(readIndex),
12991303
cancellationCheckPeriod: cancellationCheckPeriod,
@@ -1313,7 +1317,7 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
13131317
var chunkExecutor = new ChunkExecutor<TStreamId, ILogRecord>(
13141318
logger,
13151319
logFormat.Metastreams,
1316-
new ChunkManagerForExecutor<TStreamId>(logger, Db.Manager, Db.Config),
1320+
new ChunkManagerForExecutor<TStreamId>(logger, Db.Manager, Db.Config, tracker),
13171321
chunkSize: Db.Config.ChunkSize,
13181322
unsafeIgnoreHardDeletes: options.Database.UnsafeIgnoreHardDelete,
13191323
cancellationCheckPeriod: cancellationCheckPeriod,
@@ -1329,7 +1333,7 @@ GossipAdvertiseInfo GetGossipAdvertiseInfo() {
13291333
var indexExecutor = new IndexExecutor<TStreamId>(
13301334
logger,
13311335
new IndexScavenger(tableIndex),
1332-
new ChunkReaderForIndexExecutor<TStreamId>(tracker => new TFReaderLease(readerPool, tracker)),
1336+
new ChunkReaderForIndexExecutor<TStreamId>(() => new TFReaderLease(readerPool, tracker)),
13331337
unsafeIgnoreHardDeletes: options.Database.UnsafeIgnoreHardDelete,
13341338
restPeriod: 32_768,
13351339
throttle: throttle);

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkManagerForExecutor.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@ public class ChunkManagerForExecutor<TStreamId> : IChunkManagerForChunkExecutor<
99
private readonly ILogger _logger;
1010
private readonly TFChunkManager _manager;
1111
private readonly TFChunkDbConfig _dbConfig;
12+
private readonly ITransactionFileTracker _tracker;
1213

13-
public ChunkManagerForExecutor(ILogger logger, TFChunkManager manager, TFChunkDbConfig dbConfig) {
14+
public ChunkManagerForExecutor(ILogger logger, TFChunkManager manager, TFChunkDbConfig dbConfig,
15+
ITransactionFileTracker tracker) {
1416
_logger = logger;
1517
_manager = manager;
1618
_dbConfig = dbConfig;
19+
_tracker = tracker;
1720
}
1821

1922
public IChunkWriterForExecutor<TStreamId, ILogRecord> CreateChunkWriter(
@@ -24,7 +27,7 @@ public IChunkWriterForExecutor<TStreamId, ILogRecord> CreateChunkWriter(
2427

2528
public IChunkReaderForExecutor<TStreamId, ILogRecord> GetChunkReaderFor(long position) {
2629
var tfChunk = _manager.GetChunkFor(position);
27-
return new ChunkReaderForExecutor<TStreamId>(tfChunk);
30+
return new ChunkReaderForExecutor<TStreamId>(tfChunk, _tracker);
2831
}
2932

3033
public void SwitchChunk(

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkReaderForAccumulator.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class ChunkReaderForAccumulator<TStreamId> : IChunkReaderForAccumulator<T
1414
private readonly IMetastreamLookup<TStreamId> _metaStreamLookup;
1515
private readonly IStreamIdConverter<TStreamId> _streamIdConverter;
1616
private readonly ICheckpoint _replicationChk;
17+
private readonly ITransactionFileTracker _tracker;
1718
private readonly int _chunkSize;
1819

1920
private readonly Func<int, byte[]> _getBuffer;
@@ -24,12 +25,14 @@ public ChunkReaderForAccumulator(
2425
IMetastreamLookup<TStreamId> metastreamLookup,
2526
IStreamIdConverter<TStreamId> streamIdConverter,
2627
ICheckpoint replicationChk,
28+
ITransactionFileTracker tracker,
2729
int chunkSize) {
2830

2931
_manager = manager;
3032
_metaStreamLookup = metastreamLookup;
3133
_streamIdConverter = streamIdConverter;
3234
_replicationChk = replicationChk;
35+
_tracker = tracker;
3336
_chunkSize = chunkSize;
3437

3538
var reusableRecordBuffer = new ReusableBuffer(8192);
@@ -62,7 +65,7 @@ public IEnumerable<AccumulatorRecordType> ReadChunkInto(
6265

6366
var localPos = chunk.ChunkHeader.GetLocalLogPosition(nextPos);
6467

65-
var result = chunk.TryReadClosestForwardRaw(localPos, _getBuffer, ITransactionFileTracker.NoOp); //qq plumb through all occurrences of noop
68+
var result = chunk.TryReadClosestForwardRaw(localPos, _getBuffer, _tracker);
6669

6770
if (!result.Success) {
6871
// there is no need to release the reusable buffer here since result.Success is false

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkReaderForExecutor.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
namespace EventStore.Core.TransactionLog.Scavenging {
77
public class ChunkReaderForExecutor<TStreamId> : IChunkReaderForExecutor<TStreamId, ILogRecord> {
88
private readonly TFChunk _chunk;
9+
private readonly ITransactionFileTracker _tracker;
910

10-
public ChunkReaderForExecutor(TFChunk chunk) {
11+
public ChunkReaderForExecutor(TFChunk chunk,
12+
ITransactionFileTracker tracker) {
1113
_chunk = chunk;
14+
_tracker = tracker;
1215
}
1316

1417
public string Name => _chunk.ToString();
@@ -30,7 +33,7 @@ public IEnumerable<bool> ReadInto(
3033
RecordForExecutor<TStreamId, ILogRecord>.NonPrepare nonPrepare,
3134
RecordForExecutor<TStreamId, ILogRecord>.Prepare prepare) {
3235

33-
var result = _chunk.TryReadFirst(ITransactionFileTracker.NoOp); //qq
36+
var result = _chunk.TryReadFirst(_tracker);
3437
while (result.Success) {
3538
var record = result.LogRecord;
3639
if (record.RecordType != LogRecordType.Prepare) {
@@ -51,7 +54,7 @@ public IEnumerable<bool> ReadInto(
5154
yield return true;
5255
}
5356

54-
result = _chunk.TryReadClosestForward(result.NextPosition, ITransactionFileTracker.NoOp); //qq
57+
result = _chunk.TryReadClosestForward(result.NextPosition, _tracker);
5558
}
5659
}
5760
}

src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkReaderForIndexExecutor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44
namespace EventStore.Core.TransactionLog.Scavenging {
55
public class ChunkReaderForIndexExecutor<TStreamId> : IChunkReaderForIndexExecutor<TStreamId> {
6-
private readonly Func<ITransactionFileTracker, TFReaderLease> _tfReaderFactory;
6+
private readonly Func<TFReaderLease> _tfReaderFactory;
77

8-
public ChunkReaderForIndexExecutor(Func<ITransactionFileTracker, TFReaderLease> tfReaderFactory) {
8+
public ChunkReaderForIndexExecutor(Func<TFReaderLease> tfReaderFactory) {
99
_tfReaderFactory = tfReaderFactory;
1010
}
1111

1212
public bool TryGetStreamId(long position, out TStreamId streamId) {
13-
using (var reader = _tfReaderFactory(ITransactionFileTracker.NoOp)) { //qq
13+
using (var reader = _tfReaderFactory()) {
1414
var result = reader.TryReadAt(position, couldBeScavenged: true);
1515
if (!result.Success) {
1616
streamId = default;

0 commit comments

Comments
 (0)