Skip to content

Commit f002f6a

Browse files
committed
feat(Solution): Implemented an internal cloud event bus, allowing solution components to correlate internally emitted events
Closes #419 Signed-off-by: Charles d'Avernas <[email protected]>
1 parent 9e7ec56 commit f002f6a

File tree

7 files changed

+350
-17
lines changed

7 files changed

+350
-17
lines changed

src/api/Synapse.Api.Application/Services/CloudEventPublisher.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using Neuroglia.Eventing.CloudEvents;
1818
using Neuroglia.Serialization;
1919
using Polly;
20+
using StackExchange.Redis;
2021
using Synapse.Api.Application.Configuration;
2122
using System.Text;
2223
using System.Threading.Channels;
@@ -27,10 +28,11 @@ namespace Synapse.Api.Application.Services;
2728
/// Represents the default implementation of the <see cref="ICloudEventPublisher"/> interface
2829
/// </summary>
2930
/// <param name="logger">The service used to perform logging</param>
31+
/// <param name="connectionMultiplexer">The service used to connect to the Redis API</param>
3032
/// <param name="options">The service used to access the current <see cref="ApiServerOptions"/></param>
3133
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
3234
/// <param name="httpClient">The service used to perform HTTP requests</param>
33-
public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSerializer jsonSerializer, IOptions<ApiServerOptions> options, HttpClient httpClient)
35+
public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IConnectionMultiplexer connectionMultiplexer, IJsonSerializer jsonSerializer, IOptions<ApiServerOptions> options, HttpClient httpClient)
3436
: IHostedService, ICloudEventPublisher, IDisposable, IAsyncDisposable
3537
{
3638

@@ -41,6 +43,16 @@ public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSeria
4143
/// </summary>
4244
protected ILogger Logger { get; } = logger;
4345

46+
/// <summary>
47+
/// Gets the service used to connect to the Redis API
48+
/// </summary>
49+
protected IConnectionMultiplexer ConnectionMultiplexer { get; } = connectionMultiplexer;
50+
51+
/// <summary>
52+
/// Gets the Redis database to use
53+
/// </summary>
54+
protected StackExchange.Redis.IDatabase Database { get; } = connectionMultiplexer.GetDatabase();
55+
4456
/// <summary>
4557
/// Gets the service used to serialize/deserialize data to/from JSON
4658
/// </summary>
@@ -66,11 +78,28 @@ public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSeria
6678
/// </summary>
6779
protected Channel<CloudEvent> Channel { get; } = System.Threading.Channels.Channel.CreateUnbounded<CloudEvent>();
6880

81+
/// <summary>
82+
/// Gets a boolean indicating whether or not the Redis version used by the cloud event publisher supports streaming commands
83+
/// </summary>
84+
protected bool SupportsStreaming { get; private set; }
85+
6986
/// <inheritdoc/>
7087
public virtual Task StartAsync(CancellationToken cancellationToken)
7188
{
7289
if (options.Value.CloudEvents.Endpoint == null) logger.LogWarning("No endpoint configured for cloud events. Events will not be published.");
7390
else _ = this.PublishEnqueuedEventsAsync();
91+
var version = ((string)(this.Database.Execute("INFO", "server"))!).Split('\n').FirstOrDefault(line => line.StartsWith("redis_version:"))?[14..]?.Trim() ?? "undetermined";
92+
try
93+
{
94+
this.Database.StreamInfo(SynapseDefaults.CloudEvents.Bus.StreamName);
95+
this.SupportsStreaming = true;
96+
this.Logger.LogInformation("Redis server version '{version}' supports streaming commands. Streaming feature is enabled", version);
97+
}
98+
catch (RedisServerException ex) when (ex.Message.StartsWith("ERR unknown command"))
99+
{
100+
this.SupportsStreaming = false;
101+
this.Logger.LogInformation("Redis server version '{version}' does not support streaming commands. Streaming feature is emulated using lists", version);
102+
}
74103
return Task.CompletedTask;
75104
}
76105

@@ -79,6 +108,13 @@ public virtual async Task PublishAsync(CloudEvent e, CancellationToken cancellat
79108
{
80109
ArgumentNullException.ThrowIfNull(e);
81110
await this.Channel.Writer.WriteAsync(e, cancellationToken).ConfigureAwait(false);
111+
var json = this.JsonSerializer.SerializeToText(e);
112+
if (this.SupportsStreaming) await this.Database.StreamAddAsync(SynapseDefaults.CloudEvents.Bus.StreamName, SynapseDefaults.CloudEvents.Bus.EventFieldName, json).ConfigureAwait(false);
113+
else
114+
{
115+
await this.Database.ListLeftPushAsync(SynapseDefaults.CloudEvents.Bus.StreamName, json).ConfigureAwait(false);
116+
await this.Database.ScriptEvaluateAsync(SynapseDefaults.CloudEvents.Bus.MessageDistributionScript, [SynapseDefaults.CloudEvents.Bus.StreamName, SynapseDefaults.CloudEvents.Bus.ConsumerGroupListKey]).ConfigureAwait(false);
117+
}
82118
}
83119

84120
/// <inheritdoc/>

src/core/Synapse.Core.Infrastructure/Services/DatabaseInitializer.cs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// limitations under the License.
1313

1414
using Microsoft.Extensions.Logging;
15+
using Neuroglia;
1516
using Neuroglia.Data.Infrastructure.ResourceOriented;
1617
using Synapse.Resources;
18+
using System.Net;
1719
using System.Security.Cryptography;
1820

1921
namespace Synapse.Core.Infrastructure.Services;
@@ -29,28 +31,33 @@ public class DatabaseInitializer(ILoggerFactory loggerFactory, IServiceProvider
2931
/// <inheritdoc/>
3032
protected override async Task SeedAsync(CancellationToken cancellationToken)
3133
{
32-
foreach (var definition in SynapseDefaults.Resources.Definitions.AsEnumerable()) await this.Database.CreateResourceAsync(definition, cancellationToken: cancellationToken).ConfigureAwait(false);
34+
foreach (var definition in SynapseDefaults.Resources.Definitions.AsEnumerable())
35+
{
36+
try { await this.Database.CreateResourceAsync(definition, cancellationToken: cancellationToken).ConfigureAwait(false); }
37+
catch (ProblemDetailsException ex) when (ex.Problem.Status == (int)HttpStatusCode.Conflict || (ex.Problem.Status == (int)HttpStatusCode.BadRequest && ex.Problem.Title == "Conflict")) { continue; }
38+
}
3339
var keyBytes = new byte[64];
3440
using var rng = RandomNumberGenerator.Create();
3541
rng.GetBytes(keyBytes);
3642
var key = Convert.ToBase64String(keyBytes);
37-
await this.Database.CreateResourceAsync(new ServiceAccount()
43+
try
3844
{
39-
Metadata = new()
40-
{
41-
Namespace = Namespace.DefaultNamespaceName,
42-
Name = ServiceAccount.DefaultServiceAccountName
43-
},
44-
Spec = new()
45+
await this.Database.CreateResourceAsync(new ServiceAccount()
4546
{
46-
Key = key,
47-
Claims = new Dictionary<string, string>()
47+
Metadata = new()
4848
{
49-
49+
Namespace = Namespace.DefaultNamespaceName,
50+
Name = ServiceAccount.DefaultServiceAccountName
51+
},
52+
Spec = new()
53+
{
54+
Key = key,
55+
Claims = new Dictionary<string, string>() { }
5056
}
51-
}
52-
}, false, cancellationToken).ConfigureAwait(false);
53-
await this.Database.InitializeAsync(cancellationToken);
57+
}, false, cancellationToken).ConfigureAwait(false);
58+
await this.Database.InitializeAsync(cancellationToken);
59+
}
60+
catch (ProblemDetailsException ex) when (ex.Problem.Status == (int)HttpStatusCode.Conflict || (ex.Problem.Status == (int)HttpStatusCode.BadRequest && ex.Problem.Title == "Conflict")) { continue; }
5461
}
5562

5663
}

