Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c6ec514
wip: do not build ITransactionFileTracker directly into the chunks, f…
timothycoleman Nov 21, 2024
9e87c73
add infra for setting/unsetting the tracker when checking out a reade…
timothycoleman Nov 21, 2024
701a4ca
builds! plumbing towards providing the tracker on read
timothycoleman Nov 21, 2024
f2a7408
more
timothycoleman Nov 21, 2024
242e1f6
create transaction file trackers on demand per user
timothycoleman Nov 21, 2024
a740bb3
plumb it to enough places to see $all reads from the webui
timothycoleman Nov 22, 2024
77df9d3
Enumerate source rather than just cached/uncached (allows for Archive…
timothycoleman Nov 22, 2024
24fa536
plumb more api
timothycoleman Nov 22, 2024
c0e3368
more plumbing
timothycoleman Nov 22, 2024
17c3413
more plumbing
timothycoleman Nov 22, 2024
0bc3919
more plumbing (tfchunkreadside complete)
timothycoleman Nov 22, 2024
fb1a59c
more plumbing (TFChunk complete)
timothycoleman Nov 22, 2024
5519488
more plumbing (TableIndex done)
timothycoleman Nov 22, 2024
fd29d2f
more plumbing (RedactionService done)
timothycoleman Nov 22, 2024
1600554
adjust tests
timothycoleman Nov 22, 2024
642c802
more plumbing (Index committer done)
timothycoleman Nov 22, 2024
e7ad9d6
more plumbing (LogV2StreamExistenceFilterInitializer done)
timothycoleman Nov 22, 2024
262b6a0
more plumbing (IndexReader done)
timothycoleman Nov 22, 2024
c407241
more plumbing (ReadIndex done)
timothycoleman Nov 22, 2024
04918f2
more plumbing (Persistent subscription service done)
timothycoleman Nov 22, 2024
5dd4b8e
more plumbing (StorageReaderWorker done)
timothycoleman Nov 23, 2024
8636ac7
more plumbing (old Scavenge done)
timothycoleman Nov 23, 2024
54ee607
more plumbing (new scavenge done)
timothycoleman Nov 23, 2024
1707e8f
more plumbing (subscriptions done)
timothycoleman Nov 23, 2024
a49835c
more plumbing (index writer done)
timothycoleman Nov 23, 2024
d637d46
pass tracker on each call to TFChunkReader..... it seemed appearing t…
timothycoleman Nov 23, 2024
8d2cc42
more plumbing (epoch manager and telemetry service done)
timothycoleman Nov 24, 2024
4a783c1
more plumbing (chaser done)
timothycoleman Nov 24, 2024
d1e8843
consistency
timothycoleman Nov 24, 2024
5f44775
no need to separate index-scavenge from scavenge
timothycoleman Nov 25, 2024
f2e763c
track replication reads
timothycoleman Nov 25, 2024
3927a1c
loose ends
timothycoleman Nov 26, 2024
9ac0579
rename Disk source to File, since when reading from File we might sti…
timothycoleman Nov 26, 2024
9ab9fc5
minor adjustment
timothycoleman Nov 26, 2024
544908c
reduce granularity
timothycoleman Nov 27, 2024
8e9f52d
disable archive source for 23.10
timothycoleman Nov 27, 2024
1b7d7e2
even less granular
timothycoleman Nov 27, 2024
1b30a3a
better way of removing archive
timothycoleman Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/EventStore.Core.Tests/Fakes/FakeTfReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ public void Reposition(long position) {
throw new NotImplementedException();
}

public SeqReadResult TryReadNext() {
public SeqReadResult TryReadNext(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public SeqReadResult TryReadPrev() {
public SeqReadResult TryReadPrev(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public RecordReadResult TryReadAt(long position, bool couldBeScavenged) {
public RecordReadResult TryReadAt(long position, bool couldBeScavenged, ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public bool ExistsAt(long position) {
public bool ExistsAt(long position, ITransactionFileTracker tracker) {
return true;
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/EventStore.Core.Tests/IIndexReaderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
using EventStore.Core.Services.Storage.ReaderIndex;
using EventStore.Core.TransactionLog;

namespace EventStore.Core.Tests {
public static class IIndexReaderExtensions {
public static IndexReadEventResult ReadEvent(this IIndexReader<string> index, string streamName, long eventNumber) =>
index.ReadEvent(streamName, streamName, eventNumber);
index.ReadEvent(streamName, streamName, eventNumber, ITransactionFileTracker.NoOp);

public static IndexReadStreamResult ReadStreamEventsBackward(this IIndexReader<string> index, string streamName, long fromEventNumber, int maxCount) =>
index.ReadStreamEventsBackward(streamName, streamName, fromEventNumber, maxCount);
index.ReadStreamEventsBackward(streamName, streamName, fromEventNumber, maxCount, ITransactionFileTracker.NoOp);

public static IndexReadStreamResult ReadStreamEventsForward(this IIndexReader<string> index, string streamName, long fromEventNumber, int maxCount) =>
index.ReadStreamEventsForward(streamName, streamName, fromEventNumber, maxCount);
index.ReadStreamEventsForward(streamName, streamName, fromEventNumber, maxCount, ITransactionFileTracker.NoOp);
}
}
8 changes: 4 additions & 4 deletions src/EventStore.Core.Tests/Index/FakeIndexReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ public void Reposition(long position) {
throw new NotImplementedException();
}

public SeqReadResult TryReadNext() {
public SeqReadResult TryReadNext(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public SeqReadResult TryReadPrev() {
public SeqReadResult TryReadPrev(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public RecordReadResult TryReadAt(long position, bool couldBeScavenged) {
public RecordReadResult TryReadAt(long position, bool couldBeScavenged, ITransactionFileTracker tracker) {
var record = (LogRecord)new PrepareLogRecord(position, Guid.NewGuid(), Guid.NewGuid(), 0, 0,
position.ToString(), null, -1, DateTime.UtcNow, PrepareFlags.None, "type", null,
new byte[0], null);
return new RecordReadResult(true, position + 1, record, 1);
}

public bool ExistsAt(long position) {
public bool ExistsAt(long position, ITransactionFileTracker tracker) {
return _existsAt(position);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public override async Task TestFixtureSetUp() {
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(PathName, _lowHasher, _highHasher, "",
() => new HashListMemTable(version: _ptableVersion, maxSize: 40),
() => { throw new InvalidOperationException(); },
_ => { throw new InvalidOperationException(); },
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public override async Task TestFixtureSetUp() {
await base.TestFixtureSetUp();

_indexDir = PathName;
var fakeReader = new TFReaderLease(new FakeTfReader());
var fakeReader = new TFReaderLease(new FakeTfReader(), ITransactionFileTracker.NoOp);
_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(_ptableVersion, maxSize: 10),
() => fakeReader,
_ => fakeReader,
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override async Task TestFixtureSetUp() {
var highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(PathName, lowHasher, highHasher, "",
() => new HashListMemTable(_ptableVersion, maxSize: 20),
() => { throw new InvalidOperationException(); },
_ => { throw new InvalidOperationException(); },
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ public void ConstructTableIndexWithCorruptIndexEntries(byte version, bool skipIn
bool createForceVerifyFile = false) {
var lowHasher = new XXHashUnsafe();
var highHasher = new Murmur3AUnsafe();
var fakeReader = new TFReaderLease(new FakeIndexReader());
var fakeReader = new TFReaderLease(new FakeIndexReader(), ITransactionFileTracker.NoOp);

_tableIndex = new TableIndex<string>(PathName, lowHasher, highHasher, "",
() => new HashListMemTable(version, maxSize: NumIndexEntries),
() => fakeReader,
_ => fakeReader,
version,
int.MaxValue, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: NumIndexEntries,
Expand Down Expand Up @@ -67,7 +67,7 @@ public void ConstructTableIndexWithCorruptIndexEntries(byte version, bool skipIn
//load table index again
_tableIndex = new TableIndex<string>(PathName, lowHasher, highHasher, "",
() => new HashListMemTable(version, maxSize: NumIndexEntries),
() => fakeReader,
_ => fakeReader,
version,
int.MaxValue, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: NumIndexEntries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public override async Task TestFixtureSetUp() {
await base.TestFixtureSetUp();

_indexDir = PathName;
var fakeReader = new TFReaderLease(new FakeIndexReader());
var fakeReader = new TFReaderLease(new FakeIndexReader(), ITransactionFileTracker.NoOp);
_lowHasher = new FakeIndexHasher();
_highHasher = new FakeIndexHasher();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(_ptableVersion, maxSize: 10),
() => fakeReader,
_ => fakeReader,
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public override async Task TestFixtureSetUp() {
await base.TestFixtureSetUp();

_indexDir = PathName;
var fakeReader = new TFReaderLease(new FakeIndexReader());
var fakeReader = new TFReaderLease(new FakeIndexReader(), ITransactionFileTracker.NoOp);
_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV1, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV1,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 5 + _extraStreamHashesAtBeginning + _extraStreamHashesAtEnd,
Expand All @@ -66,7 +66,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(_ptableVersion, maxSize: 5),
() => fakeReader,
_ => fakeReader,
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 5,
Expand Down Expand Up @@ -150,22 +150,22 @@ public void Reposition(long position) {
throw new NotImplementedException();
}

public SeqReadResult TryReadNext() {
public SeqReadResult TryReadNext(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public SeqReadResult TryReadPrev() {
public SeqReadResult TryReadPrev(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public RecordReadResult TryReadAt(long position, bool couldBeScavenged) {
public RecordReadResult TryReadAt(long position, bool couldBeScavenged, ITransactionFileTracker tracker) {
var record = (LogRecord)new PrepareLogRecord(position, Guid.NewGuid(), Guid.NewGuid(), 0, 0,
position % 2 == 0 ? "account--696193173" : "LPN-FC002_LPK51001", null, -1, DateTime.UtcNow, PrepareFlags.None,
"type", null, new byte[0], null);
return new RecordReadResult(true, position + 1, record, 1);
}

public bool ExistsAt(long position) {
public bool ExistsAt(long position, ITransactionFileTracker tracker) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public override async Task TestFixtureSetUp() {

var emptyStreamId = LogFormatHelper<TLogFormat, TStreamId>.EmptyStreamId;
_indexDir = PathName;
var fakeReader = new TFReaderLease(new FakeIndexReader2());
var fakeReader = new TFReaderLease(new FakeIndexReader2(), ITransactionFileTracker.NoOp);
_tableIndex = new TableIndex<TStreamId>(_indexDir, _lowHasher, _highHasher, emptyStreamId,
() => new HashListMemTable(PTableVersions.IndexV1, maxSize: 3),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV1,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 3,
Expand All @@ -76,7 +76,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<TStreamId>(_indexDir, _lowHasher, _highHasher, emptyStreamId,
() => new HashListMemTable(_ptableVersion, maxSize: 3),
() => fakeReader,
_ => fakeReader,
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 3,
Expand Down Expand Up @@ -139,15 +139,15 @@ public void Reposition(long position) {
throw new NotImplementedException();
}

public SeqReadResult TryReadNext() {
public SeqReadResult TryReadNext(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public SeqReadResult TryReadPrev() {
public SeqReadResult TryReadPrev(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public RecordReadResult TryReadAt(long position, bool couldBeScavenged) {
public RecordReadResult TryReadAt(long position, bool couldBeScavenged, ITransactionFileTracker tracker) {
TStreamId streamId;
switch (position) {
case 1:
Expand All @@ -170,7 +170,7 @@ public RecordReadResult TryReadAt(long position, bool couldBeScavenged) {
return new RecordReadResult(true, position + 1, record, 1);
}

public bool ExistsAt(long position) {
public bool ExistsAt(long position, ITransactionFileTracker tracker) {
return position != 2 && position != 1;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public override async Task TestFixtureSetUp() {
await base.TestFixtureSetUp();

_indexDir = PathName;
var fakeReader = new TFReaderLease(new FakeIndexReader());
var fakeReader = new TFReaderLease(new FakeIndexReader(), ITransactionFileTracker.NoOp);
_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV2, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV2,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 5,
Expand All @@ -47,7 +47,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(_ptableVersion, maxSize: 5),
() => fakeReader,
_ => fakeReader,
_ptableVersion,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 5,
Expand Down Expand Up @@ -131,22 +131,22 @@ public void Reposition(long position) {
throw new NotImplementedException();
}

public SeqReadResult TryReadNext() {
public SeqReadResult TryReadNext(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public SeqReadResult TryReadPrev() {
public SeqReadResult TryReadPrev(ITransactionFileTracker tracker) {
throw new NotImplementedException();
}

public RecordReadResult TryReadAt(long position, bool couldBeScavenged) {
public RecordReadResult TryReadAt(long position, bool couldBeScavenged, ITransactionFileTracker tracker) {
var record = (LogRecord)new PrepareLogRecord(position, Guid.NewGuid(), Guid.NewGuid(), 0, 0,
position % 2 == 0 ? "testStream-2" : "testStream-1", null, -1, DateTime.UtcNow, PrepareFlags.None, "type",
null, new byte[0], null);
return new RecordReadResult(true, position + 1, record, 1);
}

public bool ExistsAt(long position) {
public bool ExistsAt(long position, ITransactionFileTracker tracker) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public override async Task TestFixtureSetUp() {

_indexDir = PathName;

var fakeReader = new TFReaderLease(new FakeIndexReader(l => !Deleted.Contains(l)));
var fakeReader = new TFReaderLease(new FakeIndexReader(l => !Deleted.Contains(l)), ITransactionFileTracker.NoOp);

_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand All @@ -63,7 +63,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public override async Task TestFixtureSetUp() {
if (!scavengeBlocker.Wait(5000))
throw new Exception("Failed to continue.");
return false;
}));
}), ITransactionFileTracker.NoOp);

_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand Down Expand Up @@ -79,7 +79,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public override async Task TestFixtureSetUp() {
var fakeReader = new TFReaderLease(new FakeIndexReader(l => {
cancellationTokenSource.Cancel();
return true;
}));
}), ITransactionFileTracker.NoOp);

_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand All @@ -60,7 +60,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public override async Task TestFixtureSetUp() {

_indexDir = PathName;

var fakeReader = new TFReaderLease(new FakeIndexReader());
var fakeReader = new TFReaderLease(new FakeIndexReader(), ITransactionFileTracker.NoOp);
_lowHasher = new XXHashUnsafe();
_highHasher = new Murmur3AUnsafe();
_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand All @@ -56,7 +56,7 @@ public override async Task TestFixtureSetUp() {

_tableIndex = new TableIndex<string>(_indexDir, _lowHasher, _highHasher, "",
() => new HashListMemTable(PTableVersions.IndexV4, maxSize: 5),
() => fakeReader,
_ => fakeReader,
PTableVersions.IndexV4,
5, Constants.PTableMaxReaderCountDefault,
maxSizeForMemory: 2,
Expand Down
Loading