Skip to content

Commit 49b3dd1

Browse files
committed
Moving things around
1 parent 684d265 commit 49b3dd1

11 files changed

+139
-92
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[*.cs]
2+
3+
# Justification: ServiceControl app has no synchronization context
4+
dotnet_diagnostic.CA2007.severity = none

src/ServiceControl.Audit.Persistence.PostgreSQL/BodyStorage/PostgreSQLAttachmentsBodyStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage
55
using System.Threading.Tasks;
66
using ServiceControl.Audit.Auditing.BodyStorage;
77

8-
public class PostgreSQLAttachmentsBodyStorage : IBodyStorage
8+
class PostgreSQLAttachmentsBodyStorage : IBodyStorage
99
{
1010
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) => throw new System.NotImplementedException();
1111
public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken) => throw new System.NotImplementedException();
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using System;
4+
5+
class DatabaseConfiguration(
6+
string databaseName,
7+
int expirationProcessTimerInSeconds,
8+
TimeSpan auditRetentionPeriod,
9+
int maxBodySizeToStore,
10+
string connectionString)
11+
{
12+
public string Name { get; } = databaseName;
13+
14+
public int ExpirationProcessTimerInSeconds { get; } = expirationProcessTimerInSeconds;
15+
16+
public TimeSpan AuditRetentionPeriod { get; } = auditRetentionPeriod;
17+
18+
public int MaxBodySizeToStore { get; } = maxBodySizeToStore;
19+
public string ConnectionString { get; } = connectionString;
20+
}
21+
}

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL
1111
using ServiceControl.Audit.Persistence;
1212
using ServiceControl.SagaAudit;
1313

14-
public class PostgreSQLAuditDataStore : IAuditDataStore
14+
class PostgreSQLAuditDataStore : IAuditDataStore
1515
{
1616
public Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken) => throw new NotImplementedException();
1717
public Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLConnectionFactory.cs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,16 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL
44
using System.Threading.Tasks;
55
using System.Threading;
66

7-
public class PostgreSQLConnectionFactory
7+
class PostgreSQLConnectionFactory
88
{
99
readonly string connectionString;
1010

11-
public PostgreSQLConnectionFactory(string connectionString)
12-
{
13-
this.connectionString = connectionString;
14-
}
15-
16-
public NpgsqlConnection CreateConnection()
17-
{
18-
return new NpgsqlConnection(connectionString);
19-
}
11+
public PostgreSQLConnectionFactory(DatabaseConfiguration databaseConfiguration) => connectionString = databaseConfiguration.ConnectionString;
2012

21-
public async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken = default)
13+
public async Task<NpgsqlConnection> OpenConnection(CancellationToken cancellationToken)
2214
{
2315
var conn = new NpgsqlConnection(connectionString);
24-
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
16+
await conn.OpenAsync(cancellationToken);
2517
return conn;
2618
}
2719
}

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLFailedAuditStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL
66
using ServiceControl.Audit.Auditing;
77
using ServiceControl.Audit.Persistence;
88

9-
public class PostgreSQLFailedAuditStorage : IFailedAuditStorage
9+
class PostgreSQLFailedAuditStorage : IFailedAuditStorage
1010
{
1111
public Task<int> GetFailedAuditsCount() => throw new NotImplementedException();
1212
public Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken) => throw new NotImplementedException();

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistence.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL
77
using ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
88
using ServiceControl.Audit.Persistence.UnitOfWork;
99

