Skip to content

Commit 30d06e3

Browse files
authored
Merge pull request #420 from serverlessworkflow/feat-event-bus
Publish emitted events on internal bus
2 parents 9e7ec56 + eefda9d commit 30d06e3

File tree

24 files changed

+367
-34
lines changed

24 files changed

+367
-34
lines changed

src/api/Synapse.Api.Application/Services/CloudEventPublisher.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using Neuroglia.Eventing.CloudEvents;
1818
using Neuroglia.Serialization;
1919
using Polly;
20+
using StackExchange.Redis;
2021
using Synapse.Api.Application.Configuration;
2122
using System.Text;
2223
using System.Threading.Channels;
@@ -27,10 +28,11 @@ namespace Synapse.Api.Application.Services;
2728
/// Represents the default implementation of the <see cref="ICloudEventPublisher"/> interface
2829
/// </summary>
2930
/// <param name="logger">The service used to perform logging</param>
31+
/// <param name="connectionMultiplexer">The service used to connect to the Redis API</param>
3032
/// <param name="options">The service used to access the current <see cref="ApiServerOptions"/></param>
3133
/// <param name="jsonSerializer">The service used to serialize/deserialize data to/from JSON</param>
3234
/// <param name="httpClient">The service used to perform HTTP requests</param>
33-
public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSerializer jsonSerializer, IOptions<ApiServerOptions> options, HttpClient httpClient)
35+
public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IConnectionMultiplexer connectionMultiplexer, IJsonSerializer jsonSerializer, IOptions<ApiServerOptions> options, HttpClient httpClient)
3436
: IHostedService, ICloudEventPublisher, IDisposable, IAsyncDisposable
3537
{
3638

@@ -41,6 +43,16 @@ public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSeria
4143
/// </summary>
4244
protected ILogger Logger { get; } = logger;
4345

46+
/// <summary>
47+
/// Gets the service used to connect to the Redis API
48+
/// </summary>
49+
protected IConnectionMultiplexer ConnectionMultiplexer { get; } = connectionMultiplexer;
50+
51+
/// <summary>
52+
/// Gets the Redis database to use
53+
/// </summary>
54+
protected StackExchange.Redis.IDatabase Database { get; } = connectionMultiplexer.GetDatabase();
55+
4456
/// <summary>
4557
/// Gets the service used to serialize/deserialize data to/from JSON
4658
/// </summary>
@@ -66,11 +78,28 @@ public class CloudEventPublisher(ILogger<CloudEventPublisher> logger, IJsonSeria
6678
/// </summary>
6779
protected Channel<CloudEvent> Channel { get; } = System.Threading.Channels.Channel.CreateUnbounded<CloudEvent>();
6880

81+
/// <summary>
82+
/// Gets a boolean indicating whether or not the Redis version used by the cloud event publisher supports streaming commands
83+
/// </summary>
84+
protected bool SupportsStreaming { get; private set; }
85+
6986
/// <inheritdoc/>
7087
public virtual Task StartAsync(CancellationToken cancellationToken)
7188
{
7289
if (options.Value.CloudEvents.Endpoint == null) logger.LogWarning("No endpoint configured for cloud events. Events will not be published.");
7390
else _ = this.PublishEnqueuedEventsAsync();
91+
var version = ((string)(this.Database.Execute("INFO", "server"))!).Split('\n').FirstOrDefault(line => line.StartsWith("redis_version:"))?[14..]?.Trim() ?? "undetermined";
92+
try
93+
{
94+
this.Database.StreamInfo(SynapseDefaults.CloudEvents.Bus.StreamName);
95+
this.SupportsStreaming = true;
96+
this.Logger.LogInformation("Redis server version '{version}' supports streaming commands. Streaming feature is enabled", version);
97+
}
98+
catch (RedisServerException ex) when (ex.Message.StartsWith("ERR unknown command"))
99+
{
100+
this.SupportsStreaming = false;
101+
this.Logger.LogInformation("Redis server version '{version}' does not support streaming commands. Streaming feature is emulated using lists", version);
102+
}
74103
return Task.CompletedTask;
75104
}
76105

@@ -79,6 +108,13 @@ public virtual async Task PublishAsync(CloudEvent e, CancellationToken cancellat
79108
{
80109
ArgumentNullException.ThrowIfNull(e);
81110
await this.Channel.Writer.WriteAsync(e, cancellationToken).ConfigureAwait(false);
111+
var json = this.JsonSerializer.SerializeToText(e);
112+
if (this.SupportsStreaming) await this.Database.StreamAddAsync(SynapseDefaults.CloudEvents.Bus.StreamName, SynapseDefaults.CloudEvents.Bus.EventFieldName, json).ConfigureAwait(false);
113+
else
114+
{
115+
await this.Database.ListLeftPushAsync(SynapseDefaults.CloudEvents.Bus.StreamName, json).ConfigureAwait(false);
116+
await this.Database.ScriptEvaluateAsync(SynapseDefaults.CloudEvents.Bus.MessageDistributionScript, [SynapseDefaults.CloudEvents.Bus.StreamName, SynapseDefaults.CloudEvents.Bus.ConsumerGroupListKey]).ConfigureAwait(false);
117+
}
82118
}
83119

84120
/// <inheritdoc/>

src/api/Synapse.Api.Application/Synapse.Api.Application.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<NeutralLanguage>en</NeutralLanguage>
88
<GenerateDocumentationFile>True</GenerateDocumentationFile>
99
<VersionPrefix>1.0.0</VersionPrefix>
10-
<VersionSuffix>alpha2</VersionSuffix>
10+
<VersionSuffix>alpha3</VersionSuffix>
1111
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1212
<FileVersion>$(VersionPrefix)</FileVersion>
1313
<Authors>The Synapse Authors</Authors>

src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<NeutralLanguage>en</NeutralLanguage>
88
<GenerateDocumentationFile>True</GenerateDocumentationFile>
99
<VersionPrefix>1.0.0</VersionPrefix>
10-
<VersionSuffix>alpha2</VersionSuffix>
10+
<VersionSuffix>alpha3</VersionSuffix>
1111
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1212
<FileVersion>$(VersionPrefix)</FileVersion>
1313
<Authors>The Synapse Authors</Authors>

src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<NeutralLanguage>en</NeutralLanguage>
88
<GenerateDocumentationFile>True</GenerateDocumentationFile>
99
<VersionPrefix>1.0.0</VersionPrefix>
10-
<VersionSuffix>alpha2</VersionSuffix>
10+
<VersionSuffix>alpha3</VersionSuffix>
1111
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1212
<FileVersion>$(VersionPrefix)</FileVersion>
1313
<Authors>The Synapse Authors</Authors>

src/api/Synapse.Api.Http/Synapse.Api.Http.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<OutputType>Library</OutputType>
99
<GenerateDocumentationFile>True</GenerateDocumentationFile>
1010
<VersionPrefix>1.0.0</VersionPrefix>
11-
<VersionSuffix>alpha2</VersionSuffix>
11+
<VersionSuffix>alpha3</VersionSuffix>
1212
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1313
<FileVersion>$(VersionPrefix)</FileVersion>
1414
<Authors>The Synapse Authors</Authors>

src/api/Synapse.Api.Server/Synapse.Api.Server.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<NeutralLanguage>en</NeutralLanguage>
88
<GenerateDocumentationFile>True</GenerateDocumentationFile>
99
<VersionPrefix>1.0.0</VersionPrefix>
10-
<VersionSuffix>alpha2</VersionSuffix>
10+
<VersionSuffix>alpha3</VersionSuffix>
1111
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1212
<FileVersion>$(VersionPrefix)</FileVersion>
1313
<Authors>The Synapse Authors</Authors>

src/cli/Synapse.Cli/Synapse.Cli.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<NeutralLanguage>en</NeutralLanguage>
99
<GenerateDocumentationFile>True</GenerateDocumentationFile>
1010
<VersionPrefix>1.0.0</VersionPrefix>
11-
<VersionSuffix>alpha2</VersionSuffix>
11+
<VersionSuffix>alpha3</VersionSuffix>
1212
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1313
<FileVersion>$(VersionPrefix)</FileVersion>
1414
<Authors>The Synapse Authors</Authors>

src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<NeutralLanguage>en</NeutralLanguage>
88
<GenerateDocumentationFile>True</GenerateDocumentationFile>
99
<VersionPrefix>1.0.0</VersionPrefix>
10-
<VersionSuffix>alpha2</VersionSuffix>
10+
<VersionSuffix>alpha3</VersionSuffix>
1111
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1212
<FileVersion>$(VersionPrefix)</FileVersion>
1313
<Authors>The Synapse Authors</Authors>

src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<NeutralLanguage>en</NeutralLanguage>
88
<GenerateDocumentationFile>True</GenerateDocumentationFile>
99
<VersionPrefix>1.0.0</VersionPrefix>
10-
<VersionSuffix>alpha2</VersionSuffix>
10+
<VersionSuffix>alpha3</VersionSuffix>
1111
<AssemblyVersion>$(VersionPrefix)</AssemblyVersion>
1212
<FileVersion>$(VersionPrefix)</FileVersion>
1313
<Authors>The Synapse Authors</Authors>

src/core/Synapse.Core.Infrastructure/Services/DatabaseInitializer.cs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// limitations under the License.
1313

1414
using Microsoft.Extensions.Logging;
15+
using Neuroglia;
1516
using Neuroglia.Data.Infrastructure.ResourceOriented;
1617
using Synapse.Resources;
18+
using System.Net;
1719
using System.Security.Cryptography;
1820

1921
namespace Synapse.Core.Infrastructure.Services;
@@ -29,28 +31,33 @@ public class DatabaseInitializer(ILoggerFactory loggerFactory, IServiceProvider
2931
/// <inheritdoc/>
3032
protected override async Task SeedAsync(CancellationToken cancellationToken)
3133
{
32-
foreach (var definition in SynapseDefaults.Resources.Definitions.AsEnumerable()) await this.Database.CreateResourceAsync(definition, cancellationToken: cancellationToken).ConfigureAwait(false);
34+
foreach (var definition in SynapseDefaults.Resources.Definitions.AsEnumerable())
35+
{
36+
try { await this.Database.CreateResourceAsync(definition, cancellationToken: cancellationToken).ConfigureAwait(false); }
37+
catch (ProblemDetailsException ex) when (ex.Problem.Status == (int)HttpStatusCode.Conflict || (ex.Problem.Status == (int)HttpStatusCode.BadRequest && ex.Problem.Title == "Conflict")) { continue; }
38+
}
3339
var keyBytes = new byte[64];
3440
using var rng = RandomNumberGenerator.Create();
3541
rng.GetBytes(keyBytes);
3642
var key = Convert.ToBase64String(keyBytes);
37-
await this.Database.CreateResourceAsync(new ServiceAccount()
43+
try
3844
{
39-
Metadata = new()
40-
{
41-
Namespace = Namespace.DefaultNamespaceName,
42-
Name = ServiceAccount.DefaultServiceAccountName
43-
},
44-
Spec = new()
45+
await this.Database.CreateResourceAsync(new ServiceAccount()
4546
{
46-
Key = key,
47-
Claims = new Dictionary<string, string>()
47+
Metadata = new()
4848
{
49-
49+
Namespace = Namespace.DefaultNamespaceName,
50+
Name = ServiceAccount.DefaultServiceAccountName
51+
},
52+
Spec = new()
53+
{
54+
Key = key,
55+
Claims = new Dictionary<string, string>() { }
5056
}
51-
}
52-
}, false, cancellationToken).ConfigureAwait(false);
53-
await this.Database.InitializeAsync(cancellationToken);
57+
}, false, cancellationToken).ConfigureAwait(false);
58+
await this.Database.InitializeAsync(cancellationToken);
59+
}
60+
catch (ProblemDetailsException ex) when (ex.Problem.Status == (int)HttpStatusCode.Conflict || (ex.Problem.Status == (int)HttpStatusCode.BadRequest && ex.Problem.Title == "Conflict")) { }
5461
}
5562

5663
}

0 commit comments

Comments
 (0)