Skip to content

Commit 17d5c3e

Browse files
authored
Support cancellation of web api and audit storage operations (#4782)
* Support cancellation of audit ingestion * Support cancellation for error ingestion * Added code comments on how lifetime is managed for `CancellationTokenSource` with `RavenAuditIngestionUnitOfWork` * started undertaking adding support for cancellation in domain events but pretty much all storage API's aren't supporting cancellation.... * More cancellation token propagation * OnMessage: Invoke `TrySetCanceled` when cancelled to return ASAP * Fix API's are rebase with forcing accepting all changes from master # Conflicts: # src/ServiceControl.Audit/Auditing/AuditIngestion.cs # src/ServiceControl.Audit/Auditing/AuditIngestor.cs * Suggestion by feedback to pass CT's to RecordProcessedMessage, RecordSagaSnapshot and RecordKnownEndpoint * Fixes tests * Cancel OnMessage immediately IHostedService.StopAsync is invoked * Alternative to previous commit, stop receviers and shutdown with already cancelled token. This should terminate ASAP BUT still cleanup all resources if properly implemented by transports/persisters. * Add clarifying code comment on why a CancellationToken is passed in the cancelled state * More cancellation as test * APIApprovels HTTP API routes
1 parent c950dda commit 17d5c3e

File tree

67 files changed

+328
-250
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+328
-250
lines changed

src/ServiceControl.AcceptanceTests.RavenDB/Recoverability/MessageFailures/When_a_message_fails_to_import.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.AcceptanceTests.RavenDB.Recoverability.MessageFailures
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using AcceptanceTesting;
67
using AcceptanceTesting.EndpointTemplates;
@@ -75,7 +76,7 @@ public async Task It_can_be_reimported()
7576

7677
class MessageFailedHandler(MyContext scenarioContext) : IDomainHandler<MessageFailed>
7778
{
78-
public Task Handle(MessageFailed domainEvent)
79+
public Task Handle(MessageFailed domainEvent, CancellationToken cancellationToken)
7980
{
8081
scenarioContext.MessageFailedEventPublished = true;
8182
return Task.CompletedTask;

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.IO;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using ServiceControl.Audit.Auditing.BodyStorage;
89

@@ -15,7 +16,7 @@ public InMemoryAttachmentsBodyStorage()
1516
messageBodies = [];
1617
}
1718

18-
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
19+
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
1920
{
2021
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);
2122

@@ -43,7 +44,7 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt
4344
return Task.CompletedTask;
4445
}
4546

46-
public async Task<StreamResult> TryFetch(string bodyId)
47+
public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
4748
{
4849
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);
4950

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditDataStore.cs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using ServiceControl.Audit.Auditing;
89
using ServiceControl.Audit.Auditing.BodyStorage;
@@ -29,7 +30,7 @@ public InMemoryAuditDataStore(IBodyStorage bodyStorage)
2930
failedAuditImports = [];
3031
}
3132

32-
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
33+
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
3334
{
3435
var sagaHistory = sagaHistories.FirstOrDefault(w => w.SagaId == input);
3536

@@ -41,7 +42,7 @@ public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
4142
return await Task.FromResult(new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, 1)));
4243
}
4344

44-
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo)
45+
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
4546
{
4647
var matched = messageViews
4748
.Where(w => !w.IsSystemMessage || includeSystemMessages)
@@ -50,7 +51,7 @@ public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSyst
5051
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
5152
}
5253

53-
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
54+
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
5455
{
5556
var messages = GetMessageIdsMatchingQuery(keyword);
5657

@@ -60,33 +61,33 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword
6061
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count())));
6162
}
6263

63-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
64+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
6465
{
6566
var messages = GetMessageIdsMatchingQuery(keyword);
6667

6768
var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpoint && messages.Contains(w.MessageId)).ToList();
6869
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
6970
}
7071

71-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo)
72+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
7273
{
7374
var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpointName).ToList();
7475
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
7576
}
7677

77-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo)
78+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
7879
{
7980
var matched = messageViews.Where(w => w.ConversationId == conversationId).ToList();
8081
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
8182
}
8283

