Skip to content

Commit 99e8c61

Browse files
committed
Returning results
1 parent 7adb46d commit 99e8c61

File tree

4 files changed

+225
-9
lines changed

4 files changed

+225
-9
lines changed

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs

Lines changed: 215 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,232 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
22

33
using System;
44
using System.Collections.Generic;
5+
using System.Text;
6+
using System.Text.Json;
57
using System.Threading;
68
using System.Threading.Tasks;
9+
using Npgsql;
10+
using NServiceBus;
711
using ServiceControl.Audit.Auditing;
812
using ServiceControl.Audit.Auditing.MessagesView;
913
using ServiceControl.Audit.Infrastructure;
1014
using ServiceControl.Audit.Monitoring;
1115
using ServiceControl.Audit.Persistence;
1216
using ServiceControl.SagaAudit;
1317

14-
class PostgreSQLAuditDataStore : IAuditDataStore
18+
class PostgreSQLAuditDataStore(PostgreSQLConnectionFactory connectionFactory) : IAuditDataStore
1519
{
16-
public Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken) => throw new NotImplementedException();
17-
public Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
20+
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
21+
{
22+
using var conn = await connectionFactory.OpenConnection(cancellationToken);
23+
24+
using var cmd = new NpgsqlCommand(@"
25+
select body, headers from processed_messages
26+
where message_id = @message_id
27+
LIMIT 1;", conn);
28+
29+
cmd.Parameters.AddWithValue("message_id", messageId);
30+
31+
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
32+
if (await reader.ReadAsync(cancellationToken))
33+
{
34+
var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken);
35+
var contentType = reader.GetFieldValue<Dictionary<string, string>>(reader.GetOrdinal("headers")).GetValueOrDefault(Headers.ContentType, "text/xml");
36+
37+
return MessageBodyView.FromStream(stream, contentType, (int)stream.Length, string.Empty);
38+
}
39+
40+
return MessageBodyView.NotFound();
41+
}
42+
43+
async Task<QueryResult<IList<MessagesView>>> GetAllMessages(
44+
string? conversationId, bool? includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange,
45+
string? endpointName,
46+
string? q,
47+
CancellationToken cancellationToken)
48+
{
49+
using var conn = await connectionFactory.OpenConnection(cancellationToken);
50+
51+
var sql = new StringBuilder(@"select unique_message_id,
52+
message_metadata,
53+
headers,
54+
processed_at,
55+
message_id,
56+
message_type,
57+
is_system_message,
58+
status,
59+
time_sent,
60+
receiving_endpoint_name,
61+
critical_time,
62+
processing_time,
63+
delivery_time,
64+
conversation_id from processed_messages
65+
where 1 = 1");
66+
67+
if (includeSystemMessages.HasValue)
68+
{
69+
sql.Append(" and is_system_message = @is_system_message");
70+
}
71+
72+
if (!string.IsNullOrWhiteSpace(q))
73+
{
74+
sql.Append(" and query @@ to_tsquery('english', @search)");
75+
}
76+
77+
if (!string.IsNullOrWhiteSpace(conversationId))
78+
{
79+
sql.Append(" and conversation_id = @conversation_id");
80+
}
81+
82+
if (!string.IsNullOrWhiteSpace(endpointName))
83+
{
84+
sql.Append(" and receiving_endpoint_name = @endpoint_name");
85+
}
86+
87+
if (timeSentRange?.From != null)
88+
{
89+
sql.Append(" and time_sent >= @time_sent_start");
90+
}
91+
92+
if (timeSentRange?.To != null)
93+
{
94+
sql.Append(" and time_sent <= @time_sent_end");
95+
}
96+
97+
sql.Append(" ORDER BY");
98+
switch (sortInfo.Sort)
99+
{
100+
101+
case "id":
102+
case "message_id":
103+
sql.Append(" message_id");
104+
break;
105+
case "message_type":
106+
sql.Append(" message_type");
107+
break;
108+
case "critical_time":
109+
sql.Append(" critical_time");
110+
break;
111+
case "delivery_time":
112+
sql.Append(" delivery_time");
113+
break;
114+
case "processing_time":
115+
sql.Append(" processing_time");
116+
break;
117+
case "processed_at":
118+
sql.Append(" processed_at");
119+
break;
120+
case "status":
121+
sql.Append(" status");
122+
break;
123+
default:
124+
sql.Append(" time_sent");
125+
break;
126+
}
127+
128+
if (sortInfo.Direction == "asc")
129+
{
130+
sql.Append(" ASC");
131+
}
132+
else
133+
{
134+
sql.Append(" DESC");
135+
}
136+
137+
sql.Append($" LIMIT {pagingInfo.PageSize} OFFSET {pagingInfo.Offset};");
138+
139+
var query = sql.ToString();
140+
using var cmd = new NpgsqlCommand(query, conn);
141+
142+
if (!string.IsNullOrWhiteSpace(q))
143+
{
144+
cmd.Parameters.AddWithValue("search", q);
145+
}
146+
if (!string.IsNullOrWhiteSpace(endpointName))
147+
{
148+
cmd.Parameters.AddWithValue("endpoint_name", endpointName);
149+
}
150+
if (!string.IsNullOrWhiteSpace(conversationId))
151+
{
152+
cmd.Parameters.AddWithValue("conversation_id", conversationId);
153+
}
154+
if (includeSystemMessages.HasValue)
155+
{
156+
cmd.Parameters.AddWithValue("is_system_message", includeSystemMessages);
157+
}
158+
if (timeSentRange?.From != null)
159+
{
160+
cmd.Parameters.AddWithValue("time_sent_start", timeSentRange.From);
161+
}
162+
if (timeSentRange?.To != null)
163+
{
164+
cmd.Parameters.AddWithValue("time_sent_end", timeSentRange.To);
165+
}
166+
167+
return await ReturnResults(cmd, cancellationToken);
168+
}
169+
170+
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange, CancellationToken cancellationToken)
171+
{
172+
return await GetAllMessages(null, includeSystemMessages, pagingInfo, sortInfo, timeSentRange, null, null, cancellationToken);
173+
}
174+
18175
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken) => throw new NotImplementedException();
19176
public Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken) => throw new NotImplementedException();
20-
public Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
21-
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken) => throw new NotImplementedException();
22-
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
23-
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default) => throw new NotImplementedException();
177+
public Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default)
178+
{
179+
return GetAllMessages(null, null, pagingInfo, sortInfo, timeSentRange, searchParam, null, cancellationToken);
180+
}
181+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
182+
{
183+
return await GetAllMessages(conversationId, null, pagingInfo, sortInfo, null, null, null, cancellationToken);
184+
}
185+
186+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default)
187+
{
188+
return await GetAllMessages(null, includeSystemMessages, pagingInfo, sortInfo, timeSentRange, null, endpointName, cancellationToken);
189+
}
190+
191+
public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, DateTimeRange timeSentRange = null, CancellationToken cancellationToken = default)
192+
{
193+
return GetAllMessages(null, null, pagingInfo, sortInfo, timeSentRange, keyword, endpoint, cancellationToken);
194+
}
24195
public Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) => throw new NotImplementedException();
196+
197+
async Task<QueryResult<IList<MessagesView>>> ReturnResults(NpgsqlCommand cmd, CancellationToken cancellationToken = default)
198+
{
199+
var results = new List<MessagesView>();
200+
201+
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
202+
while (await reader.ReadAsync(cancellationToken))
203+
{
204+
var headers = reader.GetFieldValue<Dictionary<string, string>>(reader.GetOrdinal("headers"));
205+
var messageMetadata = reader.GetFieldValue<Dictionary<string, object>>(reader.GetOrdinal("message_metadata"));
206+
207+
results.Add(new MessagesView
208+
{
209+
Id = reader.GetFieldValue<string>(reader.GetOrdinal("unique_message_id")),
210+
MessageId = reader.GetFieldValue<string>(reader.GetOrdinal("message_id")),
211+
MessageType = reader.GetFieldValue<string>(reader.GetOrdinal("message_type")),
212+
SendingEndpoint = JsonSerializer.Deserialize<EndpointDetails>((JsonElement)messageMetadata["SendingEndpoint"]),
213+
ReceivingEndpoint = JsonSerializer.Deserialize<EndpointDetails>((JsonElement)messageMetadata["ReceivingEndpoint"]),
214+
TimeSent = reader.GetFieldValue<DateTime>(reader.GetOrdinal("time_sent")),
215+
ProcessedAt = reader.GetFieldValue<DateTime>(reader.GetOrdinal("processed_at")),
216+
CriticalTime = reader.GetFieldValue<TimeSpan>(reader.GetOrdinal("critical_time")),
217+
ProcessingTime = reader.GetFieldValue<TimeSpan>(reader.GetOrdinal("processing_time")),
218+
DeliveryTime = reader.GetFieldValue<TimeSpan>(reader.GetOrdinal("delivery_time")),
219+
IsSystemMessage = reader.GetFieldValue<bool>(reader.GetOrdinal("is_system_message")),
220+
ConversationId = reader.GetFieldValue<string>(reader.GetOrdinal("conversation_id")),
221+
Headers = [.. headers],
222+
Status = (MessageStatus)reader.GetFieldValue<int>(reader.GetOrdinal("status")),
223+
MessageIntent = (MessageIntent)(messageMetadata.ContainsKey("MessageIntent") ? JsonSerializer.Deserialize<int>((JsonElement)messageMetadata["MessageIntent"]) : 1),
224+
BodyUrl = "",
225+
BodySize = messageMetadata.ContainsKey("ContentLength") ? JsonSerializer.Deserialize<int>((JsonElement)messageMetadata["ContentLength"]) : 0,
226+
InvokedSagas = messageMetadata.ContainsKey("InvokedSagas") ? JsonSerializer.Deserialize<List<SagaInfo>>((JsonElement)messageMetadata["InvokedSagas"]) : [],
227+
OriginatesFromSaga = messageMetadata.ContainsKey("OriginatesFromSaga") ? JsonSerializer.Deserialize<SagaInfo>((JsonElement)messageMetadata["OriginatesFromSaga"]) : null
228+
});
229+
}
230+
231+
return new QueryResult<IList<MessagesView>>(results, new QueryStatsInfo(string.Empty, results.Count));
232+
}
25233
}

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLConnectionFactory.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ class PostgreSQLConnectionFactory(DatabaseConfiguration databaseConfiguration)
77
{
88
public async Task<NpgsqlConnection> OpenConnection(CancellationToken cancellationToken)
99
{
10-
var conn = new NpgsqlConnection(databaseConfiguration.ConnectionString);
10+
var dataSourceBuilder = new NpgsqlDataSourceBuilder(databaseConfiguration.ConnectionString);
11+
dataSourceBuilder.EnableDynamicJson();
12+
var dataSource = dataSourceBuilder.Build();
13+
var conn = dataSource.CreateConnection();
1114
await conn.OpenAsync(cancellationToken);
1215
return conn;
1316
}
@@ -18,7 +21,10 @@ public async Task<NpgsqlConnection> OpenAdminConnection(CancellationToken cancel
1821
{
1922
Database = databaseConfiguration.AdminDatabaseName
2023
};
21-
var conn = new NpgsqlConnection(builder.ConnectionString);
24+
var dataSourceBuilder = new NpgsqlDataSourceBuilder(builder.ConnectionString);
25+
dataSourceBuilder.EnableDynamicJson();
26+
var dataSource = dataSourceBuilder.Build();
27+
var conn = dataSource.CreateConnection();
2228
await conn.OpenAsync(cancellationToken);
2329
return conn;
2430
}

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ CREATE OR REPLACE FUNCTION processed_messages_tsvector_update() RETURNS trigger
6060
END
6161
$$ LANGUAGE plpgsql;
6262
63+
DROP TRIGGER IF EXISTS processed_messages_tsvector_trigger ON processed_messages;
6364
CREATE TRIGGER processed_messages_tsvector_trigger
6465
BEFORE INSERT OR UPDATE ON processed_messages
6566
FOR EACH ROW EXECUTE FUNCTION processed_messages_tsvector_update();", connection))

src/ServiceControl.Audit.Persistence.PostgreSQL/ServiceControl.Audit.Persistence.PostgreSQL.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
<TargetFramework>net8.0</TargetFramework>
55
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
66
<DisableTransitiveProjectReferences>true</DisableTransitiveProjectReferences>
7+
<Nullable>enable</Nullable>
78
</PropertyGroup>
89

910
<ItemGroup>

0 commit comments

Comments
 (0)