Skip to content

Commit 8718668

Browse files
authored
Merge pull request #408 from nblumhardt/4xx-both-directions
Validate ingest batches, apply size limits, drop batches that trigger `400` ingestion responses
2 parents 9138e15 + 40d1411 commit 8718668

File tree

12 files changed

+229
-93
lines changed

12 files changed

+229
-93
lines changed

src/SeqCli/Cli/Commands/IngestCommand.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
using System;
1616
using System.Collections.Generic;
17+
using System.Threading;
1718
using System.Threading.Tasks;
1819
using SeqCli.Api;
1920
using SeqCli.Cli.Features;
@@ -120,14 +121,15 @@ protected override async Task<int> Run()
120121
if (_message != null)
121122
reader = new StaticMessageTemplateReader(reader, _message);
122123

123-
var exit = await LogShipper.ShipEvents(
124+
var exit = await LogShipper.ShipEventsAsync(
124125
connection,
125126
apiKey,
126127
reader,
127128
_invalidDataHandlingFeature.InvalidDataHandling,
128129
_sendFailureHandlingFeature.SendFailureHandling,
129130
batchSize,
130-
filter);
131+
filter,
132+
CancellationToken.None);
131133

132134
if (exit != 0)
133135
return exit;

src/SeqCli/Config/Forwarder/SeqCliForwarderStorageConfig.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ namespace SeqCli.Config.Forwarder;
44

55
public class SeqCliForwarderStorageConfig
66
{
7-
public ulong BufferSizeBytes { get; set; } = 67_108_864;
8-
}
7+
public long TargetChunkSizeBytes { get; set; } = 10 * 512 * 1024;
8+
public int? MaxChunks { get; set; } = null;
9+
}

src/SeqCli/Config/SeqCliConnectionConfig.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
using System.Text;
1717
using Newtonsoft.Json;
1818
using SeqCli.Encryptor;
19-
using SeqCli.Util;
2019

2120
namespace SeqCli.Config;
2221

@@ -56,6 +55,16 @@ public void EncodeApiKey(string? apiKey, IDataProtector dataProtector)
5655
}
5756

5857
public uint? PooledConnectionLifetimeMilliseconds { get; set; } = null;
59-
public ulong EventBodyLimitBytes { get; set; } = 256 * 1024;
60-
public ulong PayloadLimitBytes { get; set; } = 10 * 1024 * 1024;
58+
59+
/// <summary>
60+
/// The maximum event body size to send to the Seq server when ingesting events.
61+
/// When forwarding, this value is consulted only on the way in; on the way out, we let the target server reject any
62+
/// stragglers via 400 responses.
63+
/// </summary>
64+
public int EventSizeLimitBytes { get; set; } = 256 * 1024;
65+
66+
/// <summary>
67+
/// The maximum batch size to send to the Seq server when ingesting events.
68+
/// </summary>
69+
public int BatchSizeLimitBytes { get; set; } = 10 * 1024 * 1024;
6170
}

src/SeqCli/Forwarder/Channel/ForwardingChannel.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
using System;
2+
using System.IO;
23
using System.Threading;
34
using System.Threading.Channels;
45
using System.Threading.Tasks;
56
using Seq.Api;
7+
using SeqCli.Forwarder.Diagnostics;
68
using SeqCli.Forwarder.Storage;
79
using SeqCli.Ingestion;
810

@@ -15,7 +17,8 @@ class ForwardingChannel
1517
readonly CancellationTokenSource _stop;
1618
readonly CancellationToken _hardCancel;
1719