src/core/Synapse.Core/SynapseDefaults.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,40 @@ public static class CloudEvents
5353
/// </summary>
5454
public const string TypePrefix = "io.synapse-wfms.events.";
5555

56+
/// <summary>
57+
/// Exposes constants about the Synapse cloud event bus
58+
/// </summary>
59+
public static class Bus
60+
{
61+
62+
/// <summary>
63+
/// Gets the name of the stream used to observe cloud events published to the Synapse cloud event bus
64+
/// </summary>
65+
public const string StreamName = "cloud-events";
66+
/// <summary>
67+
/// Gets the name of the field used to store the serialized cloud event
68+
/// </summary>
69+
public const string EventFieldName = "event";
70+
/// <summary>
71+
/// Gets the key of the list used to store all existing consumer groups
72+
/// </summary>
73+
public const string ConsumerGroupListKey = "cloud-events-consumer-groups";
74+
/// <summary>
75+
/// Gets the LUA script used to distribute cloud event bus messages amongst consumer groups
76+
/// </summary>
77+
public static string MessageDistributionScript = @"
78+
local message = redis.call('RPOP', KEYS[1]);
79+
if message then
80+
local consumerGroups = redis.call('SMEMBERS', KEYS[2]);
81+
for _, group in ipairs(consumerGroups) do
82+
redis.call('LPUSH', group, message);
83+
end;
84+
end;
85+
return message;
86+
";
87+
88+
}
89+
5690
/// <summary>
5791
/// Exposes constants about workflow-related cloud events
5892
/// </summary>
@@ -497,6 +531,24 @@ public static class Correlator
497531
/// </summary>
498532
public const string Name = Prefix + "NAME";
499533

