Skip to content

Commit ff982bd

Browse files
committed
Suggestion by feedback to pass CT's to RecordProcessedMessage, RecordSagaSnapshot and RecordKnownEndpoint
1 parent 825bc8f commit ff982bd

File tree

9 files changed

+40
-33
lines changed

9 files changed

+40
-33
lines changed

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.IO;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using ServiceControl.Audit.Auditing.BodyStorage;
89

@@ -15,7 +16,7 @@ public InMemoryAttachmentsBodyStorage()
1516
messageBodies = [];
1617
}
1718

18-
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
19+
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
1920
{
2021
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);
2122

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWork.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Audit.Persistence.InMemory
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Auditing.BodyStorage;
67
using ServiceControl.Audit.Auditing;
@@ -15,21 +16,21 @@ class InMemoryAuditIngestionUnitOfWork(
1516
{
1617
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
1718

18-
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
19+
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
1920
{
2021
dataStore.knownEndpoints.Add(knownEndpoint);
2122
return Task.CompletedTask;
2223
}
2324

24-
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body)
25+
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
2526
{
2627
if (!body.IsEmpty)
2728
{
28-
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage);
29+
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage, cancellationToken);
2930
}
3031
await dataStore.SaveProcessedMessage(processedMessage);
3132
}
3233

33-
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot) => dataStore.SaveSagaSnapshot(sagaSnapshot);
34+
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken) => dataStore.SaveSagaSnapshot(sagaSnapshot);
3435
}
3536
}

src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Audit.Persistence.RavenDB
22
{
33
using System.IO;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Auditing.BodyStorage;
67
using Raven.Client.Documents.BulkInsert;
@@ -11,15 +12,15 @@ class RavenAttachmentsBodyStorage(
1112
int settingsMaxBodySizeToStore)
1213
: IBodyStorage
1314
{
14-
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
15+
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
1516
{
1617
if (bodySize > settingsMaxBodySizeToStore)
1718
{
1819
return Task.CompletedTask;
1920
}
2021

2122
return bulkInsert.AttachmentsFor(bodyId)
22-
.StoreAsync("body", bodyStream, contentType);
23+
.StoreAsync("body", bodyStream, contentType, cancellationToken);
2324
}
2425

2526
public async Task<StreamResult> TryFetch(string bodyId)

src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditIngestionUnitOfWork.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class RavenAuditIngestionUnitOfWork(
2222
IBodyStorage bodyStorage)
2323
: IAuditIngestionUnitOfWork
2424
{
25-
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body)
25+
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
2626
{
2727
processedMessage.MessageMetadata["ContentLength"] = body.Length;
2828
if (!body.IsEmpty)
@@ -37,7 +37,7 @@ public async Task RecordProcessedMessage(ProcessedMessage processedMessage, Read
3737
await using var stream = new ReadOnlyStream(body);
3838
var contentType = processedMessage.Headers.GetValueOrDefault(Headers.ContentType, "text/xml");
3939

40-
await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream);
40+
await bodyStorage.Store(processedMessage.Id, contentType, body.Length, stream, cancellationToken);
4141
}
4242
}
4343

@@ -47,10 +47,10 @@ MetadataAsDictionary GetExpirationMetadata() =>
4747
[Constants.Documents.Metadata.Expires] = DateTime.UtcNow.Add(auditRetentionPeriod)
4848
};
4949

50-
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot)
50+
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
5151
=> bulkInsert.StoreAsync(sagaSnapshot, GetExpirationMetadata());
5252

53-
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
53+
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
5454
=> bulkInsert.StoreAsync(knownEndpoint, GetExpirationMetadata());
5555

5656
public async ValueTask DisposeAsync()

src/ServiceControl.Audit.Persistence/BodyStorageEnricher.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Text;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using NServiceBus;
89
using NServiceBus.Logging;
@@ -11,7 +12,7 @@
1112

1213
public class BodyStorageEnricher(IBodyStorage bodyStorage, PersistenceSettings settings)
1314
{
14-
public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage)
15+
public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, CancellationToken cancellationToken)
1516
{
1617
var bodySize = body.Length;
1718
processedMessage.MessageMetadata.Add("ContentLength", bodySize);
@@ -23,7 +24,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, Processe
2324
var contentType = GetContentType(processedMessage.Headers, "text/xml");
2425
processedMessage.MessageMetadata.Add("ContentType", contentType);
2526

26-
var stored = await TryStoreBody(body, processedMessage, bodySize, contentType);
27+
var stored = await TryStoreBody(body, processedMessage, bodySize, contentType, cancellationToken);
2728
if (!stored)
2829
{
2930
processedMessage.MessageMetadata.Add("BodyNotStored", true);
@@ -33,7 +34,7 @@ public async ValueTask StoreAuditMessageBody(ReadOnlyMemory<byte> body, Processe
3334
static string GetContentType(IReadOnlyDictionary<string, string> headers, string defaultContentType)
3435
=> headers.GetValueOrDefault(Headers.ContentType, defaultContentType);
3536

36-
async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, int bodySize, string contentType)
37+
async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage processedMessage, int bodySize, string contentType, CancellationToken cancellationToken)
3738
{
3839
var bodyId = MessageId(processedMessage.Headers);
3940
var bodyUrl = string.Format(BodyUrlFormatString, bodyId);
@@ -71,7 +72,7 @@ async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage p
7172

7273
if (useBodyStore)
7374
{
74-
await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize);
75+
await StoreBodyInBodyStorage(body, bodyId, contentType, bodySize, cancellationToken);
7576
storedInBodyStorage = true;
7677
}
7778
}
@@ -80,10 +81,10 @@ async ValueTask<bool> TryStoreBody(ReadOnlyMemory<byte> body, ProcessedMessage p
8081
return storedInBodyStorage;
8182
}
8283

