Skip to content

Commit 2e96879

Browse files
committed
Fix issue with body retrieval api
1 parent 5642bf3 commit 2e96879

File tree

4 files changed

+31
-24
lines changed

4 files changed

+31
-24
lines changed

src/Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<PackageVersion Include="Microsoft.Extensions.Logging.Configuration" Version="8.0.1" />
2929
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.3" />
3030
<PackageVersion Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.10.0" />
31+
<PackageVersion Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
3132
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
3233
<PackageVersion Include="Microsoft-WindowsAPICodePack-Shell" Version="1.1.5" />
3334
<PackageVersion Include="Mindscape.Raygun4Net.NetCore" Version="11.2.1" />

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
66
using System.Text.Json;
77
using System.Threading;
88
using System.Threading.Tasks;
9+
using Microsoft.IO;
910
using Npgsql;
1011
using NServiceBus;
1112
using ServiceControl.Audit.Auditing;
@@ -19,21 +20,25 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
1920

2021
class PostgreSQLAuditDataStore(PostgreSQLConnectionFactory connectionFactory) : IAuditDataStore
2122
{
23+
static readonly RecyclableMemoryStreamManager manager = new RecyclableMemoryStreamManager();
24+
2225
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
2326
{
24-
using var conn = await connectionFactory.OpenConnection(cancellationToken);
25-
using var cmd = new NpgsqlCommand(@"
26-
select body, headers from processed_messages
27+
await using var conn = await connectionFactory.OpenConnection(cancellationToken);
28+
await using var cmd = new NpgsqlCommand(@"
29+
select headers, body from processed_messages
2730
where message_id = @message_id
2831
LIMIT 1;", conn);
2932
cmd.Parameters.AddWithValue("message_id", messageId);
30-
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
33+
await using var reader = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SequentialAccess, cancellationToken);
3134
if (await reader.ReadAsync(cancellationToken))
3235
{
33-
//var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken);
34-
var stream = reader.GetStream(reader.GetOrdinal("body"));
3536
var contentType = reader.GetFieldValue<Dictionary<string, string>>(reader.GetOrdinal("headers")).GetValueOrDefault(Headers.ContentType, "text/xml");
36-
return MessageBodyView.FromStream(stream, contentType, (int)stream.Length, string.Empty);
37+
using var stream = await reader.GetStreamAsync(reader.GetOrdinal("body"), cancellationToken);
38+
var responseStream = manager.GetStream();
39+
await stream.CopyToAsync(responseStream, cancellationToken);
40+
responseStream.Position = 0;
41+
return MessageBodyView.FromStream(responseStream, contentType, (int)stream.Length, string.Empty);
3742
}
3843
return MessageBodyView.NotFound();
3944
}
@@ -52,8 +57,8 @@ public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpoi
5257
{
5358
var startDate = DateTime.UtcNow.AddDays(-30);
5459
var endDate = DateTime.UtcNow;
55-
using var connection = await connectionFactory.OpenConnection(cancellationToken);
56-
using var cmd = new NpgsqlCommand(@"
60+
await using var connection = await connectionFactory.OpenConnection(cancellationToken);
61+
await using var cmd = new NpgsqlCommand(@"
5762
SELECT
5863
DATE_TRUNC('day', processed_at) AS day,
5964
COUNT(*) AS count
@@ -66,7 +71,7 @@ GROUP BY day
6671
cmd.Parameters.AddWithValue("start_date", startDate);
6772
cmd.Parameters.AddWithValue("end_date", endDate);
6873

69-
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
74+
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
7075
var results = new List<AuditCount>();
7176
while (await reader.ReadAsync(cancellationToken))
7277
{
@@ -83,8 +88,8 @@ GROUP BY day
8388
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken)
8489
{
8590
// We need to return all the data from known_endpoints table in postgress
86-
using var connection = await connectionFactory.OpenConnection(cancellationToken);
87-
using var cmd = new NpgsqlCommand(@"
91+
await using var connection = await connectionFactory.OpenConnection(cancellationToken);
92+
await using var cmd = new NpgsqlCommand(@"
8893
SELECT
8994
id,
9095
name,
@@ -93,7 +98,7 @@ public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(Ca
9398
last_seen
9499
FROM known_endpoints;", connection);
95100

96-
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
101+
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
97102
var results = new List<KnownEndpointsView>();
98103
while (await reader.ReadAsync(cancellationToken))
99104
{
@@ -164,9 +169,9 @@ async Task<QueryResult<IList<MessagesView>>> ExecuteMessagesQuery(
164169
PostgresqlMessagesQueryBuilder builder,
165170
CancellationToken cancellationToken)
166171
{
167-
using var conn = await connectionFactory.OpenConnection(cancellationToken);
172+
await using var conn = await connectionFactory.OpenConnection(cancellationToken);
168173
var (query, parameters) = builder.Build();
169-
using var cmd = new NpgsqlCommand(query, conn);
174+
await using var cmd = new NpgsqlCommand(query, conn);
170175
foreach (var param in parameters)
171176
{
172177
cmd.Parameters.Add(param);
@@ -193,7 +198,7 @@ static T GetValue<T>(NpgsqlDataReader reader, string column)
193198
async Task<QueryResult<IList<MessagesView>>> ReturnResults(NpgsqlCommand cmd, CancellationToken cancellationToken = default)
194199
{
195200
var results = new List<MessagesView>();
196-
using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
201+
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
197202
while (await reader.ReadAsync(cancellationToken))
198203
{
199204
var headers = GetValue<Dictionary<string, string>>(reader, "headers");

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ class PostgreSQLPersistenceInstaller(DatabaseConfiguration databaseConfiguration
1010
{
1111
public async Task StartAsync(CancellationToken cancellationToken)
1212
{
13-
using var adminConnection = await connectionFactory.OpenAdminConnection(cancellationToken);
13+
await using var adminConnection = await connectionFactory.OpenAdminConnection(cancellationToken);
1414

15-
using (var cmd = new NpgsqlCommand($"SELECT 1 FROM pg_database WHERE datname = @dbname", adminConnection))
15+
await using (var cmd = new NpgsqlCommand($"SELECT 1 FROM pg_database WHERE datname = @dbname", adminConnection))
1616
{
1717
cmd.Parameters.AddWithValue("@dbname", databaseConfiguration.Name);
1818
var exists = await cmd.ExecuteScalarAsync(cancellationToken);
@@ -23,9 +23,9 @@ public async Task StartAsync(CancellationToken cancellationToken)
2323
}
2424
}
2525

26-
using var connection = await connectionFactory.OpenConnection(cancellationToken);
26+
await using var connection = await connectionFactory.OpenConnection(cancellationToken);
2727
// Create processed_messages table
28-
using (var cmd = new NpgsqlCommand(@"
28+
await using (var cmd = new NpgsqlCommand(@"
2929
CREATE TABLE IF NOT EXISTS processed_messages (
3030
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
3131
unique_message_id TEXT,
@@ -50,7 +50,7 @@ query tsvector
5050
}
5151

5252
// Create trigger for full text search
53-
using (var cmd = new NpgsqlCommand(@"
53+
await using (var cmd = new NpgsqlCommand(@"
5454
CREATE OR REPLACE FUNCTION processed_messages_tsvector_update() RETURNS trigger AS $$
5555
BEGIN
5656
NEW.query :=
@@ -68,7 +68,7 @@ BEFORE INSERT OR UPDATE ON processed_messages
6868
await cmd.ExecuteNonQueryAsync(cancellationToken);
6969
}
7070
// Create index on processed_messages for specified columns
71-
using (var cmd = new NpgsqlCommand(@"
71+
await using (var cmd = new NpgsqlCommand(@"
7272
CREATE INDEX IF NOT EXISTS idx_processed_messages_multi ON processed_messages (
7373
message_id,
7474
time_sent,
@@ -84,7 +84,7 @@ CREATE INDEX IF NOT EXISTS idx_processed_messages_multi ON processed_messages (
8484
}
8585

8686
// Create saga_snapshots table
87-
using (var cmd = new NpgsqlCommand(@"
87+
await using (var cmd = new NpgsqlCommand(@"
8888
CREATE TABLE IF NOT EXISTS saga_snapshots (
8989
id TEXT PRIMARY KEY,
9090
saga_id UUID,
@@ -103,7 +103,7 @@ processed_at TIMESTAMPTZ
103103
}
104104

105105
// Create known_endpoints table
106-
using (var cmd = new NpgsqlCommand(@"
106+
await using (var cmd = new NpgsqlCommand(@"
107107
CREATE TABLE IF NOT EXISTS known_endpoints (
108108
id TEXT PRIMARY KEY,
109109
name TEXT,

src/ServiceControl.Audit.Persistence.PostgreSQL/ServiceControl.Audit.Persistence.PostgreSQL.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
</ItemGroup>
2424

2525
<ItemGroup>
26+
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" />
2627
<PackageReference Include="Npgsql" />
2728
</ItemGroup>
2829

0 commit comments

Comments
 (0)