forked from datalust/seqcli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathForwardingChannel.cs
More file actions
95 lines (80 loc) · 3.08 KB
/
ForwardingChannel.cs
File metadata and controls
95 lines (80 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Seq.Api;
using SeqCli.Forwarder.Storage;
using SeqCli.Ingestion;
namespace SeqCli.Forwarder.Channel;
/// <summary>
/// Accepts incoming event payloads through a bounded in-memory channel, appends them to
/// durable buffer storage, and ships stored batches to a Seq server in the background.
/// Create one per buffer; call <see cref="StopAsync"/> to shut down gracefully.
/// </summary>
class ForwardingChannel
{
    readonly ChannelWriter<ForwardingChannelEntry> _writer;
    readonly Task _writeWorker, _readWorker;
    readonly CancellationTokenSource _stop;
    readonly CancellationToken _hardCancel;

    public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark, SeqConnection connection, string? apiKey, CancellationToken hardCancel)
    {
        var channel = System.Threading.Channels.Channel.CreateBounded<ForwardingChannelEntry>(new BoundedChannelOptions(5)
        {
            SingleReader = false,
            SingleWriter = true,
            FullMode = BoundedChannelFullMode.Wait,
        });

        // BUGFIX: assign `_hardCancel` *before* creating the linked source. The original
        // linked against the still-default field, so `_stop` was never tied to `hardCancel`.
        _hardCancel = hardCancel;
        _stop = CancellationTokenSource.CreateLinkedTokenSource(hardCancel);

        _writer = channel.Writer;

        _writeWorker = Task.Run(async () =>
        {
            try
            {
                await foreach (var entry in channel.Reader.ReadAllAsync(hardCancel))
                {
                    try
                    {
                        // TODO: chunk sizes, max chunks, ingestion log
                        appender.TryAppend(entry.Data.AsSpan(), 100_000_000);
                        // TrySetResult: tolerant of an entry already completed/cancelled,
                        // matching the TrySetException below.
                        entry.CompletionSource.TrySetResult();
                    }
                    catch (Exception e)
                    {
                        entry.CompletionSource.TrySetException(e);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Hard cancellation: complete any entries still queued so callers
                // awaiting WriteAsync are not left hanging forever.
                while (channel.Reader.TryRead(out var entry))
                    entry.CompletionSource.TrySetCanceled(hardCancel);
                throw;
            }
        }, cancellationToken: hardCancel);

        _readWorker = Task.Run(async () =>
        {
            // BUGFIX: observe the linked `_stop` token, not just `hardCancel`; the
            // original loop could never be ended by StopAsync, so `await _readWorker`
            // would hang unless the hard token fired.
            var stop = _stop.Token;

            if (bookmark.TryGet(out var bookmarkValue))
            {
                reader.AdvanceTo(bookmarkValue.Value);
            }

            while (!stop.IsCancellationRequested)
            {
                if (!reader.TryFillBatch(1024 * 1024, out var batch))
                {
                    try
                    {
                        await Task.Delay(100, stop);
                    }
                    catch (OperationCanceledException)
                    {
                        // Graceful stop requested while idle; return normally so
                        // StopAsync's await does not observe a cancellation fault.
                        return;
                    }
                    continue;
                }

                await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry);

                // Only advance past shipped data once the bookmark is durably recorded;
                // a crash between ship and advance re-ships rather than loses events.
                if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId,
                        batch.Value.ReaderHead.Offset)))
                {
                    reader.AdvanceTo(batch.Value.ReaderHead);
                }

                batch.Value.Return();
            }
        }, cancellationToken: hardCancel);
    }

    /// <summary>
    /// Enqueue the bytes of <paramref name="storage"/> selected by <paramref name="range"/>
    /// for durable buffering. Completes once the write worker has appended the payload,
    /// faults if the append failed, and cancels on hard shutdown.
    /// </summary>
    public async Task WriteAsync(byte[] storage, Range range, CancellationToken cancellationToken)
    {
        // RunContinuationsAsynchronously prevents caller continuations from executing
        // inline on the write-worker loop when the TCS is completed there.
        var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _hardCancel);
        await _writer.WriteAsync(new ForwardingChannelEntry(storage[range], tcs), cts.Token);
        await tcs.Task;
    }

    /// <summary>
    /// Signal both workers to stop and wait for them to drain. <see cref="WriteAsync"/>
    /// must not be called after this.
    /// </summary>
    public async Task StopAsync()
    {
        await _stop.CancelAsync();
        _writer.Complete();
        await _writeWorker;
        await _readWorker;
    }
}