83-
public async Task<MessageBodyView> GetMessageBody(string messageId)
84+
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
8485
{
8586
var result = await GetMessageBodyFromMetadata(messageId);
8687

8788
if (!result.Found)
8889
{
89-
var fromAttachments = await GetMessageBodyFromAttachments(messageId);
90+
var fromAttachments = await GetMessageBodyFromAttachments(messageId, cancellationToken);
9091
if (fromAttachments.Found)
9192
{
9293
return fromAttachments;
@@ -121,9 +122,9 @@ IList<string> GetMessageIdsMatchingQuery(string keyword)
121122
.Select(pm => pm.MessageMetadata["MessageId"] as string)
122123
.ToList();
123124
}
124-
async Task<MessageBodyView> GetMessageBodyFromAttachments(string messageId)
125+
async Task<MessageBodyView> GetMessageBodyFromAttachments(string messageId, CancellationToken cancellationToken)
125126
{
126-
var fromBodyStorage = await bodyStorage.TryFetch(messageId);
127+
var fromBodyStorage = await bodyStorage.TryFetch(messageId, cancellationToken);
127128

128129
if (fromBodyStorage.HasResult)
129130
{
@@ -165,7 +166,7 @@ Task<MessageBodyView> GetMessageBodyFromMetadata(string messageId)
165166
return Task.FromResult(MessageBodyView.FromString(body, contentType, bodySize, string.Empty));
166167
}
167168

168-
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
169+
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken)
169170
{
170171
var knownEndpointsView = knownEndpoints
171172
.Select(x => new KnownEndpointsView
@@ -184,7 +185,7 @@ public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
184185
return await Task.FromResult(new QueryResult<IList<KnownEndpointsView>>(knownEndpointsView, new QueryStatsInfo(string.Empty, knownEndpointsView.Count)));
185186
}
186187

187-
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName)
188+
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
188189
{
189190
var results = messageViews
190191
.Where(m => m.ReceivingEndpoint.Name == endpointName && !m.IsSystemMessage)

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWork.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Audit.Persistence.InMemory
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Auditing.BodyStorage;
67
using ServiceControl.Audit.Auditing;
@@ -15,21 +16,21 @@ class InMemoryAuditIngestionUnitOfWork(
1516
{
1617
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
1718

18-
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint)
19+
public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken cancellationToken)
1920
{
2021
dataStore.knownEndpoints.Add(knownEndpoint);
2122
return Task.CompletedTask;
2223
}
2324

24-
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body)
25+
public async Task RecordProcessedMessage(ProcessedMessage processedMessage, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
2526
{
2627
if (!body.IsEmpty)
2728
{
28-
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage);
29+
await bodyStorageEnricher.StoreAuditMessageBody(body, processedMessage, cancellationToken);
2930
}
3031
await dataStore.SaveProcessedMessage(processedMessage);
3132
}
3233

33-
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot) => dataStore.SaveSagaSnapshot(sagaSnapshot);
34+
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken) => dataStore.SaveSagaSnapshot(sagaSnapshot);
3435
}
3536
}

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace ServiceControl.Audit.Persistence.InMemory
22
{
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using ServiceControl.Audit.Auditing.BodyStorage;
56
using ServiceControl.Audit.Persistence.UnitOfWork;
@@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
1213
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
1314
}
1415

15-
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
16+
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
1617
{
1718
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
1819
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));

src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Audit.Persistence.RavenDB
22
{
33
using System.IO;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Auditing.BodyStorage;
67
using Raven.Client.Documents.BulkInsert;
@@ -11,21 +12,21 @@ class RavenAttachmentsBodyStorage(
1112
int settingsMaxBodySizeToStore)
1213
: IBodyStorage
1314
{
14-
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream)
15+
public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
1516
{
1617
if (bodySize > settingsMaxBodySizeToStore)
1718
{
1819
return Task.CompletedTask;
1920
}
2021

2122
return bulkInsert.AttachmentsFor(bodyId)
22-
.StoreAsync("body", bodyStream, contentType);
23+
.StoreAsync("body", bodyStream, contentType, cancellationToken);
2324
}
2425

25-
public async Task<StreamResult> TryFetch(string bodyId)
26+
public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
2627
{
27-
using var session = await sessionProvider.OpenSession();
28-
var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body");
28+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
29+
var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body", cancellationToken);
2930

3031
if (result == null)
3132
{

0 commit comments

Comments
 (0)