Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/SeqCli/Forwarder/Channel/ForwardingChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
{
if (bookmark.TryGet(out var bookmarkValue))
{
// TODO: initialize reader
// reader.AdvanceTo(bookmarkValue.Value);
reader.AdvanceTo(bookmarkValue.Value);
}

while (true)
Expand All @@ -64,8 +63,8 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark

await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry);

if (bookmark.TrySet(new BookmarkValue(batch.Value.ReaderHead.Chunk,
batch.Value.ReaderHead.CommitHead)))
if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId,
batch.Value.ReaderHead.Offset)))
{
reader.AdvanceTo(batch.Value.ReaderHead);
}
Expand Down
16 changes: 8 additions & 8 deletions src/SeqCli/Forwarder/Storage/Bookmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ sealed class Bookmark

readonly Lock _sync = new();
BookmarkName _name;
BookmarkValue? _value;
BufferPosition? _value;

Bookmark(StoreDirectory storeDirectory, BookmarkName name, BookmarkValue? value)
Bookmark(StoreDirectory storeDirectory, BookmarkName name, BufferPosition? value)
{
_storeDirectory = storeDirectory;
_name = name;
Expand All @@ -48,7 +48,7 @@ public static Bookmark Open(StoreDirectory storeDirectory)
return new Bookmark(storeDirectory, name, value);
}

public bool TryGet([NotNullWhen(true)] out BookmarkValue? bookmark)
public bool TryGet([NotNullWhen(true)] out BufferPosition? bookmark)
{
lock (_sync)
{
Expand All @@ -63,7 +63,7 @@ public bool TryGet([NotNullWhen(true)] out BookmarkValue? bookmark)
}
}

public bool TrySet(BookmarkValue value, bool sync = true)
public bool TrySet(BufferPosition value, bool sync = true)
{
lock (_sync)
{
Expand All @@ -82,7 +82,7 @@ public bool TrySet(BookmarkValue value, bool sync = true)
}
}

static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValue value, bool fsync)
static void Write(StoreDirectory storeDirectory, BookmarkName name, BufferPosition value, bool fsync)
{
unsafe
{
Expand All @@ -93,7 +93,7 @@ static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValu
}
}

