Skip to content

Commit 62b65a8

Browse files
authored
Merge pull request #406 from liammclennan/opt-in-to-apikey-forwarding
Opt-in to API key forwarding. Default to using SeqCli's connection se…
2 parents f31bcb8 + e033329 commit 62b65a8

File tree

10 files changed

+223
-103
lines changed

10 files changed

+223
-103
lines changed

src/SeqCli/Config/Forwarder/SeqCliForwarderConfig.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ class SeqCliForwarderConfig
77
public SeqCliForwarderStorageConfig Storage { get; set; } = new();
88
public SeqCliForwarderDiagnosticConfig Diagnostics { get; set; } = new();
99
public SeqCliForwarderApiConfig Api { get; set; } = new();
10+
public bool UseApiKeyForwarding { get; set; }
1011
}

src/SeqCli/Config/KeyValueSettings.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public static void Set(SeqCliConfig config, string key, string? value)
5757
// would be more robust.
5858
var targetProperty = receiver.GetType().GetTypeInfo().DeclaredProperties
5959
.Where(p => p is { CanRead: true, CanWrite: true } && p.GetMethod!.IsPublic && p.SetMethod!.IsPublic && !p.GetMethod.IsStatic)
60-
.SingleOrDefault(p => Camelize(p.Name) == steps[^1]);
60+
.SingleOrDefault(p => Camelize(GetUserFacingName(p)) == steps[^1]);
6161

6262
if (targetProperty == null)
6363
throw new ArgumentException("The key could not be found; run `seqcli config list` to view all keys.");
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using Seq.Api;
7+
using SeqCli.Config;
8+
using SeqCli.Forwarder.Filesystem.System;
9+
using Serilog;
10+
11+
namespace SeqCli.Forwarder.Channel;
12+
13+
class ApiKeyForwardingChannelWrapper : ForwardingChannelWrapper
14+
{
15+
readonly Dictionary<string, ForwardingChannel> _channelsByApiKey = new();
16+
const string EmptyApiKeyChannelId = "EmptyApiKey";
17+
18+
public ApiKeyForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config) : base(bufferPath, connection, config)
19+
{
20+
LoadChannels();
21+
}
22+
23+
// Start forwarding channels found on the file system.
24+
void LoadChannels()
25+
{
26+
foreach (var directoryPath in Directory.EnumerateDirectories(BufferPath))
27+
{
28+
if (directoryPath.Equals(GetStorePath(SeqCliConnectionChannelId)))
29+
{
30+
// data was stored when not using API key forwarding
31+
continue;
32+
}
33+
34+
string apiKey, channelId;
35+
36+
if (new SystemStoreDirectory(directoryPath).TryReadApiKey(Config, out var key))
37+
{
38+
apiKey = key!;
39+
channelId = directoryPath;
40+
}
41+
else
42+
{
43+
// directory should contain an api key file but does not
44+
continue;
45+
}
46+
47+
var created = OpenOrCreateChannel(channelId, apiKey);
48+
_channelsByApiKey.Add(apiKey, created);
49+
}
50+
}
51+
52+
public override ForwardingChannel GetForwardingChannel(string? requestApiKey)
53+
{
54+
lock (ChannelsSync)
55+
{
56+
// use empty string to represent no api key
57+
if (_channelsByApiKey.TryGetValue(requestApiKey ?? "", out var channel))
58+
{
59+
return channel;
60+
}
61+
62+
var channelId = ApiKeyToId(requestApiKey);
63+
var created = OpenOrCreateChannel(channelId, requestApiKey);
64+
var store = new SystemStoreDirectory(GetStorePath(channelId));
65+
store.WriteApiKey(Config, requestApiKey ?? "");
66+
_channelsByApiKey.Add(requestApiKey ?? "", created);
67+
return created;
68+
}
69+
}
70+
71+
string ApiKeyToId(string? apiKey)
72+
{
73+
return string.IsNullOrEmpty(apiKey) ? EmptyApiKeyChannelId : Guid.NewGuid().ToString();
74+
}
75+
76+
public override async Task StopAsync()
77+
{
78+
Log.ForContext<ApiKeyForwardingChannelWrapper>().Information("Flushing log buffers");
79+
ShutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30));
80+
81+
Task[] stopChannels;
82+
lock (ChannelsSync)
83+
{
84+
stopChannels = _channelsByApiKey.Values.Select(ch => ch.StopAsync()).ToArray();
85+
}
86+
87+
await Task.WhenAll([..stopChannels]);
88+
await ShutdownTokenSource.CancelAsync();
89+
}
90+
}