10-
public class PostgreSQLPersistence : IPersistence
10+
class PostgreSQLPersistence(DatabaseConfiguration databaseConfiguration) : IPersistence
1111
{
1212
public void AddInstaller(IServiceCollection services)
1313
{
14-
AddPersistence(services);
14+
services.AddSingleton(databaseConfiguration);
15+
services.AddSingleton<PostgreSQLConnectionFactory>();
16+
services.AddHostedService<PostgreSQLPersistenceInstaller>();
1517
}
1618

1719
public void AddPersistence(IServiceCollection services)
1820
{
21+
services.AddSingleton(databaseConfiguration);
1922
services.AddSingleton<IAuditDataStore, PostgreSQLAuditDataStore>();
2023
services.AddSingleton<IAuditIngestionUnitOfWorkFactory, PostgreSQLAuditIngestionUnitOfWorkFactory>();
2124
services.AddSingleton<IFailedAuditStorage, PostgreSQLFailedAuditStorage>();
Lines changed: 14 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,35 @@
11
namespace ServiceControl.Audit.Persistence.PostgreSQL
22
{
3+
using System;
34
using System.Collections.Generic;
4-
using Npgsql;
55
using ServiceControl.Audit.Persistence;
66

77
public class PostgreSQLPersistenceConfiguration : IPersistenceConfiguration
88
{
99
public string Name => "PostgreSQL";
1010

11-
public IEnumerable<string> ConfigurationKeys => new[] { "PostgreSqlConnectionString" };
11+
public IEnumerable<string> ConfigurationKeys => ["PostgreSql/ConnectionString", "PostgreSql/DatabaseName"];
12+
13+
const int ExpirationProcessTimerInSecondsDefault = 600;
1214

1315
public IPersistence Create(PersistenceSettings settings)
1416
{
15-
settings.PersisterSpecificSettings.TryGetValue("PostgreSqlConnectionString", out var connectionString);
16-
using var connection = new NpgsqlConnection(connectionString);
17-
connection.Open();
18-
19-
// Create processed_messages table
20-
using (var cmd = new NpgsqlCommand(@"
21-
CREATE TABLE IF NOT EXISTS processed_messages (
22-
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
23-
unique_message_id TEXT,
24-
message_metadata JSONB,
25-
headers JSONB,
26-
processed_at TIMESTAMPTZ,
27-
body BYTEA,
28-
message_id TEXT,
29-
message_type TEXT,
30-
is_system_message BOOLEAN,
31-
status TEXT,
32-
time_sent TIMESTAMPTZ,
33-
receiving_endpoint_name TEXT,
34-
critical_time INTERVAL,
35-
processing_time INTERVAL,
36-
delivery_time INTERVAL,
37-
conversation_id TEXT,
38-
query tsvector GENERATED ALWAYS AS (
39-
setweight(to_tsvector('english', coalesce(headers::text, '')), 'A') ||
40-
setweight(to_tsvector('english', coalesce(body::text, '')), 'B')
41-
) STORED
42-
);", connection))
43-
{
44-
cmd.ExecuteNonQuery();
45-
}
46-
47-
// Create saga_snapshots table
48-
using (var cmd = new NpgsqlCommand(@"
49-
CREATE TABLE IF NOT EXISTS saga_snapshots (
50-
id TEXT PRIMARY KEY,
51-
saga_id UUID,
52-
saga_type TEXT,
53-
start_time TIMESTAMPTZ,
54-
finish_time TIMESTAMPTZ,
55-
status TEXT,
56-
state_after_change TEXT,
57-
initiating_message JSONB,
58-
outgoing_messages JSONB,
59-
endpoint TEXT,
60-
processed_at TIMESTAMPTZ
61-
);", connection))
17+
if (!settings.PersisterSpecificSettings.TryGetValue("PostgreSql/ConnectionString", out var connectionString))
6218
{
63-
cmd.ExecuteNonQuery();
19+
throw new Exception("PostgreSql/ConnectionString is not configured.");
6420
}
6521

66-
// Create known_endpoints table
67-
using (var cmd = new NpgsqlCommand(@"
68-
CREATE TABLE IF NOT EXISTS known_endpoints (
69-
id TEXT PRIMARY KEY,
70-
name TEXT,
71-
host_id UUID,
72-
host TEXT,
73-
last_seen TIMESTAMPTZ
74-
);", connection))
22+
if (!settings.PersisterSpecificSettings.TryGetValue("PostgreSql/DatabaseName", out var databaseName))
7523
{
76-
cmd.ExecuteNonQuery();
24+
databaseName = "servicecontrol-audit";
7725
}
7826

79-
return new PostgreSQLPersistence();
27+
return new PostgreSQLPersistence(new DatabaseConfiguration(
28+
databaseName,
29+
ExpirationProcessTimerInSecondsDefault,
30+
settings.AuditRetentionPeriod,
31+
settings.MaxBodySizeToStore,
32+
connectionString));
8033
}
8134
}
8235
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Hosting;
6+
using Npgsql;
7+
8+
class PostgreSQLPersistenceInstaller(PostgreSQLConnectionFactory connectionFactory) : BackgroundService
9+
{
10+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
11+
{
12+
using var connection = await connectionFactory.OpenConnection(stoppingToken);
13+
14+
// Create processed_messages table
15+
using (var cmd = new NpgsqlCommand(@"
16+
CREATE TABLE IF NOT EXISTS processed_messages (
17+
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
18+
unique_message_id TEXT,
19+
message_metadata JSONB,
20+
headers JSONB,
21+
processed_at TIMESTAMPTZ,
22+
body BYTEA,
23+
message_id TEXT,
24+
message_type TEXT,
25+
is_system_message BOOLEAN,
26+
status TEXT,
27+
time_sent TIMESTAMPTZ,
28+
receiving_endpoint_name TEXT,
29+
critical_time INTERVAL,
30+
processing_time INTERVAL,
31+
delivery_time INTERVAL,
32+
conversation_id TEXT,
33+
query tsvector GENERATED ALWAYS AS (
34+
setweight(to_tsvector('english', coalesce(headers::text, '')), 'A') ||
35+
setweight(to_tsvector('english', coalesce(body::text, '')), 'B')
36+
) STORED
37+
);", connection))
38+
{
39+
await cmd.ExecuteNonQueryAsync(stoppingToken);
40+
}
41+
42+
// Create saga_snapshots table
43+
using (var cmd = new NpgsqlCommand(@"
44+
CREATE TABLE IF NOT EXISTS saga_snapshots (
45+
id TEXT PRIMARY KEY,
46+
saga_id UUID,
47+
saga_type TEXT,
48+
start_time TIMESTAMPTZ,
49+
finish_time TIMESTAMPTZ,
50+
status TEXT,
51+
state_after_change TEXT,
52+
initiating_message JSONB,
53+
outgoing_messages JSONB,
54+
endpoint TEXT,
55+
processed_at TIMESTAMPTZ
56+
);", connection))
57+
{
58+
await cmd.ExecuteNonQueryAsync(stoppingToken);
59+
}
60+
61+
// Create known_endpoints table
62+
using (var cmd = new NpgsqlCommand(@"
63+
CREATE TABLE IF NOT EXISTS known_endpoints (
64+
id TEXT PRIMARY KEY,
65+
name TEXT,
66+
host_id UUID,
67+
host TEXT,
68+
last_seen TIMESTAMPTZ
69+
);", connection))
70+
{
71+
await cmd.ExecuteNonQueryAsync(stoppingToken);
72+
}
73+
}
74+
}
75+
}

src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork
1010
using ServiceControl.Audit.Persistence.UnitOfWork;
1111
using ServiceControl.SagaAudit;
1212

13-
public class PostgreSQLAuditIngestionUnitOfWork : IAuditIngestionUnitOfWork
13+
class PostgreSQLAuditIngestionUnitOfWork : IAuditIngestionUnitOfWork
1414
{
1515
readonly NpgsqlConnection connection;
1616
readonly NpgsqlTransaction transaction;
@@ -23,13 +23,13 @@ public PostgreSQLAuditIngestionUnitOfWork(NpgsqlConnection connection, NpgsqlTra
2323

2424
public async ValueTask DisposeAsync()
2525
{
26-
await transaction.DisposeAsync().ConfigureAwait(false);
27-
await connection.DisposeAsync().ConfigureAwait(false);
26+
await transaction.DisposeAsync();
27+
await connection.DisposeAsync();
2828
}
2929

30-
public async Task CompleteAsync(CancellationToken cancellationToken = default)
30+
public async Task CompleteAsync(CancellationToken cancellationToken)
3131
{
32-
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
32+
await transaction.CommitAsync(cancellationToken);
3333
}
3434

3535
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
@@ -72,10 +72,10 @@ INSERT INTO processed_messages (
7272
cmd.Parameters.AddWithValue("delivery_time", GetMetadata("DeliveryTime"));
7373
cmd.Parameters.AddWithValue("conversation_id", GetMetadata("ConversationId"));
7474

75-
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
75+
await cmd.ExecuteNonQueryAsync(cancellationToken);
7676
}
7777

78-
public async Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default)
78+
public async Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
7979
{
8080
// Insert SagaSnapshot into saga_snapshots table
8181
var cmd = new NpgsqlCommand(@"
@@ -98,10 +98,10 @@ INSERT INTO saga_snapshots (
9898
cmd.Parameters.AddWithValue("endpoint", sagaSnapshot.Endpoint ?? (object)DBNull.Value);
9999
cmd.Parameters.AddWithValue("processed_at", sagaSnapshot.ProcessedAt);
100100

101-
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
101+
await cmd.ExecuteNonQueryAsync(cancellationToken);
102102
}
103103

104-
public async Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default)
104+
public async Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
105105
{
106106
// Insert KnownEndpoint into known_endpoints table
107107
var cmd = new NpgsqlCommand(@"
@@ -118,7 +118,7 @@ INSERT INTO known_endpoints (
118118
cmd.Parameters.AddWithValue("host", knownEndpoint.Host ?? (object)DBNull.Value);
119119
cmd.Parameters.AddWithValue("last_seen", knownEndpoint.LastSeen);
120120

121-
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
121+
await cmd.ExecuteNonQueryAsync(cancellationToken);
122122
}
123123
}
124124
}

0 commit comments

Comments
 (0)