diff --git a/src/SeqCli/Cli/Commands/IngestCommand.cs b/src/SeqCli/Cli/Commands/IngestCommand.cs index 8a548c06..d356212e 100644 --- a/src/SeqCli/Cli/Commands/IngestCommand.cs +++ b/src/SeqCli/Cli/Commands/IngestCommand.cs @@ -14,6 +14,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using SeqCli.Api; using SeqCli.Cli.Features; @@ -120,14 +121,15 @@ protected override async Task Run() if (_message != null) reader = new StaticMessageTemplateReader(reader, _message); - var exit = await LogShipper.ShipEvents( + var exit = await LogShipper.ShipEventsAsync( connection, apiKey, reader, _invalidDataHandlingFeature.InvalidDataHandling, _sendFailureHandlingFeature.SendFailureHandling, batchSize, - filter); + filter, + CancellationToken.None); if (exit != 0) return exit; diff --git a/src/SeqCli/Config/Forwarder/SeqCliForwarderStorageConfig.cs b/src/SeqCli/Config/Forwarder/SeqCliForwarderStorageConfig.cs index 8f88badf..f5af85f9 100644 --- a/src/SeqCli/Config/Forwarder/SeqCliForwarderStorageConfig.cs +++ b/src/SeqCli/Config/Forwarder/SeqCliForwarderStorageConfig.cs @@ -4,5 +4,6 @@ namespace SeqCli.Config.Forwarder; public class SeqCliForwarderStorageConfig { - public ulong BufferSizeBytes { get; set; } = 67_108_864; -} \ No newline at end of file + public long TargetChunkSizeBytes { get; set; } = 10 * 512 * 1024; + public int? MaxChunks { get; set; } = null; +} diff --git a/src/SeqCli/Config/SeqCliConnectionConfig.cs b/src/SeqCli/Config/SeqCliConnectionConfig.cs index 7c3cc2a1..e5af87ca 100644 --- a/src/SeqCli/Config/SeqCliConnectionConfig.cs +++ b/src/SeqCli/Config/SeqCliConnectionConfig.cs @@ -16,7 +16,6 @@ using System.Text; using Newtonsoft.Json; using SeqCli.Encryptor; -using SeqCli.Util; namespace SeqCli.Config; @@ -56,6 +55,16 @@ public void EncodeApiKey(string? apiKey, IDataProtector dataProtector) } public uint? PooledConnectionLifetimeMilliseconds { get; set; } = null; - public ulong EventBodyLimitBytes { get; set; } = 256 * 1024; - public ulong PayloadLimitBytes { get; set; } = 10 * 1024 * 1024; + + /// + /// The maximum event body size to send to the Seq server when ingesting events. + /// When forwarding, this value is consulted only on the way in; on the way out, we let the target server reject any + /// stragglers via 400 responses. + /// + public int EventSizeLimitBytes { get; set; } = 256 * 1024; + + /// + /// The maximum batch size to send to the Seq server when ingesting events. + /// + public int BatchSizeLimitBytes { get; set; } = 10 * 1024 * 1024; } diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs index e5e10d40..15c6e90c 100644 --- a/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs +++ b/src/SeqCli/Forwarder/Channel/ForwardingChannel.cs @@ -1,8 +1,10 @@ using System; +using System.IO; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Seq.Api; +using SeqCli.Forwarder.Diagnostics; using SeqCli.Forwarder.Storage; using SeqCli.Ingestion; @@ -15,7 +17,8 @@ class ForwardingChannel readonly CancellationTokenSource _stop; readonly CancellationToken _hardCancel; - public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark, SeqConnection connection, string? apiKey, CancellationToken hardCancel) + public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark, + SeqConnection connection, string? apiKey, long targetChunkSizeBytes, int? maxChunks, int batchSizeLimitBytes, CancellationToken hardCancel) { var channel = System.Threading.Channels.Channel.CreateBounded(new BoundedChannelOptions(5) { @@ -33,9 +36,21 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark { try { - // TODO: chunk sizes, max chunks, ingestion log - appender.TryAppend(entry.Data.AsSpan(), 100_000_000); - entry.CompletionSource.SetResult(); + const int maxTries = 3; + for (var retry = 0; retry < maxTries; ++retry) + { + if (appender.TryAppend(entry.Data.AsSpan(), targetChunkSizeBytes, maxChunks)) + { + entry.CompletionSource.SetResult(); + break; + } + + if (retry == maxTries - 1) + { + IngestionLog.Log.Error("Buffering failed due to an I/O error; the incoming chunk was rejected"); + entry.CompletionSource.TrySetException(new IOException("Buffering failed due to an I/O error.")); + } + } } catch (Exception e) { @@ -55,13 +70,13 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark { if (_hardCancel.IsCancellationRequested) return; - if (!reader.TryFillBatch(1024 * 1024, out var batch)) + if (!reader.TryFillBatch(batchSizeLimitBytes, out var batch)) { await Task.Delay(100, hardCancel); continue; } - await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry); + await LogShipper.ShipBufferAsync(connection, apiKey, batch.Value.AsArraySegment(), IngestionLog.Log, hardCancel); if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId, batch.Value.ReaderHead.Offset))) diff --git a/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs b/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs index 08132ceb..9dc9cd3a 100644 --- a/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs +++ b/src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs @@ -5,6 +5,7 @@ using System.Threading; using System.Threading.Tasks; using Seq.Api; +using SeqCli.Config; using SeqCli.Forwarder.Filesystem.System; using SeqCli.Forwarder.Storage; using Serilog; @@ -15,15 +16,17 @@ class ForwardingChannelMap { readonly string _bufferPath; readonly SeqConnection _connection; + readonly SeqCliConfig _config; readonly ForwardingChannel _defaultChannel; readonly Lock _channelsSync = new(); readonly Dictionary _channels = new(); readonly CancellationTokenSource _shutdownTokenSource = new(); - public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey) + public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey, SeqCliConfig config) { _bufferPath = bufferPath; _connection = connection; + _config = config; _defaultChannel = OpenOrCreateChannel(defaultApiKey, "Default"); // TODO, load other channels at start-up @@ -43,6 +46,9 @@ ForwardingChannel OpenOrCreateChannel(string? apiKey, string name) Bookmark.Open(store), _connection, apiKey, + _config.Forwarder.Storage.TargetChunkSizeBytes, + _config.Forwarder.Storage.MaxChunks, + _config.Connection.BatchSizeLimitBytes, _shutdownTokenSource.Token); } diff --git a/src/SeqCli/Forwarder/ForwarderModule.cs b/src/SeqCli/Forwarder/ForwarderModule.cs index 960c01d1..ce000a28 100644 --- a/src/SeqCli/Forwarder/ForwarderModule.cs +++ b/src/SeqCli/Forwarder/ForwarderModule.cs @@ -46,7 +46,7 @@ public ForwarderModule(string bufferPath, SeqCliConfig config, SeqConnection con protected override void Load(ContainerBuilder builder) { builder.RegisterType().SingleInstance(); - builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey)).SingleInstance(); + builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey, _config)).SingleInstance(); builder.RegisterType().As(); diff --git a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs index b1cb184f..467087f9 100644 --- a/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs +++ b/src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs @@ -16,6 +16,7 @@ using System.Buffers; using System.Diagnostics.CodeAnalysis; using System.Linq; +using System.Net; using System.Text; using System.Text.Json; using System.Threading; @@ -23,30 +24,32 @@ using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; using Microsoft.Net.Http.Headers; +using Seq.Api.Model.Shared; using SeqCli.Api; +using SeqCli.Config; using SeqCli.Forwarder.Channel; using SeqCli.Forwarder.Diagnostics; using JsonException = System.Text.Json.JsonException; namespace SeqCli.Forwarder.Web.Api; -// ReSharper disable UnusedMethodReturnValue.Local - class IngestionEndpoints : IMapEndpoints { static readonly Encoding Utf8 = new UTF8Encoding(false); readonly ForwardingChannelMap _forwardingChannels; + readonly SeqCliConfig _config; - public IngestionEndpoints(ForwardingChannelMap forwardingChannels) + public IngestionEndpoints(ForwardingChannelMap forwardingChannels, SeqCliConfig config) { _forwardingChannels = forwardingChannels; + _config = config; } public void MapEndpoints(WebApplication app) { - app.MapPost("ingest/clef", async context => await IngestCompactFormatAsync(context)); - app.MapPost("api/events/raw", async context => await IngestAsync(context)); + app.MapPost("ingest/clef", (Delegate) (async (HttpContext context) => await IngestCompactFormatAsync(context))); + app.MapPost("api/events/raw", (Delegate) (async (HttpContext context) => await IngestAsync(context))); } async Task IngestAsync(HttpContext context) @@ -62,7 +65,7 @@ async Task IngestAsync(HttpContext context) IngestionLog.ForClient(context.Connection.RemoteIpAddress) .Error("Client supplied a legacy raw-format (non-CLEF) payload"); - return Results.BadRequest("Only newline-delimited JSON (CLEF) payloads are supported."); + return Error(HttpStatusCode.BadRequest, "Only newline-delimited JSON (CLEF) payloads are supported."); } async Task IngestCompactFormatAsync(HttpContext context) @@ -74,7 +77,10 @@ async Task IngestCompactFormatAsync(HttpContext context) var log = _forwardingChannels.Get(GetApiKey(context.Request)); - var payload = ArrayPool.Shared.Rent(1024 * 1024 * 10); + // Add one for the extra newline that we have to insert at the end of batches. + var bufferSize = _config.Connection.BatchSizeLimitBytes + 1; + var rented = ArrayPool.Shared.Rent(bufferSize); + var buffer = rented[..bufferSize]; var writeHead = 0; var readHead = 0; @@ -85,28 +91,37 @@ async Task IngestCompactFormatAsync(HttpContext context) // size of write batches. while (!done) { - var remaining = payload.Length - writeHead; + var remaining = buffer.Length - 1 - writeHead; if (remaining == 0) { - break; + IngestionLog.ForClient(context.Connection.RemoteIpAddress) + .Error("An incoming request exceeded the configured batch size limit"); + return Error(HttpStatusCode.RequestEntityTooLarge, "the request is too large to process"); } - var read = await context.Request.Body.ReadAsync(payload.AsMemory(writeHead, remaining), cts.Token); + var read = await context.Request.Body.ReadAsync(buffer.AsMemory(writeHead, remaining), cts.Token); if (read == 0) { done = true; } writeHead += read; + + // Ingested batches must be terminated with `\n`, but this isn't an API requirement. + if (done && writeHead > 0 && writeHead < buffer.Length && buffer[writeHead - 1] != (byte)'\n') + { + buffer[writeHead] = (byte)'\n'; + writeHead += 1; + } } - + // Validate what we read, marking out a batch of one or more complete newline-delimited events. var batchStart = readHead; var batchEnd = readHead; while (batchEnd < writeHead) { var eventStart = batchEnd; - var nlIndex = payload.AsSpan()[eventStart..].IndexOf((byte)'\n'); + var nlIndex = buffer.AsSpan()[eventStart..].IndexOf((byte)'\n'); if (nlIndex == -1) { @@ -117,45 +132,41 @@ async Task IngestCompactFormatAsync(HttpContext context) batchEnd = eventEnd; readHead = batchEnd; - - if (!ValidateClef(payload.AsSpan()[eventStart..eventEnd], out var error)) + + if (!ValidateClef(buffer.AsSpan()[eventStart..eventEnd], out var error)) { - var payloadText = Encoding.UTF8.GetString(payload.AsSpan()[eventStart..eventEnd]); + var payloadText = Encoding.UTF8.GetString(buffer.AsSpan()[eventStart..eventEnd]); IngestionLog.ForPayload(context.Connection.RemoteIpAddress, payloadText) .Error("Payload validation failed: {Error}", error); - return Results.BadRequest($"Payload validation failed: {error}."); + return Error(HttpStatusCode.BadRequest, $"Payload validation failed: {error}."); } } if (batchStart != batchEnd) { - await Write(log, ArrayPool.Shared, payload, batchStart..batchEnd, cts.Token); + await Write(log, ArrayPool.Shared, buffer, batchStart..batchEnd, cts.Token); } // Copy any unprocessed data into our buffer and continue - if (!done) + if (!done && readHead != 0) { var retain = writeHead - readHead; - payload.AsSpan()[readHead..writeHead].CopyTo(payload.AsSpan()[..retain]); + buffer.AsSpan()[readHead..writeHead].CopyTo(buffer.AsSpan()[..retain]); readHead = 0; writeHead = retain; } } // Exception cases are handled by `Write` - ArrayPool.Shared.Return(payload); + ArrayPool.Shared.Return(rented); - return TypedResults.Content( - null, - "application/json", - Utf8, - StatusCodes.Status201Created); + return SuccessfulIngestion(); } catch (Exception ex) { IngestionLog.ForClient(context.Connection.RemoteIpAddress) .Error(ex, "Ingestion failed"); - return Results.InternalServerError(); + return Error(HttpStatusCode.InternalServerError, "Ingestion failed."); } } @@ -185,11 +196,17 @@ static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName) return request.Query.TryGetValue("apiKey", out var apiKey) ? apiKey.Last() : null; } - static bool ValidateClef(Span evt, [NotNullWhen(false)] out string? errorFragment) + bool ValidateClef(Span evt, [NotNullWhen(false)] out string? errorFragment) { // Note that `errorFragment` does not include user-supplied values; we opt in to adding this to // the ingestion log and include it using `ForPayload()`. + if (evt.Length > _config.Connection.EventSizeLimitBytes) + { + errorFragment = "an event exceeds the configured size limit"; + return false; + } + var reader = new Utf8JsonReader(evt); var foundTimestamp = false; @@ -225,7 +242,6 @@ static bool ValidateClef(Span evt, [NotNullWhen(false)] out string? errorF } foundTimestamp = true; - break; } } } @@ -259,4 +275,18 @@ static async Task Write(ForwardingChannel forwardingChannel, ArrayPool poo throw; } } -} \ No newline at end of file + + static IResult Error(HttpStatusCode statusCode, string message) + { + return Results.Json(new ErrorPart { Error = message }, statusCode: (int)statusCode); + } + + static IResult SuccessfulIngestion() + { + return TypedResults.Content( + "{}", + "application/json", + Utf8, + StatusCodes.Status201Created); + } +} diff --git a/src/SeqCli/Ingestion/LogShipper.cs b/src/SeqCli/Ingestion/LogShipper.cs index 94f15298..fbd892b1 100644 --- a/src/SeqCli/Ingestion/LogShipper.cs +++ b/src/SeqCli/Ingestion/LogShipper.cs @@ -15,9 +15,11 @@ using System; using System.Collections.Generic; using System.IO; +using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Text; +using System.Threading; using System.Threading.Tasks; using Newtonsoft.Json; using Seq.Api; @@ -33,11 +35,12 @@ static class LogShipper { static readonly ITextFormatter JsonFormatter = OutputFormatter.Json(null); - public static async Task ShipBuffer( + public static async Task ShipBufferAsync( SeqConnection connection, string? apiKey, ArraySegment utf8Clef, - SendFailureHandling sendFailureHandling) + ILogger sendFailureLog, + CancellationToken cancellationToken) { var content = new ByteArrayContent(utf8Clef.Array!, utf8Clef.Offset, utf8Clef.Count) { @@ -50,51 +53,57 @@ public static async Task ShipBuffer( var retries = 0; while (true) { - var sendSucceeded = false; try { - sendSucceeded = await Send( + var statusCode = await SendAsync( connection, apiKey, - sendFailureHandling != SendFailureHandling.Ignore, - content); - } - catch (Exception ex) - { - if (sendFailureHandling != SendFailureHandling.Ignore) - Log.Error(ex, "Failed to send an event batch"); - } + sendFailureLog, + content, + cancellationToken); - if (!sendSucceeded) - { - if (sendFailureHandling == SendFailureHandling.Fail) - return false; + if ((int)statusCode is >= 200 and < 300) + { + return; + } - if (sendFailureHandling == SendFailureHandling.Retry) + if (statusCode == HttpStatusCode.BadRequest) { - var millisecondsDelay = (int)Math.Min(Math.Pow(2, retries) * 2000, 60000); - await Task.Delay(millisecondsDelay); - retries += 1; - continue; + sendFailureLog.Warning( + "Status code {StatusCode} indicates that the batch will not be accepted on retry; dropping", + (int)statusCode); + return; } } + catch (TaskCanceledException) + { + throw; + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + var millisecondsDelay = (int)Math.Min(Math.Pow(2, retries) * 2000, 60000); + sendFailureLog.Error(ex, "Failed to send an event batch; retry in {MillisecondsDelay}", millisecondsDelay); - return true; + await Task.Delay(millisecondsDelay, cancellationToken); + retries += 1; + } } } - public static async Task ShipEvents( + public static async Task ShipEventsAsync( SeqConnection connection, string? apiKey, ILogEventReader reader, InvalidDataHandling invalidDataHandling, SendFailureHandling sendFailureHandling, int batchSize, - Func? filter = null) + Func? filter, + CancellationToken cancellationToken) { - if (connection == null) throw new ArgumentNullException(nameof(connection)); - if (reader == null) throw new ArgumentNullException(nameof(reader)); - const int maxEmptyBatchWaitMS = 2000; var batch = await ReadBatchAsync(reader, filter, batchSize, invalidDataHandling, maxEmptyBatchWaitMS); var retries = 0; @@ -103,16 +112,19 @@ public static async Task ShipEvents( var sendSucceeded = false; try { - sendSucceeded = await SendBatchAsync( + var statusCode = await SendBatchAsync( connection, apiKey, batch.LogEvents, - sendFailureHandling != SendFailureHandling.Ignore); + sendFailureHandling != SendFailureHandling.Ignore ? Log.Logger : null, + cancellationToken); + + sendSucceeded = (int)statusCode is >= 200 and < 300; } catch (Exception ex) { if (sendFailureHandling != SendFailureHandling.Ignore) - Log.Error(ex, "Failed to send an event batch"); + Log.Error(ex, "Batch shipping failed"); } if (!sendSucceeded) @@ -195,14 +207,15 @@ static async Task ReadBatchAsync( } while (true); } - static async Task SendBatchAsync( + static async Task SendBatchAsync( SeqConnection connection, string? apiKey, IReadOnlyCollection batch, - bool logSendFailures) + ILogger? sendFailureLog, + CancellationToken cancellationToken) { if (batch.Count == 0) - return true; + return HttpStatusCode.OK; StringContent content; // ReSharper disable once UseAwaitUsing @@ -214,35 +227,32 @@ static async Task SendBatchAsync( content = new StringContent(builder.ToString(), Encoding.UTF8, ApiConstants.ClefMediaType); } - return await Send(connection, apiKey, logSendFailures, content); + return await SendAsync(connection, apiKey, sendFailureLog, content, cancellationToken); } - static async Task Send(SeqConnection connection, string? apiKey, bool logSendFailures, HttpContent content) + static async Task SendAsync(SeqConnection connection, string? apiKey, ILogger? sendFailureLog, HttpContent content, CancellationToken cancellationToken) { var request = new HttpRequestMessage(HttpMethod.Post, ApiConstants.IngestionEndpoint) { Content = content }; if (apiKey != null) request.Headers.Add(ApiConstants.ApiKeyHeaderName, apiKey); - var result = await connection.Client.HttpClient.SendAsync(request); - - if (result.IsSuccessStatusCode) - return true; + var result = await connection.Client.HttpClient.SendAsync(request, cancellationToken); - if (!logSendFailures) - return false; + if (result.IsSuccessStatusCode || sendFailureLog == null) + return result.StatusCode; - var resultJson = await result.Content.ReadAsStringAsync(); + var resultJson = await result.Content.ReadAsStringAsync(cancellationToken); if (!string.IsNullOrWhiteSpace(resultJson)) { try { var error = JsonConvert.DeserializeObject(resultJson)!; - Log.Error("Failed with status code {StatusCode}: {ErrorMessage}", + sendFailureLog.Error("Shipping failed with status code {StatusCode}: {ErrorMessage}", result.StatusCode, (string)error.Error); - return false; + return result.StatusCode; } catch { @@ -250,7 +260,7 @@ static async Task Send(SeqConnection connection, string? apiKey, bool logS } } - Log.Error("Failed with status code {StatusCode} ({ReasonPhrase})", result.StatusCode, result.ReasonPhrase); - return false; + sendFailureLog.Error("Shipping failed with status code {StatusCode} ({ReasonPhrase})", result.StatusCode, result.ReasonPhrase); + return result.StatusCode; } } \ No newline at end of file diff --git a/src/SeqCli/Ingestion/SendFailureHandling.cs b/src/SeqCli/Ingestion/SendFailureHandling.cs index 1a4c3980..ae9cb221 100644 --- a/src/SeqCli/Ingestion/SendFailureHandling.cs +++ b/src/SeqCli/Ingestion/SendFailureHandling.cs @@ -17,7 +17,6 @@ namespace SeqCli.Ingestion; /// /// Controls how connection failures during ingestion are handled. /// -/// A 'retry' option will appear here at some future point. enum SendFailureHandling { /// diff --git a/src/SeqCli/Sample/Loader/Simulation.cs b/src/SeqCli/Sample/Loader/Simulation.cs index 0735c4f8..b3fc062f 100644 --- a/src/SeqCli/Sample/Loader/Simulation.cs +++ b/src/SeqCli/Sample/Loader/Simulation.cs @@ -34,8 +34,8 @@ public static async Task RunAsync(SeqConnection connection, string? apiKey, int .WriteTo.Sink(buffer) .CreateLogger(); - var ship = Task.Run(() => LogShipper.ShipEvents(connection, apiKey, buffer, - InvalidDataHandling.Fail, SendFailureHandling.Continue, batchSize), cancellationToken); + var ship = Task.Run(() => LogShipper.ShipEventsAsync(connection, apiKey, buffer, + InvalidDataHandling.Fail, SendFailureHandling.Continue, batchSize, null, cancellationToken), cancellationToken); await Roastery.Program.Main(logger, cancellationToken); await logger.DisposeAsync(); diff --git a/src/SeqCli/SeqCli.csproj b/src/SeqCli/SeqCli.csproj index 0af2a82b..e7ef671e 100644 --- a/src/SeqCli/SeqCli.csproj +++ b/src/SeqCli/SeqCli.csproj @@ -46,6 +46,7 @@ + diff --git a/test/SeqCli.EndToEnd/Forwarder/ForwarderIngestionValidationTestCase.cs b/test/SeqCli.EndToEnd/Forwarder/ForwarderIngestionValidationTestCase.cs new file mode 100644 index 00000000..0aafbc66 --- /dev/null +++ b/test/SeqCli.EndToEnd/Forwarder/ForwarderIngestionValidationTestCase.cs @@ -0,0 +1,63 @@ +using System; +using System.Globalization; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; +using Seq.Api; +using SeqCli.EndToEnd.Support; +using Serilog; +using Xunit; + +namespace SeqCli.EndToEnd.Forwarder; + +public class ForwarderIngestionValidationTestCase: ICliTestCase +{ + public async Task ExecuteAsync(SeqConnection _, ILogger logger, CliCommandRunner runner) + { + const int eventSizeLimit = 10_000; + const int batchSizeLimit = eventSizeLimit * 10; + var (forwarder, forwarderUri) = await runner.SpawnForwarderAsync(environment: new() + { + ["SEQCLI_CONNECTION_EVENTSIZELIMITBYTES"] = eventSizeLimit.ToString(CultureInfo.InvariantCulture), + ["SEQCLI_CONNECTION_BATCHSIZELIMITBYTES"] = batchSizeLimit.ToString(CultureInfo.InvariantCulture) + }); + using (forwarder) + { + using var connection = new SeqConnection(forwarderUri); + + var isoNow = DateTime.UtcNow.ToString("o"); + + await IngestClef(connection, $"{{\"@t\":\"{isoNow}\"}}", HttpStatusCode.Created); + await IngestClef(connection, $"{{\"@t\":\"{isoNow}\",\"\":42}}", HttpStatusCode.Created); + await IngestClef(connection, $"{{\"@t\":\"{isoNow}\",\"@mt\":\"{{a.b}}{{a.c}}{{b.d}}\",\"a\":{{\"b\":1}}}}", + HttpStatusCode.Created); + await IngestClef(connection, + $"{{\"@t\":\"{isoNow}\",\"@mt\":\"test {{a}} {{b}}\",\"@i\":\"ba6bd213\",\"a\":\"A\",\"b\":42}}", + HttpStatusCode.Created); + await IngestClef(connection, $"{{\"@t\":\"{isoNow}\",\"N\":635476923356034756,\"@mt\":\"N{{N}}\"}}", + HttpStatusCode.Created); + await IngestClef(connection, "{}", HttpStatusCode.BadRequest); + await IngestClef(connection, "{hello!}", HttpStatusCode.BadRequest); + await IngestClef(connection, "{\"@t\":\"ceci n'est pas un timestamp\"}", HttpStatusCode.BadRequest); + await IngestClef(connection, new string('X', 100), HttpStatusCode.BadRequest); + await IngestClef(connection, new string('X', batchSizeLimit + 100), HttpStatusCode.RequestEntityTooLarge); + await IngestClef(connection, $"{{\"@t\":\"{isoNow}\",\"X\":\"{new string('X', eventSizeLimit + 1000)}\"}}", + HttpStatusCode.BadRequest); + await IngestClef(connection, $"{{\"@t\":\"{isoNow}\",\"X\":\"{new string('X', eventSizeLimit - 1000)}\"}}", + HttpStatusCode.Created); + await IngestClef(connection, + $"{{\"@t\":\"{isoNow}\",\"@tr\":\"abc\",\"@sp\":\"def\",\"@ra\":{{\"ghi\":\"jkl\"}},\"@sa\":{{\"mno\":\"pqr\"}}}}", + HttpStatusCode.Created); + await IngestClef(connection, + $"{{\"@t\":\"{isoNow}\",\"@m\":\"Not a template {{foo: 'bar'}}\",\"@i\":\"01234567\"}}", + HttpStatusCode.Created); + } + } + + static async Task IngestClef(SeqConnection connection, string clef, HttpStatusCode expectedStatusCode) + { + var content = new StringContent(clef); + var response = await connection.Client.HttpClient.PostAsync("ingest/clef", content); + Assert.Equal(expectedStatusCode, response.StatusCode); + } +}