Skip to content

Commit 1b3c383

Browse files
authored
Merge pull request #407 from nblumhardt/restart-from-bookmark
Initialize the reader when reloading from disk
2 parents 308b47d + cd6ac61 commit 1b3c383

File tree

10 files changed

+118
-102
lines changed

10 files changed

+118
-102
lines changed

src/SeqCli/Forwarder/Channel/ForwardingChannel.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
4848
{
4949
if (bookmark.TryGet(out var bookmarkValue))
5050
{
51-
// TODO: initialize reader
52-
// reader.AdvanceTo(bookmarkValue.Value);
51+
reader.AdvanceTo(bookmarkValue.Value);
5352
}
5453

5554
while (true)
@@ -64,8 +63,8 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
6463

6564
await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry);
6665

67-
if (bookmark.TrySet(new BookmarkValue(batch.Value.ReaderHead.Chunk,
68-
batch.Value.ReaderHead.CommitHead)))
66+
if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId,
67+
batch.Value.ReaderHead.Offset)))
6968
{
7069
reader.AdvanceTo(batch.Value.ReaderHead);
7170
}

src/SeqCli/Forwarder/Storage/Bookmark.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ sealed class Bookmark
3232

3333
readonly Lock _sync = new();
3434
BookmarkName _name;
35-
BookmarkValue? _value;
35+
BufferPosition? _value;
3636

37-
Bookmark(StoreDirectory storeDirectory, BookmarkName name, BookmarkValue? value)
37+
Bookmark(StoreDirectory storeDirectory, BookmarkName name, BufferPosition? value)
3838
{
3939
_storeDirectory = storeDirectory;
4040
_name = name;
@@ -48,7 +48,7 @@ public static Bookmark Open(StoreDirectory storeDirectory)
4848
return new Bookmark(storeDirectory, name, value);
4949
}
5050

51-
public bool TryGet([NotNullWhen(true)] out BookmarkValue? bookmark)
51+
public bool TryGet([NotNullWhen(true)] out BufferPosition? bookmark)
5252
{
5353
lock (_sync)
5454
{
@@ -63,7 +63,7 @@ public bool TryGet([NotNullWhen(true)] out BookmarkValue? bookmark)
6363
}
6464
}
6565

66-
public bool TrySet(BookmarkValue value, bool sync = true)
66+
public bool TrySet(BufferPosition value, bool sync = true)
6767
{
6868
lock (_sync)
6969
{
@@ -82,7 +82,7 @@ public bool TrySet(BookmarkValue value, bool sync = true)
8282
}
8383
}
8484

85-
static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValue value, bool fsync)
85+
static void Write(StoreDirectory storeDirectory, BookmarkName name, BufferPosition value, bool fsync)
8686
{
8787
unsafe
8888
{
@@ -93,7 +93,7 @@ static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValu
9393
}
9494
}
9595

96-
static (BookmarkName, BookmarkValue?) Read(StoreDirectory storeDirectory)
96+
static (BookmarkName, BufferPosition?) Read(StoreDirectory storeDirectory)
9797
{
9898
// NOTE: This method shouldn't throw
9999
var bookmarks = new List<(string, BookmarkName, StoreFile)>();
@@ -131,14 +131,14 @@ static void Write(StoreDirectory storeDirectory, BookmarkName name, BookmarkValu
131131
Span<byte> bookmark = stackalloc byte[16];
132132
if (file.CopyContentsTo(bookmark) != 16) throw new Exception("The bookmark is corrupted.");
133133

134-
return (bookmarkName, BookmarkValue.Decode(bookmark));
134+
return (bookmarkName, BufferPosition.Decode(bookmark));
135135
}
136136
}
137137
catch
138138
{
139139
storeDirectory.TryDelete(fileName);
140140

141-
return (new BookmarkName(bookmarkName.Id + 1), new BookmarkValue());
141+
return (new BookmarkName(bookmarkName.Id + 1), new BufferPosition());
142142
}
143143
}
144144
}