18-
public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark, SeqConnection connection, string? apiKey, CancellationToken hardCancel)
20+
public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark,
21+
SeqConnection connection, string? apiKey, long targetChunkSizeBytes, int? maxChunks, int batchSizeLimitBytes, CancellationToken hardCancel)
1922
{
2023
var channel = System.Threading.Channels.Channel.CreateBounded<ForwardingChannelEntry>(new BoundedChannelOptions(5)
2124
{
@@ -33,9 +36,21 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
3336
{
3437
try
3538
{
36-
// TODO: chunk sizes, max chunks, ingestion log
37-
appender.TryAppend(entry.Data.AsSpan(), 100_000_000);
38-
entry.CompletionSource.SetResult();
39+
const int maxTries = 3;
40+
for (var retry = 0; retry < maxTries; ++retry)
41+
{
42+
if (appender.TryAppend(entry.Data.AsSpan(), targetChunkSizeBytes, maxChunks))
43+
{
44+
entry.CompletionSource.SetResult();
45+
break;
46+
}
47+
48+
if (retry == maxTries - 1)
49+
{
50+
IngestionLog.Log.Error("Buffering failed due to an I/O error; the incoming chunk was rejected");
51+
entry.CompletionSource.TrySetException(new IOException("Buffering failed due to an I/O error."));
52+
}
53+
}
3954
}
4055
catch (Exception e)
4156
{
@@ -55,13 +70,13 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
5570
{
5671
if (_hardCancel.IsCancellationRequested) return;
5772

58-
if (!reader.TryFillBatch(1024 * 1024, out var batch))
73+
if (!reader.TryFillBatch(batchSizeLimitBytes, out var batch))
5974
{
6075
await Task.Delay(100, hardCancel);
6176
continue;
6277
}
6378

64-
await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry);
79+
await LogShipper.ShipBufferAsync(connection, apiKey, batch.Value.AsArraySegment(), IngestionLog.Log, hardCancel);
6580

6681
if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId,
6782
batch.Value.ReaderHead.Offset)))

src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading;
66
using System.Threading.Tasks;
77
using Seq.Api;
8+
using SeqCli.Config;
89
using SeqCli.Forwarder.Filesystem.System;
910
using SeqCli.Forwarder.Storage;
1011
using Serilog;
@@ -15,15 +16,17 @@ class ForwardingChannelMap
1516
{
1617
readonly string _bufferPath;
1718
readonly SeqConnection _connection;
19+
readonly SeqCliConfig _config;
1820
readonly ForwardingChannel _defaultChannel;
1921
readonly Lock _channelsSync = new();
2022
readonly Dictionary<string, ForwardingChannel> _channels = new();
2123
readonly CancellationTokenSource _shutdownTokenSource = new();
2224

23-
public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey)
25+
public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey, SeqCliConfig config)
2426
{
2527
_bufferPath = bufferPath;
2628
_connection = connection;
29+
_config = config;
2730
_defaultChannel = OpenOrCreateChannel(defaultApiKey, "Default");
2831

2932
// TODO, load other channels at start-up
@@ -43,6 +46,9 @@ ForwardingChannel OpenOrCreateChannel(string? apiKey, string name)
4346
Bookmark.Open(store),
4447
_connection,
4548
apiKey,
49+
_config.Forwarder.Storage.TargetChunkSizeBytes,
50+
_config.Forwarder.Storage.MaxChunks,
51+
_config.Connection.BatchSizeLimitBytes,
4652
_shutdownTokenSource.Token);
4753
}
4854

src/SeqCli/Forwarder/ForwarderModule.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public ForwarderModule(string bufferPath, SeqCliConfig config, SeqConnection con
4646
protected override void Load(ContainerBuilder builder)
4747
{
4848
builder.RegisterType<ServerService>().SingleInstance();
49-
builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey)).SingleInstance();
49+
builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey, _config)).SingleInstance();
5050

5151
builder.RegisterType<IngestionEndpoints>().As<IMapEndpoints>();
5252

src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,40 @@
1616
using System.Buffers;
1717
using System.Diagnostics.CodeAnalysis;
1818
using System.Linq;
19+
using System.Net;
1920
using System.Text;
2021
using System.Text.Json;
2122
using System.Threading;
2223
using System.Threading.Tasks;
2324
using Microsoft.AspNetCore.Builder;
2425
using Microsoft.AspNetCore.Http;
2526
using Microsoft.Net.Http.Headers;
27+
using Seq.Api.Model.Shared;
2628
using SeqCli.Api;
29+
using SeqCli.Config;
2730
using SeqCli.Forwarder.Channel;
2831
using SeqCli.Forwarder.Diagnostics;
2932
using JsonException = System.Text.Json.JsonException;
3033

