Skip to content

Commit 04c887f

Browse files
committed
Adding batch
1 parent 176fba3 commit 04c887f

File tree

2 files changed

+22
-26
lines changed

2 files changed

+22
-26
lines changed

src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,27 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL.UnitOfWork;
1313

1414
class PostgreSQLAuditIngestionUnitOfWork : IAuditIngestionUnitOfWork
1515
{
16-
readonly NpgsqlConnection connection;
17-
readonly NpgsqlTransaction transaction;
16+
readonly NpgsqlBatch batch;
1817

19-
public PostgreSQLAuditIngestionUnitOfWork(NpgsqlConnection connection, NpgsqlTransaction transaction)
18+
public PostgreSQLAuditIngestionUnitOfWork(NpgsqlConnection connection)
2019
{
21-
this.connection = connection;
22-
this.transaction = transaction;
20+
batch = new NpgsqlBatch(connection);
2321
}
2422

2523
public async ValueTask DisposeAsync()
2624
{
27-
await transaction.CommitAsync();
28-
await transaction.DisposeAsync();
29-
await connection.DisposeAsync();
25+
await batch.PrepareAsync();
26+
await batch.ExecuteNonQueryAsync();
27+
await batch.DisposeAsync();
3028
}
3129

32-
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
30+
public Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
3331
{
3432
T GetMetadata<T>(string key) => processedMessage.MessageMetadata.TryGetValue(key, out var value) ? (T)value ?? default : default;
3533

3634
// Insert ProcessedMessage into processed_messages table
37-
var cmd = new NpgsqlCommand(@"
35+
var cmd = batch.CreateBatchCommand();
36+
cmd.CommandText = @"
3837
INSERT INTO processed_messages (
3938
unique_message_id, message_metadata, headers, processed_at, body,
4039
message_id, message_type, is_system_message, status, time_sent, receiving_endpoint_name,
@@ -43,8 +42,7 @@ INSERT INTO processed_messages (
4342
@unique_message_id, @message_metadata, @headers, @processed_at, @body,
4443
@message_id, @message_type, @is_system_message, @status, @time_sent, @receiving_endpoint_name,
4544
@critical_time, @processing_time, @delivery_time, @conversation_id
46-
)
47-
;", connection, transaction);
45+
);";
4846

4947
processedMessage.MessageMetadata["ContentLength"] = body.Length;
5048
if (!body.IsEmpty)
@@ -70,20 +68,20 @@ INSERT INTO processed_messages (
7068
cmd.Parameters.AddWithValue("conversation_id", GetMetadata<string>("ConversationId"));
7169
cmd.Parameters.AddWithValue("status", (int)(GetMetadata<bool>("IsRetried") ? MessageStatus.ResolvedSuccessfully : MessageStatus.Successful));
7270

73-
await cmd.PrepareAsync(cancellationToken);
74-
await cmd.ExecuteNonQueryAsync(cancellationToken);
71+
return Task.CompletedTask;
7572
}
7673

77-
public async Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
74+
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
7875
{
7976
// Insert SagaSnapshot into saga_snapshots table
80-
var cmd = new NpgsqlCommand(@"
77+
var cmd = batch.CreateBatchCommand();
78+
cmd.CommandText = @"
8179
INSERT INTO saga_snapshots (
8280
id, saga_id, saga_type, start_time, finish_time, status, state_after_change, initiating_message, outgoing_messages, endpoint, processed_at
8381
) VALUES (
8482
@id, @saga_id, @saga_type, @start_time, @finish_time, @status, @state_after_change, @initiating_message, @outgoing_messages, @endpoint, @processed_at
8583
)
86-
ON CONFLICT (id) DO NOTHING;", connection, transaction);
84+
ON CONFLICT (id) DO NOTHING;";
8785

8886
cmd.Parameters.AddWithValue("id", sagaSnapshot.Id ?? (object)DBNull.Value);
8987
cmd.Parameters.AddWithValue("saga_id", sagaSnapshot.SagaId);
@@ -97,28 +95,27 @@ INSERT INTO saga_snapshots (
9795
cmd.Parameters.AddWithValue("endpoint", sagaSnapshot.Endpoint ?? (object)DBNull.Value);
9896
cmd.Parameters.AddWithValue("processed_at", sagaSnapshot.ProcessedAt);
9997

100-
await cmd.PrepareAsync(cancellationToken);
101-
await cmd.ExecuteNonQueryAsync(cancellationToken);
98+
return Task.CompletedTask;
10299
}
103100

104-
public async Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
101+
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
105102
{
106103
// Insert KnownEndpoint into known_endpoints table
107-
var cmd = new NpgsqlCommand(@"
104+
var cmd = batch.CreateBatchCommand();
105+
cmd.CommandText = @"
108106
INSERT INTO known_endpoints (
109107
id, name, host_id, host, last_seen
110108
) VALUES (
111109
@id, @name, @host_id, @host, @last_seen
112110
)
113-
ON CONFLICT (id) DO NOTHING;", connection, transaction);
111+
ON CONFLICT (id) DO NOTHING;";
114112

115113
cmd.Parameters.AddWithValue("id", knownEndpoint.Id ?? (object)DBNull.Value);
116114
cmd.Parameters.AddWithValue("name", knownEndpoint.Name ?? (object)DBNull.Value);
117115
cmd.Parameters.AddWithValue("host_id", knownEndpoint.HostId);
118116
cmd.Parameters.AddWithValue("host", knownEndpoint.Host ?? (object)DBNull.Value);
119117
cmd.Parameters.AddWithValue("last_seen", knownEndpoint.LastSeen);
120118

121-
await cmd.PrepareAsync(cancellationToken);
122-
await cmd.ExecuteNonQueryAsync(cancellationToken);
119+
return Task.CompletedTask;
123120
}
124121
}

src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWorkFactory.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ public PostgreSQLAuditIngestionUnitOfWorkFactory(PostgreSQLConnectionFactory con
1616
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
1717
{
1818
var connection = await connectionFactory.OpenConnection(cancellationToken);
19-
var transaction = await connection.BeginTransactionAsync(cancellationToken);
20-
return new PostgreSQLAuditIngestionUnitOfWork(connection, transaction);
19+
return new PostgreSQLAuditIngestionUnitOfWork(connection);
2120
}
2221

2322
public bool CanIngestMore() => true;

0 commit comments

Comments
 (0)