src/SeqCli/Forwarder/Channel/ForwardingChannelMap.cs

Lines changed: 0 additions & 96 deletions
This file was deleted.
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using System.IO;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Seq.Api;
5+
using SeqCli.Config;
6+
using SeqCli.Forwarder.Filesystem.System;
7+
using SeqCli.Forwarder.Storage;
8+
using Serilog;
9+
10+
namespace SeqCli.Forwarder.Channel;
11+
12+
internal abstract class ForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config)
13+
{
14+
protected const string SeqCliConnectionChannelId = "SeqCliConnection";
15+
protected readonly string BufferPath = bufferPath;
16+
protected readonly SeqCliConfig Config = config;
17+
protected readonly CancellationTokenSource ShutdownTokenSource = new();
18+
protected readonly Lock ChannelsSync = new();
19+
20+
// <param name="id">The id used for the channel storage on the file system.</param>
21+
// <param name="apiKey">The apiKey that will be used to connect to the downstream Seq instance.</param>
22+
protected ForwardingChannel OpenOrCreateChannel(string id, string? apiKey)
23+
{
24+
var storePath = GetStorePath(id);
25+
var store = new SystemStoreDirectory(storePath);
26+
27+
Log.ForContext<ForwardingChannelWrapper>().Information("Opening local buffer in {StorePath}", storePath);
28+
29+
return new ForwardingChannel(
30+
BufferAppender.Open(store),
31+
BufferReader.Open(store),
32+
Bookmark.Open(store),
33+
connection,
34+
apiKey,
35+
Config.Forwarder.Storage.TargetChunkSizeBytes,
36+
Config.Forwarder.Storage.MaxChunks,
37+
Config.Connection.BatchSizeLimitBytes,
38+
ShutdownTokenSource.Token);
39+
}
40+
41+
public abstract ForwardingChannel GetForwardingChannel(string? requestApiKey);
42+
43+
public abstract Task StopAsync();
44+
45+
protected string GetStorePath(string id)
46+
{
47+
return Path.Combine(BufferPath, id);
48+
}
49+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Seq.Api;
4+
using SeqCli.Config;
5+
using Serilog;
6+
7+
namespace SeqCli.Forwarder.Channel;
8+
9+
class SeqCliConnectionForwardingChannelWrapper: ForwardingChannelWrapper
10+
{
11+
readonly ForwardingChannel _seqCliConnectionChannel;
12+
13+
public SeqCliConnectionForwardingChannelWrapper(string bufferPath, SeqConnection connection, SeqCliConfig config, string? seqCliApiKey): base(bufferPath, connection, config)
14+
{
15+
_seqCliConnectionChannel = OpenOrCreateChannel(SeqCliConnectionChannelId, seqCliApiKey);
16+
}
17+
18+
public override ForwardingChannel GetForwardingChannel(string? _)
19+
{
20+
return _seqCliConnectionChannel;
21+
}
22+
23+
public override async Task StopAsync()
24+
{
25+
Log.ForContext<SeqCliConnectionForwardingChannelWrapper>().Information("Flushing log buffers");
26+
ShutdownTokenSource.CancelAfter(TimeSpan.FromSeconds(30));
27+
28+
await _seqCliConnectionChannel.StopAsync();
29+
await ShutdownTokenSource.CancelAsync();
30+
}
31+
}

src/SeqCli/Forwarder/Filesystem/System/SystemStoreDirectory.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414

1515
using System;
1616
using System.Collections.Generic;
17+
using System.Diagnostics.CodeAnalysis;
1718
using System.IO;
1819
using System.Runtime.InteropServices;
20+
using System.Text;
21+
using SeqCli.Config;
22+
using Serilog;
1923

2024
#if UNIX
2125
using SeqCli.Forwarder.Filesystem.System.Unix;
@@ -34,6 +38,34 @@ public SystemStoreDirectory(string path)
3438
if (!Directory.Exists(_directoryPath)) Directory.CreateDirectory(_directoryPath);
3539
}
3640