83-
async Task StoreBodyInBodyStorage(ReadOnlyMemory<byte> body, string bodyId, string contentType, int bodySize)
84+
async Task StoreBodyInBodyStorage(ReadOnlyMemory<byte> body, string bodyId, string contentType, int bodySize, CancellationToken cancellationToken)
8485
{
8586
await using var bodyStream = new ReadOnlyStream(body);
86-
await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream);
87+
await bodyStorage.Store(bodyId, contentType, bodySize, bodyStream, cancellationToken);
8788
}
8889

8990
static string MessageId(IReadOnlyDictionary<string, string> headers)

src/ServiceControl.Audit.Persistence/IBodyStorage.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
namespace ServiceControl.Audit.Auditing.BodyStorage
22
{
33
using System.IO;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
public interface IBodyStorage
78
{
8-
Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream);
9+
Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken);
910
Task<StreamResult> TryFetch(string bodyId);
1011
}
1112

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
namespace ServiceControl.Audit.Persistence.UnitOfWork
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Auditing;
67
using Monitoring;
78
using ServiceControl.SagaAudit;
89

910
public interface IAuditIngestionUnitOfWork : IAsyncDisposable
1011
{
11-
Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body = default);
12-
Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot);
13-
Task RecordKnownEndpoint(KnownEndpoint knownEndpoint);
12+
Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body = default, CancellationToken cancellationToken = default);
13+
Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken = default);
14+
Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken = default);
1415
}
1516
}

src/ServiceControl.Audit.UnitTests/BodyStorage/BodyStorageEnricherTests.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ namespace ServiceControl.UnitTests.BodyStorage
44
using System.Collections.Generic;
55
using System.IO;
66
using System.Text;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using Audit.Auditing;
910
using Audit.Auditing.BodyStorage;
@@ -33,7 +34,7 @@ public async Task Should_remove_body_when_above_threshold()
3334

3435
var message = new ProcessedMessage(headers, metadata);
3536

36-
await enricher.StoreAuditMessageBody(body, message);
37+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
3738

3839
Assert.Multiple(() =>
3940
{
@@ -63,7 +64,7 @@ public async Task Should_remove_body_when_above_threshold_and_binary()
6364

6465
var message = new ProcessedMessage(headers, metadata);
6566

66-
await enricher.StoreAuditMessageBody(body, message);
67+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
6768

6869
Assert.Multiple(() =>
6970
{
@@ -95,7 +96,7 @@ public async Task Should_store_body_in_metadata_when_below_large_object_heap_and
9596

9697
var message = new ProcessedMessage(headers, metadata);
9798

98-
await enricher.StoreAuditMessageBody(body, message);
99+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
99100

100101
Assert.Multiple(() =>
101102
{
@@ -127,7 +128,7 @@ public async Task Should_store_body_in_body_property_when_full_text_disabled_and
127128

128129
var message = new ProcessedMessage(headers, metadata);
129130

130-
await enricher.StoreAuditMessageBody(body, message);
131+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
131132

132133
Assert.Multiple(() =>
133134
{
@@ -159,7 +160,7 @@ public async Task Should_store_body_in_storage_when_above_large_object_heap_but_
159160

160161
var message = new ProcessedMessage(headers, metadata);
161162

162-
await enricher.StoreAuditMessageBody(body, message);
163+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
163164

164165
Assert.Multiple(() =>
165166
{
@@ -190,7 +191,7 @@ public async Task Should_store_body_in_storage_when_below_threshold_and_binary()
190191

191192
var message = new ProcessedMessage(headers, metadata);
192193

193-
await enricher.StoreAuditMessageBody(body, message);
194+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
194195

195196
Assert.Multiple(() =>
196197
{
@@ -221,7 +222,7 @@ public async Task Should_store_body_in_storage_when_below_threshold()
221222

222223
var message = new ProcessedMessage(headers, metadata);
223224

224-
await enricher.StoreAuditMessageBody(body, message);
225+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
225226

226227
Assert.Multiple(() =>
227228
{
@@ -251,7 +252,7 @@ public async Task Should_store_body_in_storage_when_encoding_fails()
251252

252253
var message = new ProcessedMessage(headers, metadata);
253254

254-
await enricher.StoreAuditMessageBody(body, message);
255+
await enricher.StoreAuditMessageBody(body, message, TestContext.CurrentContext.CancellationToken);
255256

256257
Assert.That(fakeStorage.StoredBodySize, Is.GreaterThan(0));
257258
}
@@ -260,7 +261,7 @@ class FakeBodyStorage : IBodyStorage
260261
{
261262
public int StoredBodySize { get; set; }
262263

263-
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
264+
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
264265
{
265266
StoredBodySize = bodySize;
266267
return Task.CompletedTask;

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
6060
RecordKnownEndpoints(receivingEndpoint, knownEndpoints, processedMessage);
6161
}
6262

63-
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body);
63+
await unitOfWork.RecordProcessedMessage(processedMessage, context.Body, cancellationToken);
6464
}
6565
else if (context.Extensions.TryGet(out SagaSnapshot sagaSnapshot))
6666
{
67-
await unitOfWork.RecordSagaSnapshot(sagaSnapshot);
67+
await unitOfWork.RecordSagaSnapshot(sagaSnapshot, cancellationToken);
6868
}
6969

7070
storedContexts.Add(context);
@@ -77,7 +77,7 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
7777
Logger.Debug($"Adding known endpoint '{endpoint.Name}' for bulk storage");
7878
}
7979

80-
await unitOfWork.RecordKnownEndpoint(endpoint);
80+
await unitOfWork.RecordKnownEndpoint(endpoint, cancellationToken);
8181
}
8282
}
8383
catch (Exception e)

0 commit comments

Comments
 (0)