3134
namespace SeqCli.Forwarder.Web.Api;
3235

33-
// ReSharper disable UnusedMethodReturnValue.Local
34-
3536
class IngestionEndpoints : IMapEndpoints
3637
{
3738
static readonly Encoding Utf8 = new UTF8Encoding(false);
3839

3940
readonly ForwardingChannelMap _forwardingChannels;
41+
readonly SeqCliConfig _config;
4042

41-
public IngestionEndpoints(ForwardingChannelMap forwardingChannels)
43+
public IngestionEndpoints(ForwardingChannelMap forwardingChannels, SeqCliConfig config)
4244
{
4345
_forwardingChannels = forwardingChannels;
46+
_config = config;
4447
}
4548

4649
public void MapEndpoints(WebApplication app)
4750
{
48-
app.MapPost("ingest/clef", async context => await IngestCompactFormatAsync(context));
49-
app.MapPost("api/events/raw", async context => await IngestAsync(context));
51+
app.MapPost("ingest/clef", (Delegate) (async (HttpContext context) => await IngestCompactFormatAsync(context)));
52+
app.MapPost("api/events/raw", (Delegate) (async (HttpContext context) => await IngestAsync(context)));
5053
}
5154

5255
async Task<IResult> IngestAsync(HttpContext context)
@@ -62,7 +65,7 @@ async Task<IResult> IngestAsync(HttpContext context)
6265

6366
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
6467
.Error("Client supplied a legacy raw-format (non-CLEF) payload");
65-
return Results.BadRequest("Only newline-delimited JSON (CLEF) payloads are supported.");
68+
return Error(HttpStatusCode.BadRequest, "Only newline-delimited JSON (CLEF) payloads are supported.");
6669
}
6770

6871
async Task<IResult> IngestCompactFormatAsync(HttpContext context)
@@ -74,7 +77,10 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
7477

7578
var log = _forwardingChannels.Get(GetApiKey(context.Request));
7679

77-
var payload = ArrayPool<byte>.Shared.Rent(1024 * 1024 * 10);
80+
// Add one for the extra newline that we have to insert at the end of batches.
81+
var bufferSize = _config.Connection.BatchSizeLimitBytes + 1;
82+
var rented = ArrayPool<byte>.Shared.Rent(bufferSize);
83+
var buffer = rented[..bufferSize];
7884
var writeHead = 0;
7985
var readHead = 0;
8086