41+
public void WriteApiKey(SeqCliConfig config, string apiKey)
42+
{
43+
File.WriteAllBytes(
44+
Path.Combine(_directoryPath, "api.key"),
45+
config.Encryption.DataProtector().Encrypt(Encoding.UTF8.GetBytes(apiKey)));
46+
}
47+
48+
public bool TryReadApiKey(SeqCliConfig config, [NotNullWhen(true)] out string? apiKey)
49+
{
50+
apiKey = null;
51+
var path = Path.Combine(_directoryPath, "api.key");
52+
53+
if (!File.Exists(path)) return false;
54+
55+
try
56+
{
57+
var encrypted = File.ReadAllBytes(path);
58+
apiKey = Encoding.UTF8.GetString(config.Encryption.DataProtector().Decrypt(encrypted));
59+
return true;
60+
}
61+
catch (Exception exception)
62+
{
63+
Log.Warning(exception, "Could not read or decrypt api key");
64+
}
65+
66+
return false;
67+
}
68+
3769
public override SystemStoreFile Create(string name)
3870
{
3971
var filePath = Path.Combine(_directoryPath, name);

src/SeqCli/Forwarder/ForwarderModule.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,19 @@ public ForwarderModule(string bufferPath, SeqCliConfig config, SeqConnection con
4545
protected override void Load(ContainerBuilder builder)
4646
{
4747
builder.RegisterType<ServerService>().SingleInstance();
48-
builder.Register(_ => new ForwardingChannelMap(_bufferPath, _connection, _apiKey, _config)).SingleInstance();
48+
49+
if (_config.Forwarder.UseApiKeyForwarding)
50+
{
51+
builder.Register<ApiKeyForwardingChannelWrapper>(_ =>
52+
new ApiKeyForwardingChannelWrapper(_bufferPath, _connection, _config))
53+
.As<ForwardingChannelWrapper>().SingleInstance();
54+
}
55+
else
56+
{
57+
builder.Register<SeqCliConnectionForwardingChannelWrapper>(_ =>
58+
new SeqCliConnectionForwardingChannelWrapper(_bufferPath, _connection, _config, _apiKey))
59+
.As<ForwardingChannelWrapper>().SingleInstance();
60+
}
4961

5062
builder.RegisterType<IngestionEndpoints>().As<IMapEndpoints>();
5163

src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ class IngestionEndpoints : IMapEndpoints
3737
{
3838
static readonly Encoding Utf8 = new UTF8Encoding(false);
3939

40-
readonly ForwardingChannelMap _forwardingChannels;
40+
readonly ForwardingChannelWrapper _forwardingChannels;
4141
readonly SeqCliConfig _config;
4242

43-
public IngestionEndpoints(ForwardingChannelMap forwardingChannels, SeqCliConfig config)
43+
public IngestionEndpoints(ForwardingChannelWrapper forwardingChannels, SeqCliConfig config)
4444
{
4545
_forwardingChannels = forwardingChannels;
4646
_config = config;
@@ -75,7 +75,8 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
7575
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
7676
cts.CancelAfter(TimeSpan.FromSeconds(5));
7777

78-
var log = _forwardingChannels.Get(GetApiKey(context.Request));
78+
var requestApiKey = GetApiKey(context.Request);
79+
var log = _forwardingChannels.GetForwardingChannel(requestApiKey);
7980

8081
// Add one for the extra newline that we have to insert at the end of batches.
8182
var bufferSize = _config.Connection.BatchSizeLimitBytes + 1;

src/SeqCli/Forwarder/Web/Host/ServerService.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ namespace SeqCli.Forwarder.Web.Host;
2424
class ServerService
2525
{
2626
readonly IHost _host;
27-
readonly ForwardingChannelMap _forwardingChannelMap;
27+
readonly ForwardingChannelWrapper _forwardingChannelMap;
2828
readonly string _listenUri;
2929

30-
public ServerService(IHost host, ForwardingChannelMap forwardingChannelMap, string listenUri)
30+
public ServerService(IHost host, ForwardingChannelWrapper forwardingChannelMap, string listenUri)
3131
{
3232
_host = host;
3333
_forwardingChannelMap = forwardingChannelMap;

0 commit comments

Comments
 (0)