Skip to content

Commit b8d2472

Browse files
authored
Implement PostgreSQL event store for feature parity with SQL Server (#47)
1 parent 4a41e64 commit b8d2472

File tree

10 files changed

+1171
-2
lines changed

10 files changed

+1171
-2
lines changed

src/BbQ.Events.PostgreSql/BbQ.Events.PostgreSql.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
<Version>0.1.0</Version>
88
<Authors>Jean</Authors>
99
<Company>JM Mbouma</Company>
10-
<Description>PostgreSQL implementation for BbQ.Events, providing IProjectionCheckpointStore for projection checkpoints. Features durable checkpoint persistence with upsert semantics, atomic operations, and thread-safe parallel processing support.</Description>
11-
<PackageTags>events postgresql projections checkpoint persistence bbq cqrs</PackageTags>
10+
<Description>PostgreSQL implementation for BbQ.Events, providing both IEventStore for event sourcing and IProjectionCheckpointStore for projection checkpoints. Features durable event persistence with sequential positions, atomic operations, JSON serialization, and thread-safe parallel processing support.</Description>
11+
<PackageTags>events postgresql event-sourcing projections checkpoint persistence event-store bbq cqrs</PackageTags>
1212
<RepositoryUrl>https://github.com/JeanMarcMbouma/Outcome</RepositoryUrl>
1313
<PackageProjectUrl>https://github.com/JeanMarcMbouma/Outcome</PackageProjectUrl>
1414
<RepositoryType>git</RepositoryType>

src/BbQ.Events.PostgreSql/Configuration/ServiceCollectionExtensions.cs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using Microsoft.Extensions.DependencyInjection;
22
using Microsoft.Extensions.DependencyInjection.Extensions;
33
using BbQ.Events.Checkpointing;
4+
using BbQ.Events.Events;
45
using BbQ.Events.PostgreSql.Checkpointing;
6+
using BbQ.Events.PostgreSql.Events;
57

68
namespace BbQ.Events.PostgreSql.Configuration;
79

@@ -46,4 +48,85 @@ public static IServiceCollection UsePostgreSqlCheckpoints(
4648

4749
return services;
4850
}
51+
52+
/// <summary>
53+
/// Registers the PostgreSQL event store for event sourcing.
54+
/// </summary>
55+
/// <param name="services">The service collection to register with</param>
56+
/// <param name="connectionString">The PostgreSQL connection string</param>
57+
/// <returns>The service collection for chaining</returns>
58+
/// <remarks>
59+
/// This method registers IEventStore as a singleton using PostgreSqlEventStore.
60+
///
61+
/// Prerequisites:
62+
/// - PostgreSQL database must be accessible
63+
/// - bbq_events table must be created (see Schema/CreateEventsTable.sql)
64+
/// - bbq_streams table must be created (see Schema/CreateStreamsTable.sql)
65+
///
66+
/// Example usage:
67+
/// <code>
68+
/// services.UsePostgreSqlEventStore("Host=localhost;Database=mydb;Username=myuser;Password=mypass");
69+
/// </code>
70+
/// </remarks>
71+
public static IServiceCollection UsePostgreSqlEventStore(
72+
this IServiceCollection services,
73+
string connectionString)
74+
{
75+
if (string.IsNullOrWhiteSpace(connectionString))
76+
{
77+
throw new ArgumentNullException(nameof(connectionString));
78+
}
79+
80+
return services.UsePostgreSqlEventStore(options =>
81+
{
82+
options.ConnectionString = connectionString;
83+
});
84+
}
85+
86+
/// <summary>
87+
/// Registers the PostgreSQL event store for event sourcing with custom options.
88+
/// </summary>
89+
/// <param name="services">The service collection to register with</param>
90+
/// <param name="configureOptions">Action to configure event store options</param>
91+
/// <returns>The service collection for chaining</returns>
92+
/// <remarks>
93+
/// This method registers IEventStore as a singleton using PostgreSqlEventStore.
94+
///
95+
/// Prerequisites:
96+
/// - PostgreSQL database must be accessible
97+
/// - bbq_events table must be created (see Schema/CreateEventsTable.sql)
98+
/// - bbq_streams table must be created (see Schema/CreateStreamsTable.sql)
99+
///
100+
/// Example usage:
101+
/// <code>
102+
/// services.UsePostgreSqlEventStore(options =>
103+
/// {
104+
/// options.ConnectionString = "Host=localhost;Database=mydb;Username=myuser;Password=mypass";
105+
/// options.IncludeMetadata = true;
106+
/// });
107+
/// </code>
108+
/// </remarks>
109+
public static IServiceCollection UsePostgreSqlEventStore(
110+
this IServiceCollection services,
111+
Action<PostgreSqlEventStoreOptions> configureOptions)
112+
{
113+
if (configureOptions == null)
114+
{
115+
throw new ArgumentNullException(nameof(configureOptions));
116+
}
117+
118+
var options = new PostgreSqlEventStoreOptions();
119+
configureOptions(options);
120+
121+
if (string.IsNullOrWhiteSpace(options.ConnectionString))
122+
{
123+
throw new ArgumentException("Connection string must be configured via the configureOptions action.", nameof(configureOptions));
124+
}
125+
126+
// Replace any existing IEventStore registration
127+
services.Replace(ServiceDescriptor.Singleton<IEventStore>(
128+
_ => new PostgreSqlEventStore(options)));
129+
130+
return services;
131+
}
49132
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
using System.Runtime.CompilerServices;
2+
using System.Text.Json;
3+
using BbQ.Events.Events;
4+
using BbQ.Events.PostgreSql.Internal;
5+
using Npgsql;
6+
7+
namespace BbQ.Events.PostgreSql.Events;
8+
9+
/// <summary>
10+
/// PostgreSQL implementation of IEventStore.
11+
/// </summary>
12+
/// <remarks>
13+
/// This implementation provides:
14+
/// - Durable event persistence in PostgreSQL
15+
/// - Sequential position tracking per stream
16+
/// - Atomic append operations
17+
/// - Efficient event replay via streaming reads
18+
/// - JSON serialization of event data
19+
/// - Support for event metadata
20+
///
21+
/// Connection handling:
22+
/// - Each operation opens a new connection (connection pooling is handled by Npgsql)
23+
/// - Operations are fully async for optimal scalability
24+
/// - Connections are properly disposed in all code paths
25+
///
26+
/// Prerequisites:
27+
/// - bbq_events table must exist (see Schema/CreateEventsTable.sql)
28+
/// - bbq_streams table must exist (see Schema/CreateStreamsTable.sql)
29+
/// </remarks>
30+
public class PostgreSqlEventStore : IEventStore
31+
{
32+
private readonly PostgreSqlEventStoreOptions _options;
33+
private readonly JsonSerializerOptions _jsonOptions;
34+
private static readonly string MachineName = Environment.MachineName;
35+
36+
/// <summary>
37+
/// Creates a new PostgreSQL event store.
38+
/// </summary>
39+
/// <param name="options">Configuration options</param>
40+
/// <exception cref="ArgumentNullException">Thrown when options is null</exception>
41+
/// <exception cref="ArgumentException">Thrown when connection string is null or empty</exception>
42+
public PostgreSqlEventStore(PostgreSqlEventStoreOptions options)
43+
{
44+
_options = options ?? throw new ArgumentNullException(nameof(options));
45+
46+
if (string.IsNullOrWhiteSpace(_options.ConnectionString))
47+
{
48+
throw new ArgumentException("Connection string cannot be null or empty", nameof(options));
49+
}
50+
51+
_jsonOptions = _options.JsonSerializerOptions ?? new JsonSerializerOptions
52+
{
53+
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
54+
WriteIndented = false
55+
};
56+
}
57+
58+
/// <summary>
59+
/// Appends an event to a stream.
60+
/// </summary>
61+
/// <typeparam name="TEvent">The type of event</typeparam>
62+
/// <param name="stream">The stream name</param>
63+
/// <param name="event">The event to append</param>
64+
/// <param name="ct">Cancellation token</param>
65+
/// <returns>The position of the appended event</returns>
66+
/// <exception cref="ArgumentException">Thrown when stream name is null or empty</exception>
67+
/// <exception cref="ArgumentNullException">Thrown when event is null</exception>
68+
public async Task<long> AppendAsync<TEvent>(string stream, TEvent @event, CancellationToken ct = default)
69+
{
70+
if (string.IsNullOrWhiteSpace(stream))
71+
{
72+
throw new ArgumentException("Stream name cannot be null or empty", nameof(stream));
73+
}
74+
75+
if (@event == null)
76+
{
77+
throw new ArgumentNullException(nameof(@event));
78+
}
79+
80+
await using var connection = new NpgsqlConnection(_options.ConnectionString);
81+
await connection.OpenAsync(ct);
82+
83+
await using var command = connection.CreateCommand();
84+
command.CommandText = PostgreSqlConstants.AppendEventSqlSimplified;
85+
86+
var eventType = typeof(TEvent).FullName ?? typeof(TEvent).Name;
87+
var eventData = PostgreSqlHelpers.SerializeToJson(@event, _jsonOptions);
88+
89+
command.AddParameter("@stream_name", stream);
90+
command.AddParameter("@event_type", eventType);
91+
command.AddParameter("@event_data", eventData);
92+
command.AddParameter("@metadata", _options.IncludeMetadata ? CreateMetadata() : null);
93+
94+
var result = await command.ExecuteScalarAsync(ct);
95+
return Convert.ToInt64(result);
96+
}
97+
98+
/// <summary>
99+
/// Reads events from a stream starting at a given position.
100+
/// </summary>
101+
/// <typeparam name="TEvent">The type of events to read</typeparam>
102+
/// <param name="stream">The stream name</param>
103+
/// <param name="fromPosition">The position to start reading from (inclusive)</param>
104+
/// <param name="ct">Cancellation token</param>
105+
/// <returns>An async enumerable of events with their positions</returns>
106+
/// <exception cref="ArgumentException">Thrown when stream name is null or empty</exception>
107+
public async IAsyncEnumerable<StoredEvent<TEvent>> ReadAsync<TEvent>(
108+
string stream,
109+
long fromPosition = 0,
110+
[EnumeratorCancellation] CancellationToken ct = default)
111+
{
112+
if (string.IsNullOrWhiteSpace(stream))
113+
{
114+
throw new ArgumentException("Stream name cannot be null or empty", nameof(stream));
115+
}
116+
117+
await using var connection = new NpgsqlConnection(_options.ConnectionString);
118+
await connection.OpenAsync(ct);
119+
120+
await using var command = connection.CreateCommand();
121+
command.CommandText = PostgreSqlConstants.ReadEventsSql;
122+
command.AddParameter("@stream_name", stream);
123+
command.AddParameter("@from_position", fromPosition);
124+
125+
await using var reader = await command.ExecuteReaderAsync(ct);
126+
127+
while (await reader.ReadAsync(ct))
128+
{
129+
var position = reader.GetLong(PostgreSqlConstants.Position);
130+
var eventType = reader.GetString(reader.GetOrdinal(PostgreSqlConstants.EventType));
131+
var eventData = reader.GetString(reader.GetOrdinal(PostgreSqlConstants.EventData));
132+
133+
// Only deserialize if the event type matches
134+
// This allows for type filtering when reading from streams with multiple event types
135+
var expectedType = typeof(TEvent).FullName ?? typeof(TEvent).Name;
136+
if (eventType == expectedType)
137+
{
138+
var @event = PostgreSqlHelpers.DeserializeFromJson<TEvent>(eventData, _jsonOptions);
139+
yield return new StoredEvent<TEvent>(position, @event);
140+
}
141+
}
142+
}
143+
144+
/// <summary>
145+
/// Gets the current position (last event position) in a stream.
146+
/// </summary>
147+
/// <param name="stream">The stream name</param>
148+
/// <param name="ct">Cancellation token</param>
149+
/// <returns>The current position, or null if the stream doesn't exist</returns>
150+
/// <exception cref="ArgumentException">Thrown when stream name is null or empty</exception>
151+
public async Task<long?> GetStreamPositionAsync(string stream, CancellationToken ct = default)
152+
{
153+
if (string.IsNullOrWhiteSpace(stream))
154+
{
155+
throw new ArgumentException("Stream name cannot be null or empty", nameof(stream));
156+
}
157+
158+
await using var connection = new NpgsqlConnection(_options.ConnectionString);
159+
await connection.OpenAsync(ct);
160+
161+
await using var command = connection.CreateCommand();
162+
command.CommandText = PostgreSqlConstants.GetStreamPositionSql;
163+
command.AddParameter("@stream_name", stream);
164+
165+
var result = await command.ExecuteScalarAsync(ct);
166+
167+
return result == null || result == DBNull.Value
168+
? null
169+
: Convert.ToInt64(result);
170+
}
171+
172+
/// <summary>
173+
/// Creates metadata for an event.
174+
/// </summary>
175+
private string CreateMetadata()
176+
{
177+
var metadata = new Dictionary<string, object>
178+
{
179+
["timestamp"] = DateTime.UtcNow,
180+
["server"] = MachineName
181+
};
182+
183+
return PostgreSqlHelpers.SerializeToJson(metadata, _jsonOptions);
184+
}
185+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System.Text.Json;
2+
3+
namespace BbQ.Events.PostgreSql.Events;
4+
5+
/// <summary>
6+
/// Configuration options for PostgreSQL event store.
7+
/// </summary>
8+
public class PostgreSqlEventStoreOptions
9+
{
10+
/// <summary>
11+
/// Gets or sets the connection string for PostgreSQL.
12+
/// </summary>
13+
public string ConnectionString { get; set; } = string.Empty;
14+
15+
/// <summary>
16+
/// Gets or sets the JSON serializer options for event data.
17+
/// </summary>
18+
/// <remarks>
19+
/// If not provided, defaults to camelCase property naming.
20+
/// </remarks>
21+
public JsonSerializerOptions? JsonSerializerOptions { get; set; }
22+
23+
/// <summary>
24+
/// Gets or sets whether to include metadata in stored events.
25+
/// </summary>
26+
/// <remarks>
27+
/// Metadata can include correlation IDs, causation IDs, timestamps, etc.
28+
/// Default is false.
29+
/// </remarks>
30+
public bool IncludeMetadata { get; set; } = false;
31+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
namespace BbQ.Events.PostgreSql.Internal;
2+
3+
/// <summary>
4+
/// PostgreSQL constants for BbQ.Events.
5+
/// </summary>
6+
internal static class PostgreSqlConstants
7+
{
8+
// Table names
9+
public const string EventsTable = "bbq_events";
10+
public const string StreamsTable = "bbq_streams";
11+
public const string CheckpointsTable = "bbq_projection_checkpoints";
12+
13+
// Column names - Events
14+
public const string EventId = "event_id";
15+
public const string StreamName = "stream_name";
16+
public const string Position = "position";
17+
public const string EventType = "event_type";
18+
public const string EventData = "event_data";
19+
public const string Metadata = "metadata";
20+
public const string CreatedUtc = "created_utc";
21+
22+
// Column names - Streams
23+
public const string CurrentPosition = "current_position";
24+
public const string Version = "version";
25+
public const string LastUpdatedUtc = "last_updated_utc";
26+
27+
// Column names - Checkpoints
28+
public const string ProjectionName = "projection_name";
29+
public const string PartitionKey = "partition_key";
30+
31+
// SQL queries - Events
32+
// Uses CTE (Common Table Expression) for atomic upsert and event insertion
33+
public const string AppendEventSqlSimplified = @"
34+
WITH updated_stream AS (
35+
INSERT INTO bbq_streams (stream_name, current_position, version, created_utc, last_updated_utc)
36+
VALUES (@stream_name, 0, 1, (NOW() AT TIME ZONE 'UTC'), (NOW() AT TIME ZONE 'UTC'))
37+
ON CONFLICT (stream_name)
38+
DO UPDATE SET
39+
current_position = bbq_streams.current_position + 1,
40+
version = bbq_streams.version + 1,
41+
last_updated_utc = (NOW() AT TIME ZONE 'UTC')
42+
RETURNING current_position
43+
),
44+
inserted_event AS (
45+
INSERT INTO bbq_events (stream_name, position, event_type, event_data, metadata, created_utc)
46+
SELECT @stream_name, current_position, @event_type, @event_data, @metadata, (NOW() AT TIME ZONE 'UTC')
47+
FROM updated_stream
48+
RETURNING position
49+
)
50+
SELECT position FROM inserted_event;";
51+
52+
public const string ReadEventsSql = @"
53+
SELECT event_id, stream_name, position, event_type, event_data, metadata, created_utc
54+
FROM bbq_events
55+
WHERE stream_name = @stream_name AND position >= @from_position
56+
ORDER BY position";
57+
58+
public const string GetStreamPositionSql = @"
59+
SELECT current_position
60+
FROM bbq_streams
61+
WHERE stream_name = @stream_name";
62+
}

0 commit comments

Comments
 (0)