src/SeqCli/Forwarder/Storage/BookmarkValue.cs renamed to src/SeqCli/Forwarder/Storage/BufferPosition.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ namespace SeqCli.Forwarder.Storage;
2020
/// <summary>
2121
/// The in-memory value of a bookmark.
2222
/// </summary>
23-
readonly record struct BookmarkValue(ulong Id, long CommitHead)
23+
readonly record struct BufferPosition(ulong ChunkId, long Offset)
2424
{
2525
public void EncodeTo(Span<byte> bookmark)
2626
{
2727
if (bookmark.Length != 16) throw new Exception($"Bookmark values must be 16 bytes (got {bookmark.Length}).");
2828

29-
BinaryPrimitives.WriteUInt64LittleEndian(bookmark, Id);
30-
BinaryPrimitives.WriteInt64LittleEndian(bookmark[8..], CommitHead);
29+
BinaryPrimitives.WriteUInt64LittleEndian(bookmark, ChunkId);
30+
BinaryPrimitives.WriteInt64LittleEndian(bookmark[8..], Offset);
3131
}
3232

3333
public byte[] Encode()
@@ -38,17 +38,17 @@ public byte[] Encode()
3838
return buffer;
3939
}
4040

41-
public static BookmarkValue Decode(Span<byte> bookmark)
41+
public static BufferPosition Decode(Span<byte> bookmark)
4242
{
4343
if (bookmark.Length != 16) throw new Exception($"Bookmark values must be 16 bytes (got {bookmark.Length}).");
4444

45-
var id = BinaryPrimitives.ReadUInt64LittleEndian(bookmark);
46-
var commitHead = BinaryPrimitives.ReadInt64LittleEndian(bookmark[8..]);
45+
var chunkId = BinaryPrimitives.ReadUInt64LittleEndian(bookmark[..8]);
46+
var offset = BinaryPrimitives.ReadInt64LittleEndian(bookmark[8..]);
4747

48-
return new BookmarkValue
48+
return new BufferPosition
4949
{
50-
Id = id,
51-
CommitHead = commitHead
50+
ChunkId = chunkId,
51+
Offset = offset
5252
};
5353
}
5454
}

