Skip to content

Commit 684d265

Browse files
committed
Adding it back
1 parent 5f091b1 commit 684d265

11 files changed

+391
-0
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage
2+
{
3+
using System.IO;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using ServiceControl.Audit.Auditing.BodyStorage;
7+
8+
public class PostgreSQLAttachmentsBodyStorage : IBodyStorage
9+
{
10+
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) => throw new System.NotImplementedException();
11+
public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken) => throw new System.NotImplementedException();
12+
}
13+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using ServiceControl.Audit.Auditing;
8+
using ServiceControl.Audit.Auditing.MessagesView;
9+
using ServiceControl.Audit.Infrastructure;
10+
using ServiceControl.Audit.Monitoring;
11+
using ServiceControl.Audit.Persistence;
12+
using ServiceControl.SagaAudit;
13+
14+
public class PostgreSQLAuditDataStore : IAuditDataStore
15+
{
16+
public Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken) => throw new NotImplementedException();
17+
public Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
18+
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) => throw new NotImplementedException();
19+
public Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken) => throw new NotImplementedException();
20+
public Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
21+
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) => throw new NotImplementedException();
22+
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
23+
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
24+
public Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) => throw new NotImplementedException();
25+
}
26+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using Npgsql;
4+
using System.Threading.Tasks;
5+
using System.Threading;
6+
7+
public class PostgreSQLConnectionFactory
8+
{
9+
readonly string connectionString;
10+
11+
public PostgreSQLConnectionFactory(string connectionString)
12+
{
13+
this.connectionString = connectionString;
14+
}
15+
16+
public NpgsqlConnection CreateConnection()
17+
{
18+
return new NpgsqlConnection(connectionString);
19+
}
20+
21+
public async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken = default)
22+
{
23+
var conn = new NpgsqlConnection(connectionString);
24+
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
25+
return conn;
26+
}
27+
}
28+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using ServiceControl.Audit.Auditing;
7+
using ServiceControl.Audit.Persistence;
8+
9+
public class PostgreSQLFailedAuditStorage : IFailedAuditStorage
10+
{
11+
public Task<int> GetFailedAuditsCount() => throw new NotImplementedException();
12+
public Task ProcessFailedMessages(Func<FailedTransportMessage, Func<CancellationToken, Task>, CancellationToken, Task> onMessage, CancellationToken cancellationToken) => throw new NotImplementedException();
13+
public Task SaveFailedAuditImport(FailedAuditImport message) => throw new NotImplementedException();
14+
}
15+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using Microsoft.Extensions.DependencyInjection;
4+
using ServiceControl.Audit.Auditing.BodyStorage;
5+
using ServiceControl.Audit.Persistence;
6+
using ServiceControl.Audit.Persistence.PostgreSQL.BodyStorage;
7+
using ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
8+
using ServiceControl.Audit.Persistence.UnitOfWork;
9+
10+
public class PostgreSQLPersistence : IPersistence
11+
{
12+
public void AddInstaller(IServiceCollection services)
13+
{
14+
AddPersistence(services);
15+
}
16+
17+
public void AddPersistence(IServiceCollection services)
18+
{
19+
services.AddSingleton<IAuditDataStore, PostgreSQLAuditDataStore>();
20+
services.AddSingleton<IAuditIngestionUnitOfWorkFactory, PostgreSQLAuditIngestionUnitOfWorkFactory>();
21+
services.AddSingleton<IFailedAuditStorage, PostgreSQLFailedAuditStorage>();
22+
services.AddSingleton<IBodyStorage, PostgreSQLAttachmentsBodyStorage>();
23+
services.AddSingleton<PostgreSQLConnectionFactory>();
24+
}
25+
}
26+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL
2+
{
3+
using System.Collections.Generic;
4+
using Npgsql;
5+
using ServiceControl.Audit.Persistence;
6+
7+
public class PostgreSQLPersistenceConfiguration : IPersistenceConfiguration
8+
{
9+
public string Name => "PostgreSQL";
10+
11+
public IEnumerable<string> ConfigurationKeys => new[] { "PostgreSqlConnectionString" };
12+
13+
public IPersistence Create(PersistenceSettings settings)
14+
{
15+
settings.PersisterSpecificSettings.TryGetValue("PostgreSqlConnectionString", out var connectionString);
16+
using var connection = new NpgsqlConnection(connectionString);
17+
connection.Open();
18+
19+
// Create processed_messages table
20+
using (var cmd = new NpgsqlCommand(@"
21+
CREATE TABLE IF NOT EXISTS processed_messages (
22+
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
23+
unique_message_id TEXT,
24+
message_metadata JSONB,
25+
headers JSONB,
26+
processed_at TIMESTAMPTZ,
27+
body BYTEA,
28+
message_id TEXT,
29+
message_type TEXT,
30+
is_system_message BOOLEAN,
31+
status TEXT,
32+
time_sent TIMESTAMPTZ,
33+
receiving_endpoint_name TEXT,
34+
critical_time INTERVAL,
35+
processing_time INTERVAL,
36+
delivery_time INTERVAL,
37+
conversation_id TEXT,
38+
query tsvector GENERATED ALWAYS AS (
39+
setweight(to_tsvector('english', coalesce(headers::text, '')), 'A') ||
40+
setweight(to_tsvector('english', coalesce(body::text, '')), 'B')
41+
) STORED
42+
);", connection))
43+
{
44+
cmd.ExecuteNonQuery();
45+
}
46+
47+
// Create saga_snapshots table
48+
using (var cmd = new NpgsqlCommand(@"
49+
CREATE TABLE IF NOT EXISTS saga_snapshots (
50+
id TEXT PRIMARY KEY,
51+
saga_id UUID,
52+
saga_type TEXT,
53+
start_time TIMESTAMPTZ,
54+
finish_time TIMESTAMPTZ,
55+
status TEXT,
56+
state_after_change TEXT,
57+
initiating_message JSONB,
58+
outgoing_messages JSONB,
59+
endpoint TEXT,
60+
processed_at TIMESTAMPTZ
61+
);", connection))
62+
{
63+
cmd.ExecuteNonQuery();
64+
}
65+
66+
// Create known_endpoints table
67+
using (var cmd = new NpgsqlCommand(@"
68+
CREATE TABLE IF NOT EXISTS known_endpoints (
69+
id TEXT PRIMARY KEY,
70+
name TEXT,
71+
host_id UUID,
72+
host TEXT,
73+
last_seen TIMESTAMPTZ
74+
);", connection))
75+
{
76+
cmd.ExecuteNonQuery();
77+
}
78+
79+
return new PostgreSQLPersistence();
80+
}
81+
}
82+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
6+
<DisableTransitiveProjectReferences>true</DisableTransitiveProjectReferences>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<!-- Private=false & ExcludeAssets=runtime prevent repeatedly including binary dependencies of ServiceControl.Audit.Persistence and its depenencies in each persister directory -->
11+
<ProjectReference Include="..\ServiceControl.Audit.Persistence\ServiceControl.Audit.Persistence.csproj" Private="false" ExcludeAssets="runtime" />
12+
<ProjectReference Include="..\ServiceControl.Audit.Persistence.SagaAudit\ServiceControl.Audit.Persistence.SagaAudit.csproj" Private="false" ExcludeAssets="runtime" />
13+
</ItemGroup>
14+
15+
<ItemGroup>
16+
<None Update="persistence.manifest" CopyToOutputDirectory="PreserveNewest" />
17+
</ItemGroup>
18+
19+
<ItemGroup>
20+
<!-- Artifact does not include RavenDBServer directory. Primary instance is responsible for copying that to deploy directory. -->
21+
<Artifact Include="$(OutputPath)" DestinationFolder="$(ArtifactsPath)Particular.ServiceControl.Audit\Persisters\PostgreSQL" />
22+
</ItemGroup>
23+
24+
<ItemGroup>
25+
<PackageReference Include="Npgsql" />
26+
</ItemGroup>
27+
28+
</Project>
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork
2+
{
3+
using System;
4+
using System.Text.Json;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Npgsql;
8+
using ServiceControl.Audit.Auditing;
9+
using ServiceControl.Audit.Persistence.Monitoring;
10+
using ServiceControl.Audit.Persistence.UnitOfWork;
11+
using ServiceControl.SagaAudit;
12+
13+
public class PostgreSQLAuditIngestionUnitOfWork : IAuditIngestionUnitOfWork
14+
{
15+
readonly NpgsqlConnection connection;
16+
readonly NpgsqlTransaction transaction;
17+
18+
public PostgreSQLAuditIngestionUnitOfWork(NpgsqlConnection connection, NpgsqlTransaction transaction)
19+
{
20+
this.connection = connection;
21+
this.transaction = transaction;
22+
}
23+
24+
public async ValueTask DisposeAsync()
25+
{
26+
await transaction.DisposeAsync().ConfigureAwait(false);
27+
await connection.DisposeAsync().ConfigureAwait(false);
28+
}
29+
30+
public async Task CompleteAsync(CancellationToken cancellationToken = default)
31+
{
32+
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
33+
}
34+
35+
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken = default)
36+
{
37+
object GetMetadata(string key) => processedMessage.MessageMetadata.TryGetValue(key, out var value) ? value ?? DBNull.Value : DBNull.Value;
38+
39+
// Insert ProcessedMessage into processed_messages table
40+
var cmd = new NpgsqlCommand(@"
41+
INSERT INTO processed_messages (
42+
unique_message_id, message_metadata, headers, processed_at, body,
43+
message_id, message_type, is_system_message, status, time_sent, receiving_endpoint_name,
44+
critical_time, processing_time, delivery_time, conversation_id
45+
) VALUES (
46+
@unique_message_id, @message_metadata, @headers, @processed_at, @body,
47+
@message_id, @message_type, @is_system_message, @status, @time_sent, @receiving_endpoint_name,
48+
@critical_time, @processing_time, @delivery_time, @conversation_id
49+
)
50+
;", connection, transaction);
51+
52+
processedMessage.MessageMetadata["ContentLength"] = body.Length;
53+
if (!body.IsEmpty)
54+
{
55+
cmd.Parameters.AddWithValue("body", body);
56+
}
57+
else
58+
{
59+
cmd.Parameters.AddWithValue("body", DBNull.Value);
60+
}
61+
cmd.Parameters.AddWithValue("unique_message_id", processedMessage.UniqueMessageId ?? (object)DBNull.Value);
62+
cmd.Parameters.AddWithValue("message_metadata", JsonSerializer.Serialize(processedMessage.MessageMetadata));
63+
cmd.Parameters.AddWithValue("headers", JsonSerializer.Serialize(processedMessage.Headers));
64+
cmd.Parameters.AddWithValue("processed_at", processedMessage.ProcessedAt);
65+
cmd.Parameters.AddWithValue("message_id", GetMetadata("MessageId"));
66+
cmd.Parameters.AddWithValue("message_type", GetMetadata("MessageType"));
67+
cmd.Parameters.AddWithValue("is_system_message", GetMetadata("IsSystemMessage"));
68+
cmd.Parameters.AddWithValue("time_sent", GetMetadata("TimeSent"));
69+
cmd.Parameters.AddWithValue("receiving_endpoint_name", GetMetadata("ReceivingEndpoint"));
70+
cmd.Parameters.AddWithValue("critical_time", GetMetadata("CriticalTime"));
71+
cmd.Parameters.AddWithValue("processing_time", GetMetadata("ProcessingTime"));
72+
cmd.Parameters.AddWithValue("delivery_time", GetMetadata("DeliveryTime"));
73+
cmd.Parameters.AddWithValue("conversation_id", GetMetadata("ConversationId"));
74+
75+
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
76+
}
77+
78+
public async Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default)
79+
{
80+
// Insert SagaSnapshot into saga_snapshots table
81+
var cmd = new NpgsqlCommand(@"
82+
INSERT INTO saga_snapshots (
83+
id, saga_id, saga_type, start_time, finish_time, status, state_after_change, initiating_message, outgoing_messages, endpoint, processed_at
84+
) VALUES (
85+
@id, @saga_id, @saga_type, @start_time, @finish_time, @status, @state_after_change, @initiating_message, @outgoing_messages, @endpoint, @processed_at
86+
)
87+
ON CONFLICT (id) DO NOTHING;", connection, transaction);
88+
89+
cmd.Parameters.AddWithValue("id", sagaSnapshot.Id ?? (object)DBNull.Value);
90+
cmd.Parameters.AddWithValue("saga_id", sagaSnapshot.SagaId);
91+
cmd.Parameters.AddWithValue("saga_type", sagaSnapshot.SagaType ?? (object)DBNull.Value);
92+
cmd.Parameters.AddWithValue("start_time", sagaSnapshot.StartTime);
93+
cmd.Parameters.AddWithValue("finish_time", sagaSnapshot.FinishTime);
94+
cmd.Parameters.AddWithValue("status", sagaSnapshot.Status.ToString());
95+
cmd.Parameters.AddWithValue("state_after_change", sagaSnapshot.StateAfterChange ?? (object)DBNull.Value);
96+
cmd.Parameters.AddWithValue("initiating_message", JsonSerializer.Serialize(sagaSnapshot.InitiatingMessage));
97+
cmd.Parameters.AddWithValue("outgoing_messages", JsonSerializer.Serialize(sagaSnapshot.OutgoingMessages));
98+
cmd.Parameters.AddWithValue("endpoint", sagaSnapshot.Endpoint ?? (object)DBNull.Value);
99+
cmd.Parameters.AddWithValue("processed_at", sagaSnapshot.ProcessedAt);
100+
101+
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
102+
}
103+
104+
public async Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default)
105+
{
106+
// Insert KnownEndpoint into known_endpoints table
107+
var cmd = new NpgsqlCommand(@"
108+
INSERT INTO known_endpoints (
109+
id, name, host_id, host, last_seen
110+
) VALUES (
111+
@id, @name, @host_id, @host, @last_seen
112+
)
113+
ON CONFLICT (id) DO NOTHING;", connection, transaction);
114+
115+
cmd.Parameters.AddWithValue("id", knownEndpoint.Id ?? (object)DBNull.Value);
116+
cmd.Parameters.AddWithValue("name", knownEndpoint.Name ?? (object)DBNull.Value);
117+
cmd.Parameters.AddWithValue("host_id", knownEndpoint.HostId);
118+
cmd.Parameters.AddWithValue("host", knownEndpoint.Host ?? (object)DBNull.Value);
119+
cmd.Parameters.AddWithValue("last_seen", knownEndpoint.LastSeen);
120+
121+
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
122+
}
123+
}
124+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork
2+
{
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Npgsql;
6+
using ServiceControl.Audit.Persistence.UnitOfWork;
7+
using ServiceControl.Audit.Persistence.PostgreSQL;
8+
9+
public class PostgreSQLAuditIngestionUnitOfWorkFactory : IAuditIngestionUnitOfWorkFactory
10+
{
11+
readonly PostgreSQLConnectionFactory connectionFactory;
12+
13+
public PostgreSQLAuditIngestionUnitOfWorkFactory(PostgreSQLConnectionFactory connectionFactory)
14+
{
15+
this.connectionFactory = connectionFactory;
16+
}
17+
18+
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
19+
{
20+
var connection = await connectionFactory.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
21+
var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
22+
return new PostgreSQLAuditIngestionUnitOfWork(connection, transaction);
23+
}
24+
25+
public bool CanIngestMore() => true; // TODO: Implement logic based on storage state
26+
}
27+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"Name": "PostgreSQL",
3+
"DisplayName": "PostgreSQL",
4+
"Description": "PostgreSQL ServiceControl Audit persister",
5+
"AssemblyName": "ServiceControl.Audit.Persistence.PostgreSQL",
6+
"TypeName": "ServiceControl.Audit.Persistence.PostgreSQL.PostgreSQLPersistenceConfiguration, ServiceControl.Audit.Persistence.PostgreSQL"
7+
}

0 commit comments

Comments
 (0)