Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/publish-postgresql-package.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Publish BbQ.Events.PostgreSql to NuGet

on:
push:
tags:
- 'postgresql-v*' # Triggers on tags like postgresql-v0.1.0, postgresql-v1.0.0

jobs:
build-and-publish:
runs-on: ubuntu-latest
permissions:
contents: read

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: '8.0.x'

- name: Restore dependencies
run: dotnet restore src/BbQ.Events.PostgreSql/BbQ.Events.PostgreSql.csproj

- name: Build
run: dotnet build src/BbQ.Events.PostgreSql/BbQ.Events.PostgreSql.csproj --configuration Release --no-restore

- name: Test
run: dotnet test tests/BbQ.Events.PostgreSql.Tests/BbQ.Events.PostgreSql.Tests.csproj --configuration Release --verbosity normal
# Tests use Testcontainers which requires Docker

- name: Pack
run: dotnet pack src/BbQ.Events.PostgreSql/BbQ.Events.PostgreSql.csproj --configuration Release --no-build --output ./artifacts

- name: Push to NuGet
run: dotnet nuget push ./artifacts/*.nupkg --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate
env:
NUGET_API_KEY: ${{ secrets.NUGET_API_KEY }}
2 changes: 2 additions & 0 deletions Outcome.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
<Project Path="src/BbQ.Events/BbQ.Events.csproj" />
<Project Path="src/BbQ.Events.SourceGenerators/BbQ.Events.SourceGenerators.csproj" />
<Project Path="src/BbQ.Events.SqlServer/BbQ.Events.SqlServer.csproj" />
<Project Path="src/BbQ.Events.PostgreSql/BbQ.Events.PostgreSql.csproj" />
<Project Path="tests/BbQ.Events.SqlServer.Tests/BbQ.Events.SqlServer.Tests.csproj" />
<Project Path="tests/BbQ.Events.PostgreSql.Tests/BbQ.Events.PostgreSql.Tests.csproj" />
</Folder>
<Project Path="src/Outcome.SourceGenerators/Outcome.SourceGenerators.csproj" />
<Project Path="tests/Outcome.Tests/Outcome.Tests.csproj" />
Expand Down
47 changes: 47 additions & 0 deletions src/BbQ.Events.PostgreSql/BbQ.Events.PostgreSql.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>

<PackageId>BbQ.Events.PostgreSql</PackageId>
<Version>0.1.0</Version>
<Authors>Jean</Authors>
<Company>JM Mbouma</Company>
<Description>PostgreSQL implementation for BbQ.Events, providing IProjectionCheckpointStore for projection checkpoints. Features durable checkpoint persistence with upsert semantics, atomic operations, and thread-safe parallel processing support.</Description>
<PackageTags>events postgresql projections checkpoint persistence bbq cqrs</PackageTags>
<RepositoryUrl>https://github.com/JeanMarcMbouma/Outcome</RepositoryUrl>
<PackageProjectUrl>https://github.com/JeanMarcMbouma/Outcome</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<LicenseExpression>MIT</LicenseExpression>
<PackageReadmeFile>README.md</PackageReadmeFile>

<!-- Build settings -->
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<LangVersion>latest</LangVersion>
<RootNamespace>BbQ.Events.PostgreSql</RootNamespace>

<!-- Source Link settings -->
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<Title>BbQ.Events.PostgreSql</Title>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Npgsql" Version="8.0.5" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\BbQ.Events\BbQ.Events.csproj" />
</ItemGroup>

<ItemGroup>
<!-- Include README in the NuGet package -->
<None Include="README.md" Pack="true" PackagePath="" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
using BbQ.Events.Checkpointing;
using Npgsql;

namespace BbQ.Events.PostgreSql.Checkpointing;

/// <summary>
/// PostgreSQL implementation of projection checkpoint storage.
///
/// This implementation provides:
/// - Durable persistence of projection checkpoints in PostgreSQL
/// - Atomic upserts via INSERT ... ON CONFLICT statement
/// - Thread-safe parallel processing support
/// - Support for both partitioned and non-partitioned projections
/// </summary>
/// <remarks>
/// This checkpoint store uses Npgsql (PostgreSQL ADO.NET provider) for minimal dependencies and maximum performance.
/// Checkpoints are stored in a table with atomic INSERT ... ON CONFLICT operations to prevent race conditions.
///
/// The implementation supports partitioned projections through the database schema, though
/// the current IProjectionCheckpointStore interface doesn't expose partition key parameters.
/// The partition_key column is nullable and defaults to NULL for non-partitioned projections.
///
/// Connection handling:
/// - Each operation opens a new connection (connection pooling is handled by Npgsql)
/// - Operations are fully async for optimal scalability
/// - Connections are properly disposed in all code paths
/// </remarks>
public class PostgreSqlProjectionCheckpointStore : IProjectionCheckpointStore
{
private readonly string _connectionString;

/// <summary>
/// Creates a new PostgreSQL checkpoint store.
/// </summary>
/// <param name="connectionString">The PostgreSQL connection string</param>
/// <exception cref="ArgumentNullException">Thrown when connectionString is null or empty</exception>
public PostgreSqlProjectionCheckpointStore(string connectionString)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw new ArgumentNullException(nameof(connectionString));
}

_connectionString = connectionString;
}

/// <summary>
/// Gets the last checkpoint for a specific projection.
/// </summary>
/// <param name="projectionName">The unique name of the projection</param>
/// <param name="ct">Cancellation token</param>
/// <returns>The checkpoint position, or null if no checkpoint exists</returns>
public async ValueTask<long?> GetCheckpointAsync(string projectionName, CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(projectionName))
{
throw new ArgumentNullException(nameof(projectionName));
}

await using var connection = new NpgsqlConnection(_connectionString);
await connection.OpenAsync(ct);

await using var command = connection.CreateCommand();
command.CommandText = @"
SELECT position
FROM bbq_projection_checkpoints
WHERE projection_name = @projection_name
AND partition_key IS NULL";

command.Parameters.AddWithValue("@projection_name", projectionName);

var result = await command.ExecuteScalarAsync(ct);

return result == null || result == DBNull.Value
? null
: Convert.ToInt64(result);
}

/// <summary>
/// Saves a checkpoint for a specific projection.
/// </summary>
/// <param name="projectionName">The unique name of the projection</param>
/// <param name="checkpoint">The checkpoint position to save</param>
/// <param name="ct">Cancellation token</param>
/// <returns>A task that completes when the checkpoint has been saved</returns>
/// <remarks>
/// This method uses PostgreSQL's INSERT ... ON CONFLICT statement for atomic upsert operations.
/// It's safe to call concurrently from multiple threads/processes.
/// </remarks>
public async ValueTask SaveCheckpointAsync(string projectionName, long checkpoint, CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(projectionName))
{
throw new ArgumentNullException(nameof(projectionName));
}

await using var connection = new NpgsqlConnection(_connectionString);
await connection.OpenAsync(ct);

await using var command = connection.CreateCommand();
command.CommandText = @"
INSERT INTO bbq_projection_checkpoints (projection_name, partition_key, position, updated_at)
VALUES (@projection_name, NULL, @position, NOW())
ON CONFLICT (projection_name, partition_key)
DO UPDATE SET position = EXCLUDED.position, updated_at = NOW()";

command.Parameters.AddWithValue("@projection_name", projectionName);
command.Parameters.AddWithValue("@position", checkpoint);

await command.ExecuteNonQueryAsync(ct);
}

/// <summary>
/// Resets the checkpoint for a specific projection.
/// </summary>
/// <param name="projectionName">The unique name of the projection</param>
/// <param name="ct">Cancellation token</param>
/// <returns>A task that completes when the checkpoint has been reset</returns>
public async ValueTask ResetCheckpointAsync(string projectionName, CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(projectionName))
{
throw new ArgumentNullException(nameof(projectionName));
}

await using var connection = new NpgsqlConnection(_connectionString);
await connection.OpenAsync(ct);

await using var command = connection.CreateCommand();
command.CommandText = @"
DELETE FROM bbq_projection_checkpoints
WHERE projection_name = @projection_name
AND partition_key IS NULL";

command.Parameters.AddWithValue("@projection_name", projectionName);

await command.ExecuteNonQueryAsync(ct);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using BbQ.Events.Checkpointing;
using BbQ.Events.PostgreSql.Checkpointing;

namespace BbQ.Events.PostgreSql.Configuration;

/// <summary>
/// Extension methods for registering PostgreSQL checkpoint store with dependency injection.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Registers the PostgreSQL checkpoint store for projection checkpointing.
/// </summary>
/// <param name="services">The service collection to register with</param>
/// <param name="connectionString">The PostgreSQL connection string</param>
/// <returns>The service collection for chaining</returns>
/// <remarks>
/// This method registers IProjectionCheckpointStore as a singleton using PostgreSqlProjectionCheckpointStore.
///
/// Prerequisites:
/// - PostgreSQL database must be accessible
/// - bbq_projection_checkpoints table must be created (see README for schema)
///
/// Example usage:
/// <code>
/// services.AddInMemoryEventBus();
/// services.AddProjection&lt;MyProjection&gt;();
/// services.UsePostgreSqlCheckpoints("Host=localhost;Database=mydb;Username=myuser;Password=mypass");
/// services.AddProjectionEngine();
/// </code>
/// </remarks>
public static IServiceCollection UsePostgreSqlCheckpoints(
this IServiceCollection services,
string connectionString)
{
if (string.IsNullOrWhiteSpace(connectionString))
{
throw new ArgumentNullException(nameof(connectionString));
}

// Replace any existing IProjectionCheckpointStore registration
services.Replace(ServiceDescriptor.Singleton<IProjectionCheckpointStore>(
_ => new PostgreSqlProjectionCheckpointStore(connectionString)));

return services;
}
}
Loading
Loading