Skip to content

Commit bf64782

Browse files
committed
We have something working
1 parent 6525de8 commit bf64782

File tree

3 files changed

+34
-30
lines changed

3 files changed

+34
-30
lines changed

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLFailedAuditStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
77
using ServiceControl.Audit.Persistence;
88
class PostgreSQLFailedAuditStorage : IFailedAuditStorage
99
{
10-
public Task<int> GetFailedAuditsCount() => throw new NotImplementedException();
10+
public Task<int> GetFailedAuditsCount() => Task.FromResult(0);
1111
public Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken) => throw new NotImplementedException();
1212
public Task SaveFailedAuditImport(FailedAuditImport message) => throw new NotImplementedException();
1313
}

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,24 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
66
using Microsoft.Extensions.Hosting;
77
using Npgsql;
88

9-
class PostgreSQLPersistenceInstaller(DatabaseConfiguration databaseConfiguration, PostgreSQLConnectionFactory connectionFactory) : BackgroundService
9+
class PostgreSQLPersistenceInstaller(DatabaseConfiguration databaseConfiguration, PostgreSQLConnectionFactory connectionFactory) : IHostedService
1010
{
11-
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
11+
public async Task StartAsync(CancellationToken cancellationToken)
1212
{
13-
using var connection = await connectionFactory.OpenConnection(stoppingToken);
13+
using var adminConnection = await connectionFactory.OpenAdminConnection(cancellationToken);
1414

15-
using (var cmd = new NpgsqlCommand($"SELECT 1 FROM pg_database WHERE datname = @dbname", connection))
15+
using (var cmd = new NpgsqlCommand($"SELECT 1 FROM pg_database WHERE datname = @dbname", adminConnection))
1616
{
1717
cmd.Parameters.AddWithValue("@dbname", databaseConfiguration.Name);
18-
var exists = await cmd.ExecuteScalarAsync(stoppingToken);
18+
var exists = await cmd.ExecuteScalarAsync(cancellationToken);
1919
if (exists == null)
2020
{
21-
using var createCmd = new NpgsqlCommand($"CREATE DATABASE \"{databaseConfiguration.Name}\"", connection);
22-
await createCmd.ExecuteNonQueryAsync(stoppingToken);
21+
using var createCmd = new NpgsqlCommand($"CREATE DATABASE \"{databaseConfiguration.Name}\"", adminConnection);
22+
await createCmd.ExecuteNonQueryAsync(cancellationToken);
2323
}
2424
}
2525

26+
using var connection = await connectionFactory.OpenConnection(cancellationToken);
2627
// Create processed_messages table
2728
using (var cmd = new NpgsqlCommand(@"
2829
CREATE TABLE IF NOT EXISTS processed_messages (
@@ -35,7 +36,7 @@ CREATE TABLE IF NOT EXISTS processed_messages (
3536
message_id TEXT,
3637
message_type TEXT,
3738
is_system_message BOOLEAN,
38-
status TEXT,
39+
status NUMERIC,
3940
time_sent TIMESTAMPTZ,
4041
receiving_endpoint_name TEXT,
4142
critical_time INTERVAL,
@@ -48,7 +49,7 @@ query tsvector GENERATED ALWAYS AS (
4849
) STORED
4950
);", connection))
5051
{
51-
await cmd.ExecuteNonQueryAsync(stoppingToken);
52+
await cmd.ExecuteNonQueryAsync(cancellationToken);
5253
}
5354

5455
// Create saga_snapshots table
@@ -67,7 +68,7 @@ CREATE TABLE IF NOT EXISTS saga_snapshots (
6768
processed_at TIMESTAMPTZ
6869
);", connection))
6970
{
70-
await cmd.ExecuteNonQueryAsync(stoppingToken);
71+
await cmd.ExecuteNonQueryAsync(cancellationToken);
7172
}
7273

7374
// Create known_endpoints table
@@ -80,9 +81,14 @@ CREATE TABLE IF NOT EXISTS known_endpoints (
8081
last_seen TIMESTAMPTZ
8182
);", connection))
8283
{
83-
await cmd.ExecuteNonQueryAsync(stoppingToken);
84+
await cmd.ExecuteNonQueryAsync(cancellationToken);
8485
}
8586
}
87+
88+
public Task StopAsync(CancellationToken cancellationToken)
89+
{
90+
return Task.CompletedTask;
91+
}
8692
}
8793

8894

src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
66
using System.Threading.Tasks;
77
using Npgsql;
88
using ServiceControl.Audit.Auditing;
9+
using ServiceControl.Audit.Monitoring;
910
using ServiceControl.Audit.Persistence.Monitoring;
1011
using ServiceControl.Audit.Persistence.UnitOfWork;
1112
using ServiceControl.SagaAudit;
@@ -23,18 +24,14 @@ public PostgreSQLAuditIngestionUnitOfWork(NpgsqlConnection connection, NpgsqlTra
2324

2425
public async ValueTask DisposeAsync()
2526
{
27+
await transaction.CommitAsync();
2628
await transaction.DisposeAsync();
2729
await connection.DisposeAsync();
2830
}
2931

30-
public async Task CompleteAsync(CancellationToken cancellationToken)
32+
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
3133
{
32-
await transaction.CommitAsync(cancellationToken);
33-
}
34-
35-
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
36-
{
37-
object GetMetadata(string key) => processedMessage.MessageMetadata.TryGetValue(key, out var value) ? value ?? DBNull.Value : DBNull.Value;
34+
T GetMetadata<T>(string key) => processedMessage.MessageMetadata.TryGetValue(key, out var value) ? (T)value ?? default : default;
3835

3936
// Insert ProcessedMessage into processed_messages table
4037
var cmd = new NpgsqlCommand(@"
@@ -59,18 +56,19 @@ INSERT INTO processed_messages (
5956
cmd.Parameters.AddWithValue("body", DBNull.Value);
6057
}
6158
cmd.Parameters.AddWithValue("unique_message_id", processedMessage.UniqueMessageId ?? (object)DBNull.Value);
62-
cmd.Parameters.AddWithValue("message_metadata", JsonSerializer.Serialize(processedMessage.MessageMetadata));
63-
cmd.Parameters.AddWithValue("headers", JsonSerializer.Serialize(processedMessage.Headers));
59+
cmd.Parameters.AddWithValue("message_metadata", JsonSerializer.SerializeToDocument(processedMessage.MessageMetadata));
60+
cmd.Parameters.AddWithValue("headers", JsonSerializer.SerializeToDocument(processedMessage.Headers));
6461
cmd.Parameters.AddWithValue("processed_at", processedMessage.ProcessedAt);
65-
cmd.Parameters.AddWithValue("message_id", GetMetadata("MessageId"));
66-
cmd.Parameters.AddWithValue("message_type", GetMetadata("MessageType"));
67-
cmd.Parameters.AddWithValue("is_system_message", GetMetadata("IsSystemMessage"));
68-
cmd.Parameters.AddWithValue("time_sent", GetMetadata("TimeSent"));
69-
cmd.Parameters.AddWithValue("receiving_endpoint_name", GetMetadata("ReceivingEndpoint"));
70-
cmd.Parameters.AddWithValue("critical_time", GetMetadata("CriticalTime"));
71-
cmd.Parameters.AddWithValue("processing_time", GetMetadata("ProcessingTime"));
72-
cmd.Parameters.AddWithValue("delivery_time", GetMetadata("DeliveryTime"));
73-
cmd.Parameters.AddWithValue("conversation_id", GetMetadata("ConversationId"));
62+
cmd.Parameters.AddWithValue("message_id", GetMetadata<string>("MessageId"));
63+
cmd.Parameters.AddWithValue("message_type", GetMetadata<string>("MessageType"));
64+
cmd.Parameters.AddWithValue("is_system_message", GetMetadata<bool>("IsSystemMessage"));
65+
cmd.Parameters.AddWithValue("time_sent", GetMetadata<DateTime>("TimeSent"));
66+
cmd.Parameters.AddWithValue("receiving_endpoint_name", GetMetadata<EndpointDetails>("ReceivingEndpoint").Name);
67+
cmd.Parameters.AddWithValue("critical_time", GetMetadata<TimeSpan>("CriticalTime"));
68+
cmd.Parameters.AddWithValue("processing_time", GetMetadata<TimeSpan>("ProcessingTime"));
69+
cmd.Parameters.AddWithValue("delivery_time", GetMetadata<TimeSpan>("DeliveryTime"));
70+
cmd.Parameters.AddWithValue("conversation_id", GetMetadata<string>("ConversationId"));
71+
cmd.Parameters.AddWithValue("status", (int)(GetMetadata<bool>("IsRetried") ? MessageStatus.ResolvedSuccessfully : MessageStatus.Successful));
7472

7573
await cmd.ExecuteNonQueryAsync(cancellationToken);
7674
}

0 commit comments

Comments
 (0)