diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs index cc3cc480..e5e10d40 100644 --- a/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs +++ b/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs @@ -48,8 +48,7 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark { if (bookmark.TryGet(out var bookmarkValue)) { - // TODO: initialize reader - // reader.AdvanceTo(bookmarkValue.Value); + reader.AdvanceTo(bookmarkValue.Value); } while (true) @@ -64,8 +63,8 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry); - if (bookmark.TrySet(new BookmarkValue(batch.Value.ReaderHead.Chunk, - batch.Value.ReaderHead.CommitHead))) + if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId, + batch.Value.ReaderHead.Offset))) { reader.AdvanceTo(batch.Value.ReaderHead); } diff --git a/src/SeqCli/Forwarder/Storage/Bookmark.cs b/src/SeqCli/Forwarder/Storage/Bookmark.cs index 44d0834d..91bfa45a 100644 --- a/src/SeqCli/Forwarder/Storage/Bookmark.cs +++ b/src/SeqCli/Forwarder/Storage/Bookmark.cs @@ -32,9 +32,9 @@ sealed class Bookmark readonly Lock _sync = new(); BookmarkName _name; - BookmarkValue? _value; + BufferPosition? _value; - Bookmark(StoreDirectory storeDirectory, BookmarkName name, BookmarkValue? value) + Bookmark(StoreDirectory storeDirectory, BookmarkName name, BufferPosition? value) { _storeDirectory = storeDirectory; _name = name; @@ -48,7 +48,7 @@ public static Bookmark Open(StoreDirectory storeDirectory) return new Bookmark(storeDirectory, name, value); } - public bool TryGet([NotNullWhen(true)] out BookmarkValue? bookmark) + public bool TryGet([NotNullWhen(true)] out BufferPosition? bookmark) { lock (_sync) { @@ -63,7 +63,7 @@ public bool TryGet([NotNullWhen(true)] out BookmarkValue? bookmark) } } - public bool TrySet(BookmarkValue value, bool sync = true) + public bool TrySet(BufferPosition value, bool sync = true) { lock (_sync) { @@ -82,7 +82,7 @@ public bool TrySet(BookmarkValue value, bool sync = true) } } - static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValue value, bool fsync) + static void Write(StoreDirectory storeDirectory, BookmarkName name, BufferPosition value, bool fsync) { unsafe { @@ -93,7 +93,7 @@ static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValu } } - static (BookmarkName, BookmarkValue?) Read(StoreDirectory storeDirectory) + static (BookmarkName, BufferPosition?) Read(StoreDirectory storeDirectory) { // NOTE: This method shouldn't throw var bookmarks = new List<(string, BookmarkName, StoreFile)>(); @@ -131,14 +131,14 @@ static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValu Span bookmark = stackalloc byte[16]; if (file.CopyContentsTo(bookmark) != 16) throw new Exception("The bookmark is corrupted."); - return (bookmarkName, BookmarkValue.Decode(bookmark)); + return (bookmarkName, BufferPosition.Decode(bookmark)); } } catch { storeDirectory.TryDelete(fileName); - return (new BookmarkName(bookmarkName.Id + 1), new BookmarkValue()); + return (new BookmarkName(bookmarkName.Id + 1), new BufferPosition()); } } } diff --git a/src/SeqCli/Forwarder/Storage/BookmarkValue.cs b/src/SeqCli/Forwarder/Storage/BufferPosition.cs similarity index 70% rename from src/SeqCli/Forwarder/Storage/BookmarkValue.cs rename to src/SeqCli/Forwarder/Storage/BufferPosition.cs index 7d86f90b..4ee20e8b 100644 --- a/src/SeqCli/Forwarder/Storage/BookmarkValue.cs +++ b/src/SeqCli/Forwarder/Storage/BufferPosition.cs @@ -20,14 +20,14 @@ namespace SeqCli.Forwarder.Storage; /// /// The in-memory value of a bookmark. /// -readonly record struct BookmarkValue(ulong Id, long CommitHead) +readonly record struct BufferPosition(ulong ChunkId, long Offset) { public void EncodeTo(Span bookmark) { if (bookmark.Length != 16) throw new Exception($"Bookmark values must be 16 bytes (got {bookmark.Length})."); - BinaryPrimitives.WriteUInt64LittleEndian(bookmark, Id); - BinaryPrimitives.WriteInt64LittleEndian(bookmark[8..], CommitHead); + BinaryPrimitives.WriteUInt64LittleEndian(bookmark, ChunkId); + BinaryPrimitives.WriteInt64LittleEndian(bookmark[8..], Offset); } public byte[] Encode() @@ -38,17 +38,17 @@ public byte[] Encode() return buffer; } - public static BookmarkValue Decode(Span bookmark) + public static BufferPosition Decode(Span bookmark) { if (bookmark.Length != 16) throw new Exception($"Bookmark values must be 16 bytes (got {bookmark.Length})."); - var id = BinaryPrimitives.ReadUInt64LittleEndian(bookmark); - var commitHead = BinaryPrimitives.ReadInt64LittleEndian(bookmark[8..]); + var chunkId = BinaryPrimitives.ReadUInt64LittleEndian(bookmark[..8]); + var offset = BinaryPrimitives.ReadInt64LittleEndian(bookmark[8..]); - return new BookmarkValue + return new BufferPosition { - Id = id, - CommitHead = commitHead + ChunkId = chunkId, + Offset = offset }; } } diff --git a/src/SeqCli/Forwarder/Storage/BufferReader.cs b/src/SeqCli/Forwarder/Storage/BufferReader.cs index 6fba78e9..7a4721c9 100644 --- a/src/SeqCli/Forwarder/Storage/BufferReader.cs +++ b/src/SeqCli/Forwarder/Storage/BufferReader.cs @@ -27,16 +27,16 @@ namespace SeqCli.Forwarder.Storage; sealed class BufferReader { readonly StoreDirectory _storeDirectory; - BufferReaderHead? _discardingHead; - BufferReaderHead? _readHead; + BufferPosition? _discardingHead; + BufferPosition _readHead; List _sortedChunks; BufferReader(StoreDirectory storeDirectory) { - _sortedChunks = new List(); + _sortedChunks = []; _storeDirectory = storeDirectory; _discardingHead = null; - _readHead = null; + _readHead = new(0, 0); } public static BufferReader Open(StoreDirectory storeDirectory) @@ -86,7 +86,7 @@ from the underlying chunks. // If the chunk has changed (it may have been deleted externally) // then stop discarding - if (chunk.Name.Id != _discardingHead.Value.Chunk) + if (chunk.Name.Id != _discardingHead.Value.ChunkId) { _discardingHead = null; @@ -94,11 +94,11 @@ from the underlying chunks. break; } - var chunkHead = Head(chunk); + var chunkHead = Extents(chunk); // Attempt to fill the buffer with data from the underlying chunk if (!TryFillChunk(chunk, - chunkHead with { CommitHead = _discardingHead.Value.CommitHead }, + chunkHead with { CommitHead = _discardingHead.Value.Offset }, discardingBatchBuffer, out var fill)) { @@ -119,11 +119,11 @@ from the underlying chunks. _discardingHead = _discardingHead.Value with { - CommitHead = _discardingHead.Value.CommitHead + fill.Value + Offset = _discardingHead.Value.Offset + fill.Value }; - _readHead = _discardingHead; + _readHead = _discardingHead.Value; - var isChunkFinished = _discardingHead.Value.CommitHead == chunkHead.WriteHead; + var isChunkFinished = _discardingHead.Value.Offset == chunkHead.WriteHead; // If the chunk is finished or a newline is found then stop discarding if (firstNewlineIndex >= 0 || (isChunkFinished && _sortedChunks.Count > 1)) @@ -153,7 +153,7 @@ from the underlying chunks. var batchBuffer = rentedArray.AsSpan()[..maxSize]; var batchLength = 0; - BufferReaderHead? batchHead = null; + BufferPosition? batchHead = null; var chunkIndex = 0; // Try fill the buffer with as much data as possible @@ -161,7 +161,7 @@ from the underlying chunks. while (chunkIndex < _sortedChunks.Count) { var chunk = _sortedChunks[chunkIndex]; - var chunkHead = Head(chunk); + var chunkHead = Extents(chunk); if (!TryFillChunk(chunk, chunkHead, batchBuffer[batchLength..], out var fill)) { @@ -192,7 +192,7 @@ from the underlying chunks. // If this is the first chunk then we've hit an oversize payload if (chunkIndex == 0) { - _discardingHead = new BufferReaderHead(chunk.Name.Id, chunkHead.CommitHead + fill.Value); + _discardingHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value); // Ensures we don't attempt to yield the data we've read batchHead = null; @@ -206,7 +206,7 @@ from the underlying chunks. } batchLength += fill.Value; - batchHead = new BufferReaderHead(chunk.Name.Id, chunkHead.CommitHead + fill.Value); + batchHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value); chunkIndex += 1; } @@ -233,19 +233,26 @@ from the underlying chunks. /// This method does not throw. /// /// The new head to resume reading from. - public void AdvanceTo(BufferReaderHead newReaderHead) + public void AdvanceTo(BufferPosition newReaderHead) { var removeLength = 0; + foreach (var chunk in _sortedChunks) { // A portion of the chunk is being skipped - if (chunk.Name.Id == newReaderHead.Chunk) break; + if (chunk.Name.Id == newReaderHead.ChunkId) break; // The remainder of the chunk is being skipped - if (chunk.Name.Id < newReaderHead.Chunk) + if (chunk.Name.Id < newReaderHead.ChunkId) + { _storeDirectory.TryDelete(chunk.Name.ToString()); + } else - throw new Exception("Chunks are out of order."); + { + // We might end up here if a chunk in the middle of the range was + // deleted from disk, while a saved bookmark references that chunk. + break; + } removeLength += 1; } @@ -254,29 +261,27 @@ public void AdvanceTo(BufferReaderHead newReaderHead) _sortedChunks.RemoveRange(0, removeLength); } - BufferReaderChunkHead Head(BufferReaderChunk chunk) + BufferReaderChunkExtents Extents(BufferReaderChunk chunk) { - if (_readHead != null && chunk.Name.Id == _readHead.Value.Chunk) + if (chunk.Name.Id == _readHead.ChunkId) return chunk.Chunk.TryGetLength(out var writeHead) - ? new BufferReaderChunkHead(Math.Min(_readHead.Value.CommitHead, writeHead.Value), writeHead.Value) - : new BufferReaderChunkHead(_readHead.Value.CommitHead, _readHead.Value.CommitHead); + ? new BufferReaderChunkExtents(Math.Min(_readHead.Offset, writeHead.Value), writeHead.Value) + : new BufferReaderChunkExtents(_readHead.Offset, _readHead.Offset); chunk.Chunk.TryGetLength(out var length); - return new BufferReaderChunkHead(0, length ?? 0); + return new BufferReaderChunkExtents(0, length ?? 0); } void ReadChunks() { - var head = _readHead ?? new BufferReaderHead(0, 0); - - List chunks = new(); + List chunks = []; foreach (var (fileName, file) in _storeDirectory .List(candidateName => Path.GetExtension(candidateName) is ".clef")) { if (!ChunkName.TryParse(fileName, out var parsedChunkName)) continue; - if (parsedChunkName.Value.Id >= head.Chunk) + if (parsedChunkName.Value.Id >= _readHead.ChunkId) chunks.Add(new BufferReaderChunk(parsedChunkName.Value, file)); else // If the chunk is before the one we're expecting to read then delete it; we've already processed it @@ -299,15 +304,15 @@ void ReadChunks() } } - static bool TryFillChunk(BufferReaderChunk chunk, BufferReaderChunkHead chunkHead, Span buffer, + static bool TryFillChunk(BufferReaderChunk chunk, BufferReaderChunkExtents chunkExtents, Span buffer, [NotNullWhen(true)] out int? filled) { var remaining = buffer.Length; - var fill = (int)Math.Min(remaining, chunkHead.Unadvanced); + var fill = (int)Math.Min(remaining, chunkExtents.Unadvanced); try { - if (!chunk.TryCopyTo(buffer, chunkHead, fill)) + if (!chunk.TryCopyTo(buffer, chunkExtents, fill)) { filled = null; return false; diff --git a/src/SeqCli/Forwarder/Storage/BufferReaderBatch.cs b/src/SeqCli/Forwarder/Storage/BufferReaderBatch.cs index 34492a0b..ce72a974 100644 --- a/src/SeqCli/Forwarder/Storage/BufferReaderBatch.cs +++ b/src/SeqCli/Forwarder/Storage/BufferReaderBatch.cs @@ -27,7 +27,7 @@ readonly record struct BufferReaderBatch readonly ArrayPool? _pool; readonly byte[] _storage; - public BufferReaderBatch(BufferReaderHead readerHead, ArrayPool? pool, byte[] storage, int length) + public BufferReaderBatch(BufferPosition readerHead, ArrayPool? pool, byte[] storage, int length) { ReaderHead = readerHead; @@ -36,7 +36,7 @@ public BufferReaderBatch(BufferReaderHead readerHead, ArrayPool? pool, byt _length = length; } - public BufferReaderHead ReaderHead { get; } + public BufferPosition ReaderHead { get; } public ReadOnlySpan AsSpan() { diff --git a/src/SeqCli/Forwarder/Storage/BufferReaderChunk.cs b/src/SeqCli/Forwarder/Storage/BufferReaderChunk.cs index e957f3fc..d85cb20e 100644 --- a/src/SeqCli/Forwarder/Storage/BufferReaderChunk.cs +++ b/src/SeqCli/Forwarder/Storage/BufferReaderChunk.cs @@ -38,9 +38,9 @@ public void Dispose() _reader?.Item2.Dispose(); } - public bool TryCopyTo(Span buffer, BufferReaderChunkHead head, int fill) + public bool TryCopyTo(Span buffer, BufferReaderChunkExtents chunkExtents, int fill) { - var readEnd = head.CommitHead + fill; + var readEnd = chunkExtents.CommitHead + fill; if (_reader != null) if (_reader.Value.Item1 < readEnd) @@ -53,12 +53,12 @@ public bool TryCopyTo(Span buffer, BufferReaderChunkHead head, int fill) if (_reader == null) { - if (!Chunk.TryOpenRead(head.WriteHead, out var reader)) return false; + if (!Chunk.TryOpenRead(chunkExtents.WriteHead, out var reader)) return false; - _reader = (head.WriteHead, reader); + _reader = (chunkExtents.WriteHead, reader); } - _reader.Value.Item2.CopyTo(buffer, head.CommitHead, fill); + _reader.Value.Item2.CopyTo(buffer, chunkExtents.CommitHead, fill); return true; } diff --git a/src/SeqCli/Forwarder/Storage/BufferReaderChunkHead.cs b/src/SeqCli/Forwarder/Storage/BufferReaderChunkExtents.cs similarity index 81% rename from src/SeqCli/Forwarder/Storage/BufferReaderChunkHead.cs rename to src/SeqCli/Forwarder/Storage/BufferReaderChunkExtents.cs index 7969d254..5cc37a69 100644 --- a/src/SeqCli/Forwarder/Storage/BufferReaderChunkHead.cs +++ b/src/SeqCli/Forwarder/Storage/BufferReaderChunkExtents.cs @@ -15,9 +15,9 @@ namespace SeqCli.Forwarder.Storage; /// -/// The current position in a . +/// The current read and write positions in a . /// -readonly record struct BufferReaderChunkHead(long CommitHead, long WriteHead) +readonly record struct BufferReaderChunkExtents(long CommitHead, long WriteHead) { public long Unadvanced => WriteHead - CommitHead; } diff --git a/src/SeqCli/Forwarder/Storage/BufferReaderHead.cs b/src/SeqCli/Forwarder/Storage/BufferReaderHead.cs deleted file mode 100644 index f1f34217..00000000 --- a/src/SeqCli/Forwarder/Storage/BufferReaderHead.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright © Datalust Pty Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace SeqCli.Forwarder.Storage; - -/// -/// A position in a . -/// -readonly record struct BufferReaderHead(ulong Chunk, long CommitHead); diff --git a/test/SeqCli.Tests/Forwarder/Storage/BookmarkTests.cs b/test/SeqCli.Tests/Forwarder/Storage/BookmarkTests.cs index 3d67ecbd..2e94f982 100644 --- a/test/SeqCli.Tests/Forwarder/Storage/BookmarkTests.cs +++ b/test/SeqCli.Tests/Forwarder/Storage/BookmarkTests.cs @@ -17,13 +17,13 @@ public void CreateSetGet() Assert.False(bookmark.TryGet(out var value)); Assert.Null(value); - Assert.True(bookmark.TrySet(new BookmarkValue(42, 1))); + Assert.True(bookmark.TrySet(new BufferPosition(42, 1))); Assert.True(bookmark.TryGet(out value)); - Assert.Equal(new BookmarkValue(42, 1), value.Value); + Assert.Equal(new BufferPosition(42, 1), value.Value); - Assert.True(bookmark.TrySet(new BookmarkValue(42, int.MaxValue))); + Assert.True(bookmark.TrySet(new BufferPosition(42, int.MaxValue))); Assert.True(bookmark.TryGet(out value)); - Assert.Equal(new BookmarkValue(42, int.MaxValue), value.Value); + Assert.Equal(new BufferPosition(42, int.MaxValue), value.Value); } [Fact] @@ -31,8 +31,8 @@ public void OpenDeletesOldBookmarks() { var directory = new InMemoryStoreDirectory(); - directory.Create($"{1L:x16}.bookmark", new BookmarkValue(3, 3478).Encode()); - directory.Create($"{3L:x16}.bookmark", new BookmarkValue(42, 17).Encode()); + directory.Create($"{1L:x16}.bookmark", new BufferPosition(3, 3478).Encode()); + directory.Create($"{3L:x16}.bookmark", new BufferPosition(42, 17).Encode()); Assert.Equal(2, directory.Files.Count); @@ -41,7 +41,7 @@ public void OpenDeletesOldBookmarks() Assert.Equal($"{3L:x16}.bookmark", directory.Files.Single().Key); Assert.True(bookmark.TryGet(out var value)); - Assert.Equal(new BookmarkValue(42, 17), value); + Assert.Equal(new BufferPosition(42, 17), value); } [Fact] @@ -49,7 +49,7 @@ public void OpenDeletesCorruptedBookmarks() { var directory = new InMemoryStoreDirectory(); - directory.Create($"{1L:x16}.bookmark", new BookmarkValue(3, 3478).Encode()); + directory.Create($"{1L:x16}.bookmark", new BufferPosition(3, 3478).Encode()); // This bookmark is invalid directory.Create($"{3L:x16}.bookmark", new byte[] { 1, 2, 3 }); @@ -58,7 +58,7 @@ public void OpenDeletesCorruptedBookmarks() Assert.Empty(directory.Files); - Assert.True(bookmark.TrySet(new BookmarkValue(42, 17))); + Assert.True(bookmark.TrySet(new BufferPosition(42, 17))); Assert.Equal($"{4L:x16}.bookmark", directory.Files.Single().Key); } @@ -68,16 +68,16 @@ public void OpenDeletesMisnamedBookmarks() { var directory = new InMemoryStoreDirectory(); - directory.Create($"{1L:x16}.bookmark", new BookmarkValue(3, 3478).Encode()); + directory.Create($"{1L:x16}.bookmark", new BufferPosition(3, 3478).Encode()); // This bookmark is invalid - directory.Create($"ff{3L:x16}.bookmark", new BookmarkValue(42, 17).Encode()); + directory.Create($"ff{3L:x16}.bookmark", new BufferPosition(42, 17).Encode()); var bookmark = Bookmark.Open(directory); Assert.Single(directory.Files); Assert.True(bookmark.TryGet(out var value)); - Assert.Equal(new BookmarkValue(3, 3478), value); + Assert.Equal(new BufferPosition(3, 3478), value); } } diff --git a/test/SeqCli.Tests/Forwarder/Storage/BufferTests.cs b/test/SeqCli.Tests/Forwarder/Storage/BufferTests.cs index 86129275..9b91660b 100644 --- a/test/SeqCli.Tests/Forwarder/Storage/BufferTests.cs +++ b/test/SeqCli.Tests/Forwarder/Storage/BufferTests.cs @@ -25,7 +25,7 @@ public void OpenAppendRead() Assert.False(reader.TryFillBatch(10, out _)); Assert.True(reader.TryFillBatch(10, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(1, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(1, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":1}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); // Advance the reader @@ -39,7 +39,7 @@ public void OpenAppendRead() // Read the payload Assert.True(reader.TryFillBatch(10, out batch)); batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(1, 18), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(1, 18), batchBuffer.ReaderHead); Assert.Equal("{\"id\":2}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); // Advance the reader @@ -67,7 +67,7 @@ public void ReadWaitsUntilCompleteDataOnLastChunk() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(1, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(1, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":1}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); } @@ -83,7 +83,7 @@ public void ReadDiscardsPreviouslyReadChunks() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(2, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(2, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":1}\n{\"id\":2}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); Assert.Equal(2, directory.Files.Count); @@ -100,7 +100,7 @@ public void ReadDiscardsPreviouslyReadChunks() Assert.False(reader.TryFillBatch(512, out _)); Assert.True(reader.TryFillBatch(512, out batch)); batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(3, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(3, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":3}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); reader.AdvanceTo(batchBuffer.ReaderHead); @@ -108,6 +108,38 @@ public void ReadDiscardsPreviouslyReadChunks() Assert.Single(directory.Files); } + [Fact] + public void AdvancingToNonexistentLowerPositionRereadsAllChunks() + { + var directory = new InMemoryStoreDirectory(); + + directory.Create(new ChunkName(71).ToString(), "{\"id\":1}\n"u8.ToArray()); + directory.Create(new ChunkName(72).ToString(), "{\"id\":2}\n"u8.ToArray()); + + var reader = BufferReader.Open(directory); + + reader.AdvanceTo(new BufferPosition(60, 0)); + + Assert.True(reader.TryFillBatch(512, out var batch)); + var batchBuffer = batch.Value; + Assert.Equal(new BufferPosition(72, 9), batchBuffer.ReaderHead); + } + + [Fact] + public void AdvancingToNonexistentHigherPositionDiscardsAllChunks() + { + var directory = new InMemoryStoreDirectory(); + + directory.Create(new ChunkName(71).ToString(), "{\"id\":1}\n"u8.ToArray()); + directory.Create(new ChunkName(72).ToString(), "{\"id\":2}\n"u8.ToArray()); + + var reader = BufferReader.Open(directory); + + reader.AdvanceTo(new BufferPosition(80, 0)); + + Assert.False(reader.TryFillBatch(512, out _)); + } + [Fact] public void ReadDiscardsOversizePayloads() { @@ -136,7 +168,7 @@ public void ReadDoesNotDiscardAcrossFiles() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(2, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(2, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":2}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); } @@ -159,7 +191,7 @@ public void ReadStopsDiscardingOnExternalDelete() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(2, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(2, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":2}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); } @@ -181,7 +213,7 @@ public void ReadStopsDiscardingOnExternalCreate() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(2, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(2, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":2}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); } @@ -208,7 +240,7 @@ public void AppendRolloverOnWrite() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(2, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(2, 9), batchBuffer.ReaderHead); Assert.Equal("{\"id\":1}\n{\"id\":2}\n{\"id\":3}\n"u8.ToArray(), batchBuffer.AsSpan().ToArray()); reader.AdvanceTo(batchBuffer.ReaderHead); @@ -236,7 +268,7 @@ public void ExistingFilesAreReadonly() Assert.True(reader.TryFillBatch(512, out var batch)); var batchBuffer = batch.Value; - Assert.Equal(new BufferReaderHead(1, 9), batchBuffer.ReaderHead); + Assert.Equal(new BufferPosition(1, 9), batchBuffer.ReaderHead); } [Fact]