static (BookmarkName, BookmarkValue?) Read(StoreDirectory storeDirectory)
static (BookmarkName, BufferPosition?) Read(StoreDirectory storeDirectory)
{
// NOTE: This method shouldn't throw
var bookmarks = new List<(string, BookmarkName, StoreFile)>();
Expand Down Expand Up @@ -131,14 +131,14 @@ static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValu
Span<byte> bookmark = stackalloc byte[16];
if (file.CopyContentsTo(bookmark) != 16) throw new Exception("The bookmark is corrupted.");

return (bookmarkName, BookmarkValue.Decode(bookmark));
return (bookmarkName, BufferPosition.Decode(bookmark));
}
}
catch
{
storeDirectory.TryDelete(fileName);

return (new BookmarkName(bookmarkName.Id + 1), new BookmarkValue());
return (new BookmarkName(bookmarkName.Id + 1), new BufferPosition());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ namespace SeqCli.Forwarder.Storage;
/// <summary>
/// The in-memory value of a bookmark.
/// </summary>
readonly record struct BookmarkValue(ulong Id, long CommitHead)
readonly record struct BufferPosition(ulong ChunkId, long Offset)
{
public void EncodeTo(Span<byte> bookmark)
{
if (bookmark.Length != 16) throw new Exception($"Bookmark values must be 16 bytes (got {bookmark.Length}).");

BinaryPrimitives.WriteUInt64LittleEndian(bookmark, Id);
BinaryPrimitives.WriteInt64LittleEndian(bookmark[8..], CommitHead);
BinaryPrimitives.WriteUInt64LittleEndian(bookmark, ChunkId);
BinaryPrimitives.WriteInt64LittleEndian(bookmark[8..], Offset);
}

public byte[] Encode()
Expand All @@ -38,17 +38,17 @@ public byte[] Encode()
return buffer;
}

public static BookmarkValue Decode(Span<byte> bookmark)
public static BufferPosition Decode(Span<byte> bookmark)
{
if (bookmark.Length != 16) throw new Exception($"Bookmark values must be 16 bytes (got {bookmark.Length}).");

var id = BinaryPrimitives.ReadUInt64LittleEndian(bookmark);
var commitHead = BinaryPrimitives.ReadInt64LittleEndian(bookmark[8..]);
var chunkId = BinaryPrimitives.ReadUInt64LittleEndian(bookmark[..8]);
var offset = BinaryPrimitives.ReadInt64LittleEndian(bookmark[8..]);

return new BookmarkValue
return new BufferPosition
{
Id = id,
CommitHead = commitHead
ChunkId = chunkId,
Offset = offset
};
}
}
65 changes: 35 additions & 30 deletions src/SeqCli/Forwarder/Storage/BufferReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ namespace SeqCli.Forwarder.Storage;
sealed class BufferReader
{
readonly StoreDirectory _storeDirectory;
BufferReaderHead? _discardingHead;
BufferReaderHead? _readHead;
BufferPosition? _discardingHead;
BufferPosition _readHead;
List<BufferReaderChunk> _sortedChunks;

BufferReader(StoreDirectory storeDirectory)
{
_sortedChunks = new List<BufferReaderChunk>();
_sortedChunks = [];
_storeDirectory = storeDirectory;
_discardingHead = null;
_readHead = null;
_readHead = new(0, 0);
}

public static BufferReader Open(StoreDirectory storeDirectory)
Expand Down Expand Up @@ -86,19 +86,19 @@ from the underlying chunks.

// If the chunk has changed (it may have been deleted externally)
// then stop discarding
if (chunk.Name.Id != _discardingHead.Value.Chunk)
if (chunk.Name.Id != _discardingHead.Value.ChunkId)
{
_discardingHead = null;

ArrayPool<byte>.Shared.Return(discardingRentedArray);
break;
}

var chunkHead = Head(chunk);
var chunkHead = Extents(chunk);

// Attempt to fill the buffer with data from the underlying chunk
if (!TryFillChunk(chunk,
chunkHead with { CommitHead = _discardingHead.Value.CommitHead },
chunkHead with { CommitHead = _discardingHead.Value.Offset },
discardingBatchBuffer,
out var fill))
{
Expand All @@ -119,11 +119,11 @@ from the underlying chunks.

_discardingHead = _discardingHead.Value with
{
CommitHead = _discardingHead.Value.CommitHead + fill.Value
Offset = _discardingHead.Value.Offset + fill.Value
};
_readHead = _discardingHead;
_readHead = _discardingHead.Value;

var isChunkFinished = _discardingHead.Value.CommitHead == chunkHead.WriteHead;
var isChunkFinished = _discardingHead.Value.Offset == chunkHead.WriteHead;

// If the chunk is finished or a newline is found then stop discarding
if (firstNewlineIndex >= 0 || (isChunkFinished && _sortedChunks.Count > 1))
Expand Down Expand Up @@ -153,15 +153,15 @@ from the underlying chunks.
var batchBuffer = rentedArray.AsSpan()[..maxSize];
var batchLength = 0;

BufferReaderHead? batchHead = null;
BufferPosition? batchHead = null;
var chunkIndex = 0;

// Try fill the buffer with as much data as possible
// by walking over all chunks
while (chunkIndex < _sortedChunks.Count)
{
var chunk = _sortedChunks[chunkIndex];
var chunkHead = Head(chunk);
var chunkHead = Extents(chunk);

if (!TryFillChunk(chunk, chunkHead, batchBuffer[batchLength..], out var fill))
{
Expand Down Expand Up @@ -192,7 +192,7 @@ from the underlying chunks.
// If this is the first chunk then we've hit an oversize payload
if (chunkIndex == 0)
{
_discardingHead = new BufferReaderHead(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
_discardingHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value);

// Ensures we don't attempt to yield the data we've read
batchHead = null;
Expand All @@ -206,7 +206,7 @@ from the underlying chunks.
}

batchLength += fill.Value;
batchHead = new BufferReaderHead(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
batchHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value);

chunkIndex += 1;
}
Expand All @@ -233,19 +233,26 @@ from the underlying chunks.
/// This method does not throw.
/// </summary>
/// <param name="newReaderHead">The new head to resume reading from.</param>
public void AdvanceTo(BufferReaderHead newReaderHead)
public void AdvanceTo(BufferPosition newReaderHead)
{
var removeLength = 0;

foreach (var chunk in _sortedChunks)
{
// A portion of the chunk is being skipped
if (chunk.Name.Id == newReaderHead.Chunk) break;
if (chunk.Name.Id == newReaderHead.ChunkId) break;

// The remainder of the chunk is being skipped
if (chunk.Name.Id < newReaderHead.Chunk)
if (chunk.Name.Id < newReaderHead.ChunkId)
{
_storeDirectory.TryDelete(chunk.Name.ToString());
}
else
throw new Exception("Chunks are out of order.");
{
// We might end up here if a chunk in the middle of the range was
// deleted from disk, while a saved bookmark references that chunk.
break;
}

removeLength += 1;
}
Expand All @@ -254,29 +261,27 @@ public void AdvanceTo(BufferReaderHead newReaderHead)
_sortedChunks.RemoveRange(0, removeLength);
}

BufferReaderChunkHead Head(BufferReaderChunk chunk)
BufferReaderChunkExtents Extents(BufferReaderChunk chunk)
{
if (_readHead != null && chunk.Name.Id == _readHead.Value.Chunk)
if (chunk.Name.Id == _readHead.ChunkId)
return chunk.Chunk.TryGetLength(out var writeHead)
? new BufferReaderChunkHead(Math.Min(_readHead.Value.CommitHead, writeHead.Value), writeHead.Value)
: new BufferReaderChunkHead(_readHead.Value.CommitHead, _readHead.Value.CommitHead);
? new BufferReaderChunkExtents(Math.Min(_readHead.Offset, writeHead.Value), writeHead.Value)
: new BufferReaderChunkExtents(_readHead.Offset, _readHead.Offset);

chunk.Chunk.TryGetLength(out var length);
return new BufferReaderChunkHead(0, length ?? 0);
return new BufferReaderChunkExtents(0, length ?? 0);
}

void ReadChunks()
{
var head = _readHead ?? new BufferReaderHead(0, 0);

List<BufferReaderChunk> chunks = new();
List<BufferReaderChunk> chunks = [];

foreach (var (fileName, file) in _storeDirectory
.List(candidateName => Path.GetExtension(candidateName) is ".clef"))
{
if (!ChunkName.TryParse(fileName, out var parsedChunkName)) continue;

if (parsedChunkName.Value.Id >= head.Chunk)
if (parsedChunkName.Value.Id >= _readHead.ChunkId)
chunks.Add(new BufferReaderChunk(parsedChunkName.Value, file));
else
// If the chunk is before the one we're expecting to read then delete it; we've already processed it
Expand All @@ -299,15 +304,15 @@ void ReadChunks()
}
}

static bool TryFillChunk(BufferReaderChunk chunk, BufferReaderChunkHead chunkHead, Span<byte> buffer,
static bool TryFillChunk(BufferReaderChunk chunk, BufferReaderChunkExtents chunkExtents, Span<byte> buffer,
[NotNullWhen(true)] out int? filled)
{
var remaining = buffer.Length;
var fill = (int)Math.Min(remaining, chunkHead.Unadvanced);
var fill = (int)Math.Min(remaining, chunkExtents.Unadvanced);

try
{
if (!chunk.TryCopyTo(buffer, chunkHead, fill))
if (!chunk.TryCopyTo(buffer, chunkExtents, fill))
{
filled = null;
return false;
Expand Down
4 changes: 2 additions & 2 deletions src/SeqCli/Forwarder/Storage/BufferReaderBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ readonly record struct BufferReaderBatch
readonly ArrayPool<byte>? _pool;
readonly byte[] _storage;

public BufferReaderBatch(BufferReaderHead readerHead, ArrayPool<byte>? pool, byte[] storage, int length)
public BufferReaderBatch(BufferPosition readerHead, ArrayPool<byte>? pool, byte[] storage, int length)
{
ReaderHead = readerHead;

Expand All @@ -36,7 +36,7 @@ public BufferReaderBatch(BufferReaderHead readerHead, ArrayPool<byte>? pool, byt
_length = length;
}

public BufferReaderHead ReaderHead { get; }
public BufferPosition ReaderHead { get; }

public ReadOnlySpan<byte> AsSpan()
{
Expand Down
10 changes: 5 additions & 5 deletions src/SeqCli/Forwarder/Storage/BufferReaderChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public void Dispose()
_reader?.Item2.Dispose();
}

public bool TryCopyTo(Span<byte> buffer, BufferReaderChunkHead head, int fill)
public bool TryCopyTo(Span<byte> buffer, BufferReaderChunkExtents chunkExtents, int fill)
{
var readEnd = head.CommitHead + fill;
var readEnd = chunkExtents.CommitHead + fill;

if (_reader != null)
if (_reader.Value.Item1 < readEnd)
Expand All @@ -53,12 +53,12 @@ public bool TryCopyTo(Span<byte> buffer, BufferReaderChunkHead head, int fill)

if (_reader == null)
{
if (!Chunk.TryOpenRead(head.WriteHead, out var reader)) return false;
if (!Chunk.TryOpenRead(chunkExtents.WriteHead, out var reader)) return false;

_reader = (head.WriteHead, reader);
_reader = (chunkExtents.WriteHead, reader);
}

_reader.Value.Item2.CopyTo(buffer, head.CommitHead, fill);
_reader.Value.Item2.CopyTo(buffer, chunkExtents.CommitHead, fill);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
namespace SeqCli.Forwarder.Storage;

/// <summary>
/// The current position in a <see cref="BufferReaderChunk" />.
/// The current read and write positions in a <see cref="BufferReaderChunk" />.
/// </summary>
readonly record struct BufferReaderChunkHead(long CommitHead, long WriteHead)
readonly record struct BufferReaderChunkExtents(long CommitHead, long WriteHead)
{
public long Unadvanced => WriteHead - CommitHead;
}
20 changes: 0 additions & 20 deletions src/SeqCli/Forwarder/Storage/BufferReaderHead.cs

This file was deleted.

Loading