6 changes: 4 additions & 2 deletions src/SeqCli/Cli/Commands/IngestCommand.cs
@@ -14,6 +14,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using SeqCli.Api;
using SeqCli.Cli.Features;
@@ -120,14 +121,15 @@ protected override async Task<int> Run()
if (_message != null)
reader = new StaticMessageTemplateReader(reader, _message);

var exit = await LogShipper.ShipEvents(
var exit = await LogShipper.ShipEventsAsync(
connection,
apiKey,
reader,
_invalidDataHandlingFeature.InvalidDataHandling,
_sendFailureHandlingFeature.SendFailureHandling,
batchSize,
filter);
filter,
CancellationToken.None);

if (exit != 0)
return exit;
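The call above now passes CancellationToken.None explicitly, since this command has no cancellation source of its own to thread through. A minimal, self-contained sketch of why an async shipping loop accepts a token at all; this is illustrative only, not the SeqCli LogShipper implementation, and every name in it is hypothetical:

using System;
using System.Threading;
using System.Threading.Tasks;

static class ShippingLoopSketch
{
    // Returns 0 when the input is exhausted, non-zero when cancelled first,
    // mirroring the exit-code convention the command above relies on.
    public static async Task<int> ShipAsync(Func<string?> readEvent, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var evt = readEvent();
            if (evt == null)
                return 0;

            // A real shipper would accumulate a batch here and POST it to the server.
            await Task.Yield();
        }

        return 1;
    }
}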
5 changes: 3 additions & 2 deletions src/SeqCli/Config/Forwarder/SeqCliForwarderStorageConfig.cs
@@ -4,5 +4,6 @@ namespace SeqCli.Config.Forwarder;

public class SeqCliForwarderStorageConfig
{
public ulong BufferSizeBytes { get; set; } = 67_108_864;
}
public long TargetChunkSizeBytes { get; set; } = 10 * 512 * 1024;
public int? MaxChunks { get; set; } = null;
}
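For reference, the default TargetChunkSizeBytes of 10 * 512 * 1024 works out to 5,242,880 bytes (5 MiB), and MaxChunks = null leaves the buffer unbounded, whereas the BufferSizeBytes property it replaces capped the buffer at 67,108,864 bytes (64 MiB). The composition below is an assumption for illustration only (treating MaxChunks as a cap on the number of target-size chunks); the variables are local, not SeqCli configuration APIs:

// Illustrative arithmetic only.
long targetChunkSizeBytes = 10 * 512 * 1024;                   // 5 MiB per chunk
int? maxChunks = 13;                                           // hypothetical cap
long? approximateCapBytes = maxChunks * targetChunkSizeBytes;  // 68,157,440 bytes, close to the old 64 MiB default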
15 changes: 12 additions & 3 deletions src/SeqCli/Config/SeqCliConnectionConfig.cs
@@ -16,7 +16,6 @@
using System.Text;
using Newtonsoft.Json;
using SeqCli.Encryptor;
using SeqCli.Util;

namespace SeqCli.Config;

@@ -56,6 +55,16 @@ public void EncodeApiKey(string? apiKey, IDataProtector dataProtector)
}

public uint? PooledConnectionLifetimeMilliseconds { get; set; } = null;
public ulong EventBodyLimitBytes { get; set; } = 256 * 1024;
public ulong PayloadLimitBytes { get; set; } = 10 * 1024 * 1024;

/// <summary>
/// The maximum event body size to send to the Seq server when ingesting events.
/// When forwarding, this value is consulted only on the way in; on the way out, we let the target server reject any
/// stragglers via 400 responses.
/// </summary>
public int EventSizeLimitBytes { get; set; } = 256 * 1024;

/// <summary>
/// The maximum batch size to send to the Seq server when ingesting events.
/// </summary>
public int BatchSizeLimitBytes { get; set; } = 10 * 1024 * 1024;
}
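A hedged sketch of how the two limits relate: EventSizeLimitBytes bounds a single CLEF event accepted on the way in, while BatchSizeLimitBytes bounds the newline-delimited batch accumulated from a request or shipped to the server. The helper below is hypothetical, not SeqCli validation code:

using System;

static class IngestLimitsSketch
{
    // Returns false when a single event exceeds the event size limit, or when
    // appending it (plus its trailing '\n') would overflow the batch limit.
    public static bool FitsLimits(
        ReadOnlySpan<byte> evt,
        int batchBytesSoFar,
        int eventSizeLimitBytes = 256 * 1024,
        int batchSizeLimitBytes = 10 * 1024 * 1024)
    {
        if (evt.Length > eventSizeLimitBytes)
            return false;

        return batchBytesSoFar + evt.Length + 1 <= batchSizeLimitBytes;
    }
}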
27 changes: 21 additions & 6 deletions src/SeqCli/Forwarder/Channel/ForwardingChannel.cs
@@ -1,8 +1,10 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Seq.Api;
using SeqCli.Forwarder.Diagnostics;
using SeqCli.Forwarder.Storage;
using SeqCli.Ingestion;

@@ -15,7 +17,8 @@ class ForwardingChannel
readonly CancellationTokenSource _stop;
readonly CancellationToken _hardCancel;

public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark, SeqConnection connection, string? apiKey, CancellationToken hardCancel)
public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark bookmark,
SeqConnection connection, string? apiKey, long targetChunkSizeBytes, int? maxChunks, int batchSizeLimitBytes, CancellationToken hardCancel)
{
var channel = System.Threading.Channels.Channel.CreateBounded<ForwardingChannelEntry>(new BoundedChannelOptions(5)
{
@@ -33,9 +36,21 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
{
try
{
// TODO: chunk sizes, max chunks, ingestion log
appender.TryAppend(entry.Data.AsSpan(), 100_000_000);
entry.CompletionSource.SetResult();
const int maxTries = 3;
for (var retry = 0; retry < maxTries; ++retry)
{
if (appender.TryAppend(entry.Data.AsSpan(), targetChunkSizeBytes, maxChunks))
{
entry.CompletionSource.SetResult();
break;
}

if (retry == maxTries - 1)
{
IngestionLog.Log.Error("Buffering failed due to an I/O error; the incoming chunk was rejected");
entry.CompletionSource.TrySetException(new IOException("Buffering failed due to an I/O error."));
}
}
}
catch (Exception e)
{
@@ -55,13 +70,13 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
{
if (_hardCancel.IsCancellationRequested) return;

if (!reader.TryFillBatch(1024 * 1024, out var batch))
if (!reader.TryFillBatch(batchSizeLimitBytes, out var batch))
{
await Task.Delay(100, hardCancel);
continue;
}

await LogShipper.ShipBuffer(connection, apiKey, batch.Value.AsArraySegment(), SendFailureHandling.Retry);
await LogShipper.ShipBufferAsync(connection, apiKey, batch.Value.AsArraySegment(), IngestionLog.Log, hardCancel);

if (bookmark.TrySet(new BufferPosition(batch.Value.ReaderHead.ChunkId,
batch.Value.ReaderHead.Offset)))
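The append path above retries a failed TryAppend a fixed number of times before faulting the caller's completion source. A simplified, self-contained sketch of that pattern; it is not the ForwardingChannel code, and the delegate-based shape is an assumption made for illustration:

using System;
using System.IO;
using System.Threading.Tasks;

static class BoundedRetrySketch
{
    // Attempt `tryAppend` up to `maxTries` times; signal success on the first win,
    // and fault the completion with an IOException if every attempt fails.
    public static void AppendWithRetry(Func<bool> tryAppend, TaskCompletionSource completion, int maxTries = 3)
    {
        for (var retry = 0; retry < maxTries; ++retry)
        {
            if (tryAppend())
            {
                completion.SetResult();
                return;
            }

            if (retry == maxTries - 1)
                completion.TrySetException(new IOException("Buffering failed due to an I/O error."));
        }
    }
}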
8 changes: 7 additions & 1 deletion src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs
@@ -5,6 +5,7 @@
using System.Threading;
using System.Threading.Tasks;
using Seq.Api;
using SeqCli.Config;
using SeqCli.Forwarder.Filesystem.System;
using SeqCli.Forwarder.Storage;
using Serilog;
@@ -15,15 +16,17 @@ class ForwardingChannelMap
{
readonly string _bufferPath;
readonly SeqConnection _connection;
readonly SeqCliConfig _config;
readonly ForwardingChannel _defaultChannel;
readonly Lock _channelsSync = new();
readonly Dictionary<string, ForwardingChannel> _channels = new();
readonly CancellationTokenSource _shutdownTokenSource = new();

public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey)
public ForwardingChannelMap(string bufferPath, SeqConnection connection, string? defaultApiKey, SeqCliConfig config)
{
_bufferPath = bufferPath;
_connection = connection;
_config = config;
_defaultChannel = OpenOrCreateChannel(defaultApiKey, "Default");

// TODO, load other channels at start-up
@@ -43,6 +46,9 @@ ForwardingChannel OpenOrCreateChannel(string? apiKey, string name)
Bookmark.Open(store),
_connection,
apiKey,
_config.Forwarder.Storage.TargetChunkSizeBytes,
_config.Forwarder.Storage.MaxChunks,
_config.Connection.BatchSizeLimitBytes,
_shutdownTokenSource.Token);
}

2 changes: 1 addition & 1 deletion src/SeqCli/Forwarder/ForwarderModule.cs
@@ -46,7 +46,7 @@ public ForwarderModule(string bufferPath, SeqCliConfig config, SeqConnection con
protected override void Load(ContainerBuilder builder)
{
builder.RegisterType<ServerService>().SingleInstance();
builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey)).SingleInstance();
builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey, _config)).SingleInstance();

builder.RegisterType<IngestionEndpoints>().As<IMapEndpoints>();

88 changes: 59 additions & 29 deletions src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs
@@ -16,37 +16,40 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Net.Http.Headers;
using Seq.Api.Model.Shared;
using SeqCli.Api;
using SeqCli.Config;
using SeqCli.Forwarder.Channel;
using SeqCli.Forwarder.Diagnostics;
using JsonException = System.Text.Json.JsonException;

namespace SeqCli.Forwarder.Web.Api;

// ReSharper disable UnusedMethodReturnValue.Local

class IngestionEndpoints : IMapEndpoints
{
static readonly Encoding Utf8 = new UTF8Encoding(false);

readonly ForwardingChannelMap _forwardingChannels;
readonly SeqCliConfig _config;

public IngestionEndpoints(ForwardingChannelMap forwardingChannels)
public IngestionEndpoints(ForwardingChannelMap forwardingChannels, SeqCliConfig config)
{
_forwardingChannels = forwardingChannels;
_config = config;
}

public void MapEndpoints(WebApplication app)
{
app.MapPost("ingest/clef", async context => await IngestCompactFormatAsync(context));
app.MapPost("api/events/raw", async context => await IngestAsync(context));
app.MapPost("ingest/clef", (Delegate) (async (HttpContext context) => await IngestCompactFormatAsync(context)));
app.MapPost("api/events/raw", (Delegate) (async (HttpContext context) => await IngestAsync(context)));
}

async Task<IResult> IngestAsync(HttpContext context)
@@ -62,7 +65,7 @@ async Task<IResult> IngestAsync(HttpContext context)

IngestionLog.ForClient(context.Connection.RemoteIpAddress)
.Error("Client supplied a legacy raw-format (non-CLEF) payload");
return Results.BadRequest("Only newline-delimited JSON (CLEF) payloads are supported.");
return Error(HttpStatusCode.BadRequest, "Only newline-delimited JSON (CLEF) payloads are supported.");
}

async Task<IResult> IngestCompactFormatAsync(HttpContext context)
@@ -74,7 +77,10 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)

var log = _forwardingChannels.Get(GetApiKey(context.Request));

var payload = ArrayPool<byte>.Shared.Rent(1024 * 1024 * 10);
// Add one for the extra newline that we have to insert at the end of batches.
var bufferSize = _config.Connection.BatchSizeLimitBytes + 1;
var rented = ArrayPool<byte>.Shared.Rent(bufferSize);
var buffer = rented[..bufferSize];
var writeHead = 0;
var readHead = 0;

@@ -85,28 +91,37 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
// size of write batches.
while (!done)
{
var remaining = payload.Length - writeHead;
var remaining = buffer.Length - 1 - writeHead;
if (remaining == 0)
{
break;
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
.Error("An incoming request exceeded the configured batch size limit");
return Error(HttpStatusCode.RequestEntityTooLarge, "the request is too large to process");
}

var read = await context.Request.Body.ReadAsync(payload.AsMemory(writeHead, remaining), cts.Token);
var read = await context.Request.Body.ReadAsync(buffer.AsMemory(writeHead, remaining), cts.Token);
if (read == 0)
{
done = true;
}

writeHead += read;

// Ingested batches must be terminated with `\n`, but this isn't an API requirement.
if (done && writeHead > 0 && writeHead < buffer.Length && buffer[writeHead - 1] != (byte)'\n')
{
buffer[writeHead] = (byte)'\n';
writeHead += 1;
}
}

// Validate what we read, marking out a batch of one or more complete newline-delimited events.
var batchStart = readHead;
var batchEnd = readHead;
while (batchEnd < writeHead)
{
var eventStart = batchEnd;
var nlIndex = payload.AsSpan()[eventStart..].IndexOf((byte)'\n');
var nlIndex = buffer.AsSpan()[eventStart..].IndexOf((byte)'\n');

if (nlIndex == -1)
{
@@ -117,45 +132,41 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)

batchEnd = eventEnd;
readHead = batchEnd;
if (!ValidateClef(payload.AsSpan()[eventStart..eventEnd], out var error))

if (!ValidateClef(buffer.AsSpan()[eventStart..eventEnd], out var error))
{
var payloadText = Encoding.UTF8.GetString(payload.AsSpan()[eventStart..eventEnd]);
var payloadText = Encoding.UTF8.GetString(buffer.AsSpan()[eventStart..eventEnd]);
IngestionLog.ForPayload(context.Connection.RemoteIpAddress, payloadText)
.Error("Payload validation failed: {Error}", error);
return Results.BadRequest($"Payload validation failed: {error}.");
return Error(HttpStatusCode.BadRequest, $"Payload validation failed: {error}.");
}
}

if (batchStart != batchEnd)
{
await Write(log, ArrayPool<byte>.Shared, payload, batchStart..batchEnd, cts.Token);
await Write(log, ArrayPool<byte>.Shared, buffer, batchStart..batchEnd, cts.Token);
}

// Copy any unprocessed data into our buffer and continue
if (!done)
if (!done && readHead != 0)
{
var retain = writeHead - readHead;
payload.AsSpan()[readHead..writeHead].CopyTo(payload.AsSpan()[..retain]);
buffer.AsSpan()[readHead..writeHead].CopyTo(buffer.AsSpan()[..retain]);
readHead = 0;
writeHead = retain;
}
}

// Exception cases are handled by `Write`
ArrayPool<byte>.Shared.Return(payload);
ArrayPool<byte>.Shared.Return(rented);

return TypedResults.Content(
null,
"application/json",
Utf8,
StatusCodes.Status201Created);
return SuccessfulIngestion();
}
catch (Exception ex)
{
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
.Error(ex, "Ingestion failed");
return Results.InternalServerError();
return Error(HttpStatusCode.InternalServerError, "Ingestion failed.");
}
}

@@ -185,11 +196,17 @@ static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName)
return request.Query.TryGetValue("apiKey", out var apiKey) ? apiKey.Last() : null;
}

static bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment)
bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment)
{
// Note that `errorFragment` does not include user-supplied values; we opt in to adding this to
// the ingestion log and include it using `ForPayload()`.

if (evt.Length > _config.Connection.EventSizeLimitBytes)
{
errorFragment = "an event exceeds the configured size limit";
return false;
}

var reader = new Utf8JsonReader(evt);

var foundTimestamp = false;
@@ -225,7 +242,6 @@ static bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorF
}

foundTimestamp = true;
break;
}
}
}
@@ -259,4 +275,18 @@ static async Task Write(ForwardingChannel forwardingChannel, ArrayPool<byte> poo
throw;
}
}
}

static IResult Error(HttpStatusCode statusCode, string message)
{
return Results.Json(new ErrorPart { Error = message }, statusCode: (int)statusCode);
}

static IResult SuccessfulIngestion()
{
return TypedResults.Content(
"{}",
"application/json",
Utf8,
StatusCodes.Status201Created);
}
}
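The ingestion endpoint above rents a buffer one byte larger than BatchSizeLimitBytes so a terminating '\n' can always be appended, cuts complete newline-delimited events out of it, and slides any partial trailing event back to the front before reading more. A simplified, self-contained sketch of that buffering strategy; it is not the endpoint itself and omits per-event CLEF validation, API-key routing, and HTTP error responses:

using System;
using System.Buffers;
using System.IO;
using System.Threading.Tasks;

static class ClefBatchingSketch
{
    public static async Task ReadBatchesAsync(Stream body, int batchSizeLimitBytes, Func<ArraySegment<byte>, Task> writeBatch)
    {
        var bufferSize = batchSizeLimitBytes + 1;
        var rented = ArrayPool<byte>.Shared.Rent(bufferSize);
        try
        {
            int writeHead = 0, readHead = 0;
            var done = false;
            while (!done)
            {
                var remaining = bufferSize - 1 - writeHead;
                if (remaining == 0)
                    throw new InvalidDataException("Request exceeds the batch size limit."); // the real endpoint responds 413 here

                var read = await body.ReadAsync(rented.AsMemory(writeHead, remaining));
                if (read == 0)
                    done = true;
                writeHead += read;

                // Ensure the final batch is newline-terminated, as the forwarder's buffer expects.
                if (done && writeHead > 0 && rented[writeHead - 1] != (byte)'\n')
                    rented[writeHead++] = (byte)'\n';

                // Everything up to the last complete '\n' forms the batch to hand off.
                var lastNewline = rented.AsSpan(readHead, writeHead - readHead).LastIndexOf((byte)'\n');
                if (lastNewline >= 0)
                {
                    var batchEnd = readHead + lastNewline + 1;
                    await writeBatch(new ArraySegment<byte>(rented, readHead, batchEnd - readHead));
                    readHead = batchEnd;
                }

                // Slide any incomplete trailing event back to the start of the buffer.
                if (!done && readHead != 0)
                {
                    var retain = writeHead - readHead;
                    rented.AsSpan(readHead, retain).CopyTo(rented);
                    readHead = 0;
                    writeHead = retain;
                }
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(rented);
        }
    }
}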