src/SeqCli/Forwarder/Storage/BufferReader.cs

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ namespace SeqCli.Forwarder.Storage;
2727
sealed class BufferReader
2828
{
2929
readonly StoreDirectory _storeDirectory;
30-
BufferReaderHead? _discardingHead;
31-
BufferReaderHead? _readHead;
30+
BufferPosition? _discardingHead;
31+
BufferPosition _readHead;
3232
List<BufferReaderChunk> _sortedChunks;
3333

3434
BufferReader(StoreDirectory storeDirectory)
3535
{
36-
_sortedChunks = new List<BufferReaderChunk>();
36+
_sortedChunks = [];
3737
_storeDirectory = storeDirectory;
3838
_discardingHead = null;
39-
_readHead = null;
39+
_readHead = new(0, 0);
4040
}
4141

4242
public static BufferReader Open(StoreDirectory storeDirectory)
@@ -86,19 +86,19 @@ from the underlying chunks.
8686

8787
// If the chunk has changed (it may have been deleted externally)
8888
// then stop discarding
89-
if (chunk.Name.Id != _discardingHead.Value.Chunk)
89+
if (chunk.Name.Id != _discardingHead.Value.ChunkId)
9090
{
9191
_discardingHead = null;
9292

9393
ArrayPool<byte>.Shared.Return(discardingRentedArray);
9494
break;
9595
}
9696

97-
var chunkHead = Head(chunk);
97+
var chunkHead = Extents(chunk);
9898

9999
// Attempt to fill the buffer with data from the underlying chunk
100100
if (!TryFillChunk(chunk,
101-
chunkHead with { CommitHead = _discardingHead.Value.CommitHead },
101+
chunkHead with { CommitHead = _discardingHead.Value.Offset },
102102
discardingBatchBuffer,
103103
out var fill))
104104
{
@@ -119,11 +119,11 @@ from the underlying chunks.
119119

120120
_discardingHead = _discardingHead.Value with
121121
{
122-
CommitHead = _discardingHead.Value.CommitHead + fill.Value
122+
Offset = _discardingHead.Value.Offset + fill.Value
123123
};
124-
_readHead = _discardingHead;
124+
_readHead = _discardingHead.Value;
125125

126-
var isChunkFinished = _discardingHead.Value.CommitHead == chunkHead.WriteHead;
126+
var isChunkFinished = _discardingHead.Value.Offset == chunkHead.WriteHead;
127127

128128
// If the chunk is finished or a newline is found then stop discarding
129129
if (firstNewlineIndex >= 0 || (isChunkFinished && _sortedChunks.Count > 1))
@@ -153,15 +153,15 @@ from the underlying chunks.
153153
var batchBuffer = rentedArray.AsSpan()[..maxSize];
154154
var batchLength = 0;
155155

156-
BufferReaderHead? batchHead = null;
156+
BufferPosition? batchHead = null;
157157
var chunkIndex = 0;
158158

159159
// Try fill the buffer with as much data as possible
160160
// by walking over all chunks
161161
while (chunkIndex < _sortedChunks.Count)
162162
{
163163
var chunk = _sortedChunks[chunkIndex];
164-
var chunkHead = Head(chunk);
164+
var chunkHead = Extents(chunk);
165165

166166
if (!TryFillChunk(chunk, chunkHead, batchBuffer[batchLength..], out var fill))
167167
{
@@ -192,7 +192,7 @@ from the underlying chunks.
192192
// If this is the first chunk then we've hit an oversize payload
193193
if (chunkIndex == 0)
194194
{
195-
_discardingHead = new BufferReaderHead(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
195+
_discardingHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
196196

197197
// Ensures we don't attempt to yield the data we've read
198198
batchHead = null;
@@ -206,7 +206,7 @@ from the underlying chunks.
206206
}
207207

208208
batchLength += fill.Value;
209-
batchHead = new BufferReaderHead(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
209+
batchHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
210210

211211
chunkIndex += 1;
212212
}
@@ -233,19 +233,26 @@ from the underlying chunks.
233233
/// This method does not throw.
234234
/// </summary>
235235
/// <param name="newReaderHead">The new head to resume reading from.</param>
236-
public void AdvanceTo(BufferReaderHead newReaderHead)
236+
public void AdvanceTo(BufferPosition newReaderHead)
237237
{
238238
var removeLength = 0;
239+
239240
foreach (var chunk in _sortedChunks)
240241
{
241242
// A portion of the chunk is being skipped
242-
if (chunk.Name.Id == newReaderHead.Chunk) break;
243+
if (chunk.Name.Id == newReaderHead.ChunkId) break;
243244

244245
// The remainder of the chunk is being skipped
245-
if (chunk.Name.Id < newReaderHead.Chunk)
246+
if (chunk.Name.Id < newReaderHead.ChunkId)
247+
{
246248
_storeDirectory.TryDelete(chunk.Name.ToString());
249+
}
247250
else
248-
throw new Exception("Chunks are out of order.");
251+
{
252+
// We might end up here if a chunk in the middle of the range was
253+
// deleted from disk, while a saved bookmark references that chunk.
254+
break;
255+
}
249256

250257
removeLength += 1;
251258
}
@@ -254,29 +261,27 @@ public void AdvanceTo(BufferReaderHead newReaderHead)
254261
_sortedChunks.RemoveRange(0, removeLength);
255262
}
256263

257-
BufferReaderChunkHead Head(BufferReaderChunk chunk)
264+
BufferReaderChunkExtents Extents(BufferReaderChunk chunk)
258265
{
259-
if (_readHead != null && chunk.Name.Id == _readHead.Value.Chunk)
266+
if (chunk.Name.Id == _readHead.ChunkId)
260267
return chunk.Chunk.TryGetLength(out var writeHead)
261-
? new BufferReaderChunkHead(Math.Min(_readHead.Value.CommitHead, writeHead.Value), writeHead.Value)
262-
: new BufferReaderChunkHead(_readHead.Value.CommitHead, _readHead.Value.CommitHead);
268+
? new BufferReaderChunkExtents(Math.Min(_readHead.Offset, writeHead.Value), writeHead.Value)
269+
: new BufferReaderChunkExtents(_readHead.Offset, _readHead.Offset);
263270

264271
chunk.Chunk.TryGetLength(out var length);
265-
return new BufferReaderChunkHead(0, length ?? 0);
272+
return new BufferReaderChunkExtents(0, length ?? 0);
266273
}
267274

268275
void ReadChunks()
269276
{
270-
var head = _readHead ?? new BufferReaderHead(0, 0);
271-
272-
List<BufferReaderChunk> chunks = new();
277+
List<BufferReaderChunk> chunks = [];
273278

274279
foreach (var (fileName, file) in _storeDirectory
275280
.List(candidateName => Path.GetExtension(candidateName) is ".clef"))
276281
{
277282
if (!ChunkName.TryParse(fileName, out var parsedChunkName)) continue;
278283

279-
if (parsedChunkName.Value.Id >= head.Chunk)
284+
if (parsedChunkName.Value.Id >= _readHead.ChunkId)
280285
chunks.Add(new BufferReaderChunk(parsedChunkName.Value, file));
281286
else
282287
// If the chunk is before the one we're expecting to read then delete it; we've already processed it
@@ -299,15 +304,15 @@ void ReadChunks()
299304
}
300305
}
301306

302-
static bool TryFillChunk(BufferReaderChunk chunk, BufferReaderChunkHead chunkHead, Span<byte> buffer,
307+
static bool TryFillChunk(BufferReaderChunk chunk, BufferReaderChunkExtents chunkExtents, Span<byte> buffer,
303308
[NotNullWhen(true)] out int? filled)
304309
{
305310
var remaining = buffer.Length;
306-
var fill = (int)Math.Min(remaining, chunkHead.Unadvanced);
311+
var fill = (int)Math.Min(remaining, chunkExtents.Unadvanced);
307312

308313
try
309314
{
310-
if (!chunk.TryCopyTo(buffer, chunkHead, fill))
315+
if (!chunk.TryCopyTo(buffer, chunkExtents, fill))
311316
{
312317
filled = null;
313318
return false;

src/SeqCli/Forwarder/Storage/BufferReaderBatch.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ readonly record struct BufferReaderBatch
2727
readonly ArrayPool<byte>? _pool;
2828
readonly byte[] _storage;
2929

30-
public BufferReaderBatch(BufferReaderHead readerHead, ArrayPool<byte>? pool, byte[] storage, int length)
30+
public BufferReaderBatch(BufferPosition readerHead, ArrayPool<byte>? pool, byte[] storage, int length)
3131
{
3232
ReaderHead = readerHead;
3333

@@ -36,7 +36,7 @@ public BufferReaderBatch(BufferReaderHead readerHead, ArrayPool<byte>? pool, byt
3636
_length = length;
3737
}
3838

39-
public BufferReaderHead ReaderHead { get; }
39+
public BufferPosition ReaderHead { get; }
4040

4141
public ReadOnlySpan<byte> AsSpan()
4242
{

src/SeqCli/Forwarder/Storage/BufferReaderChunk.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public void Dispose()
3838
_reader?.Item2.Dispose();
3939
}
4040

41-
public bool TryCopyTo(Span<byte> buffer, BufferReaderChunkHead head, int fill)
41+
public bool TryCopyTo(Span<byte> buffer, BufferReaderChunkExtents chunkExtents, int fill)
4242
{
43-
var readEnd = head.CommitHead + fill;
43+
var readEnd = chunkExtents.CommitHead + fill;
4444

4545
if (_reader != null)
4646
if (_reader.Value.Item1 < readEnd)
@@ -53,12 +53,12 @@ public bool TryCopyTo(Span<byte> buffer, BufferReaderChunkHead head, int fill)
5353

5454
if (_reader == null)
5555
{
56-
if (!Chunk.TryOpenRead(head.WriteHead, out var reader)) return false;
56+
if (!Chunk.TryOpenRead(chunkExtents.WriteHead, out var reader)) return false;
5757

58-
_reader = (head.WriteHead, reader);
58+
_reader = (chunkExtents.WriteHead, reader);
5959
}
6060

61-
_reader.Value.Item2.CopyTo(buffer, head.CommitHead, fill);
61+
_reader.Value.Item2.CopyTo(buffer, chunkExtents.CommitHead, fill);
6262

6363
return true;
6464
}

src/SeqCli/Forwarder/Storage/BufferReaderChunkHead.cs renamed to src/SeqCli/Forwarder/Storage/BufferReaderChunkExtents.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
namespace SeqCli.Forwarder.Storage;
1616

1717
/// <summary>
18-
/// The current position in a <see cref="BufferReaderChunk" />.
18+
/// The current read and write positions in a <see cref="BufferReaderChunk" />.
1919
/// </summary>
20-
readonly record struct BufferReaderChunkHead(long CommitHead, long WriteHead)
20+
readonly record struct BufferReaderChunkExtents(long CommitHead, long WriteHead)
2121
{
2222
public long Unadvanced => WriteHead - CommitHead;
2323
}

src/SeqCli/Forwarder/Storage/BufferReaderHead.cs

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)