@@ -85,28 +91,37 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
8591
// size of write batches.
8692
while (!done)
8793
{
88-
var remaining = payload.Length - writeHead;
94+
var remaining = buffer.Length - 1 - writeHead;
8995
if (remaining == 0)
9096
{
91-
break;
97+
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
98+
.Error("An incoming request exceeded the configured batch size limit");
99+
return Error(HttpStatusCode.RequestEntityTooLarge, "the request is too large to process");
92100
}
93101

94-
var read = await context.Request.Body.ReadAsync(payload.AsMemory(writeHead, remaining), cts.Token);
102+
var read = await context.Request.Body.ReadAsync(buffer.AsMemory(writeHead, remaining), cts.Token);
95103
if (read == 0)
96104
{
97105
done = true;
98106
}
99107

100108
writeHead += read;
109+
110+
// Ingested batches must be terminated with `\n`, but this isn't an API requirement.
111+
if (done && writeHead > 0 && writeHead < buffer.Length && buffer[writeHead - 1] != (byte)'\n')
112+
{
113+
buffer[writeHead] = (byte)'\n';
114+
writeHead += 1;
115+
}
101116
}
102-
117+
103118
// Validate what we read, marking out a batch of one or more complete newline-delimited events.
104119
var batchStart = readHead;
105120
var batchEnd = readHead;
106121
while (batchEnd < writeHead)
107122
{
108123
var eventStart = batchEnd;
109-
var nlIndex = payload.AsSpan()[eventStart..].IndexOf((byte)'\n');
124+
var nlIndex = buffer.AsSpan()[eventStart..].IndexOf((byte)'\n');
110125

111126
if (nlIndex == -1)
112127
{
@@ -117,45 +132,41 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
117132

118133
batchEnd = eventEnd;
119134
readHead = batchEnd;
120-
121-
if (!ValidateClef(payload.AsSpan()[eventStart..eventEnd], out var error))
135+
136+
if (!ValidateClef(buffer.AsSpan()[eventStart..eventEnd], out var error))
122137
{
123-
var payloadText = Encoding.UTF8.GetString(payload.AsSpan()[eventStart..eventEnd]);
138+
var payloadText = Encoding.UTF8.GetString(buffer.AsSpan()[eventStart..eventEnd]);
124139
IngestionLog.ForPayload(context.Connection.RemoteIpAddress, payloadText)
125140
.Error("Payload validation failed: {Error}", error);
126-
return Results.BadRequest($"Payload validation failed: {error}.");
141+
return Error(HttpStatusCode.BadRequest, $"Payload validation failed: {error}.");
127142
}
128143
}
129144

130145
if (batchStart != batchEnd)
131146
{
132-
await Write(log, ArrayPool<byte>.Shared, payload, batchStart..batchEnd, cts.Token);
147+
await Write(log, ArrayPool<byte>.Shared, buffer, batchStart..batchEnd, cts.Token);
133148
}
134149

135150
// Copy any unprocessed data into our buffer and continue
136-
if (!done)
151+
if (!done && readHead != 0)
137152
{
138153
var retain = writeHead - readHead;
139-
payload.AsSpan()[readHead..writeHead].CopyTo(payload.AsSpan()[..retain]);
154+
buffer.AsSpan()[readHead..writeHead].CopyTo(buffer.AsSpan()[..retain]);
140155
readHead = 0;
141156
writeHead = retain;
142157
}
143158
}
144159

145160
// Exception cases are handled by `Write`
146-
ArrayPool<byte>.Shared.Return(payload);
161+
ArrayPool<byte>.Shared.Return(rented);
147162

148-
return TypedResults.Content(
149-
null,
150-
"application/json",
151-
Utf8,
152-
StatusCodes.Status201Created);
163+
return SuccessfulIngestion();
153164
}
154165
catch (Exception ex)
155166
{
156167
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
157168
.Error(ex, "Ingestion failed");
158-
return Results.InternalServerError();
169+
return Error(HttpStatusCode.InternalServerError, "Ingestion failed.");
159170
}
160171
}
161172

@@ -185,11 +196,17 @@ static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName)
185196
return request.Query.TryGetValue("apiKey", out var apiKey) ? apiKey.Last() : null;
186197
}
187198

188-
static bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment)
199+
bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment)
189200
{
190201
// Note that `errorFragment` does not include user-supplied values; we opt in to adding this to
191202
// the ingestion log and include it using `ForPayload()`.
192203

204+
if (evt.Length > _config.Connection.EventSizeLimitBytes)
205+
{
206+
errorFragment = "an event exceeds the configured size limit";
207+
return false;
208+
}
209+
193210
var reader = new Utf8JsonReader(evt);
194211

195212
var foundTimestamp = false;
@@ -225,7 +242,6 @@ static bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorF
225242
}
226243

227244
foundTimestamp = true;
228-
break;
229245
}
230246
}
231247
}
@@ -259,4 +275,18 @@ static async Task Write(ForwardingChannel forwardingChannel, ArrayPool<byte> poo
259275
throw;
260276
}
261277
}
262-
}
278+
279+
static IResult Error(HttpStatusCode statusCode, string message)
280+
{
281+
return Results.Json(new ErrorPart { Error = message }, statusCode: (int)statusCode);
282+
}
283+
284+
static IResult SuccessfulIngestion()
285+
{
286+
return TypedResults.Content(
287+
"{}",
288+
"application/json",
289+
Utf8,
290+
StatusCodes.Status201Created);
291+
}
292+
}

0 commit comments

Comments
 (0)