534+
/// <summary>
535+
/// Exposes constants about the cloud events related environment variables of a correlator
536+
/// </summary>
537+
public static class Events
538+
{
539+
540+
/// <summary>
541+
/// Gets the prefix for all correlator related environment variables
542+
/// </summary>
543+
public const string Prefix = Correlator.Prefix + "EVENTS_";
544+
545+
/// <summary>
546+
/// Gets the name of the consumer group the correlator to configure belongs to, if any
547+
/// </summary>
548+
public const string ConsumerGroup = Prefix + "CONSUMER";
549+
550+
}
551+
500552
}
501553

502554
/// <summary>
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright © 2024-Present The Synapse Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"),
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
namespace Synapse.Correlator.Configuration;
15+
16+
/// <summary>
17+
/// Represents the options used to configure the way a Synapse Correlator consumes cloud events
18+
/// </summary>
19+
public class CloudEventConsumptionOptions
20+
{
21+
22+
/// <summary>
23+
/// Gets/sets the name of the correlator's consumer group, if any. If not set, defaults to a concatenation of the correlators name and namespace.
24+
/// </summary>
25+
public virtual string? ConsumerGroup { get; set; }
26+
27+
}

src/correlator/Synapse.Correlator/Configuration/CorrelatorOptions.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ public class CorrelatorOptions
2424
/// </summary>
2525
public CorrelatorOptions()
2626
{
27-
Namespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Correlator.Namespace)!;
28-
Name = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Correlator.Name)!;
27+
this.Namespace = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Correlator.Namespace)!;
28+
this.Name = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Correlator.Name)!;
29+
var env = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Correlator.Events.ConsumerGroup);
30+
if (!string.IsNullOrWhiteSpace(env))
31+
{
32+
this.Events ??= new();
33+
this.Events.ConsumerGroup = env;
34+
}
2935
}
3036

3137
/// <summary>
@@ -38,4 +44,9 @@ public CorrelatorOptions()
3844
/// </summary>
3945
public virtual string Name { get; set; }
4046

47+
/// <summary>
48+
/// Gets/sets the options used to configure the way a Synapse Correlator consumes cloud events
49+
/// </summary>
50+
public virtual CloudEventConsumptionOptions? Events { get; set; }
51+
4152
}

src/correlator/Synapse.Correlator/Program.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
builder.Services.AddCloudEventBus();
3232
builder.Services.AddTransient<IRequestHandler<IngestCloudEventCommand, IOperationResult>, IngestCloudEventCommandHandler>();
3333

34+
builder.Services.AddSingleton<RedisCloudEventIngestor>();
35+
builder.Services.AddHostedService(provider => provider.GetRequiredService<RedisCloudEventIngestor>());
36+
3437
builder.Services.AddScoped<CorrelatorController>();
3538
builder.Services.AddScoped<ICorrelatorController>(provider => provider.GetRequiredService<CorrelatorController>());
3639

0 commit comments

Comments
 (0)