Skip to content

Commit 0589fba

Browse files
committed
More cancellation as test
1 parent 5e93c90 commit 0589fba

File tree

19 files changed

+116
-104
lines changed

19 files changed

+116
-104
lines changed

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAttachmentsBodyStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt
4444
return Task.CompletedTask;
4545
}
4646

47-
public async Task<StreamResult> TryFetch(string bodyId)
47+
public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
4848
{
4949
var messageBody = messageBodies.FirstOrDefault(w => w.BodyId == bodyId);
5050

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditDataStore.cs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using ServiceControl.Audit.Auditing;
89
using ServiceControl.Audit.Auditing.BodyStorage;
@@ -29,7 +30,7 @@ public InMemoryAuditDataStore(IBodyStorage bodyStorage)
2930
failedAuditImports = [];
3031
}
3132

32-
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
33+
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
3334
{
3435
var sagaHistory = sagaHistories.FirstOrDefault(w => w.SagaId == input);
3536

@@ -41,7 +42,7 @@ public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
4142
return await Task.FromResult(new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, 1)));
4243
}
4344

44-
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo)
45+
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
4546
{
4647
var matched = messageViews
4748
.Where(w => !w.IsSystemMessage || includeSystemMessages)
@@ -50,7 +51,7 @@ public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSyst
5051
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
5152
}
5253

53-
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
54+
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
5455
{
5556
var messages = GetMessageIdsMatchingQuery(keyword);
5657

@@ -60,33 +61,33 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string keyword
6061
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count())));
6162
}
6263

63-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
64+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
6465
{
6566
var messages = GetMessageIdsMatchingQuery(keyword);
6667

6768
var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpoint && messages.Contains(w.MessageId)).ToList();
6869
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
6970
}
7071

71-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo)
72+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
7273
{
7374
var matched = messageViews.Where(w => w.ReceivingEndpoint.Name == endpointName).ToList();
7475
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
7576
}
7677

77-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo)
78+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
7879
{
7980
var matched = messageViews.Where(w => w.ConversationId == conversationId).ToList();
8081
return await Task.FromResult(new QueryResult<IList<MessagesView>>(matched, new QueryStatsInfo(string.Empty, matched.Count)));
8182
}
8283

83-
public async Task<MessageBodyView> GetMessageBody(string messageId)
84+
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
8485
{
8586
var result = await GetMessageBodyFromMetadata(messageId);
8687

8788
if (!result.Found)
8889
{
89-
var fromAttachments = await GetMessageBodyFromAttachments(messageId);
90+
var fromAttachments = await GetMessageBodyFromAttachments(messageId, cancellationToken);
9091
if (fromAttachments.Found)
9192
{
9293
return fromAttachments;
@@ -121,9 +122,9 @@ IList<string> GetMessageIdsMatchingQuery(string keyword)
121122
.Select(pm => pm.MessageMetadata["MessageId"] as string)
122123
.ToList();
123124
}
124-
async Task<MessageBodyView> GetMessageBodyFromAttachments(string messageId)
125+
async Task<MessageBodyView> GetMessageBodyFromAttachments(string messageId, CancellationToken cancellationToken)
125126
{
126-
var fromBodyStorage = await bodyStorage.TryFetch(messageId);
127+
var fromBodyStorage = await bodyStorage.TryFetch(messageId, cancellationToken);
127128

128129
if (fromBodyStorage.HasResult)
129130
{
@@ -165,7 +166,7 @@ Task<MessageBodyView> GetMessageBodyFromMetadata(string messageId)
165166
return Task.FromResult(MessageBodyView.FromString(body, contentType, bodySize, string.Empty));
166167
}
167168

168-
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
169+
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken)
169170
{
170171
var knownEndpointsView = knownEndpoints
171172
.Select(x => new KnownEndpointsView
@@ -184,7 +185,7 @@ public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
184185
return await Task.FromResult(new QueryResult<IList<KnownEndpointsView>>(knownEndpointsView, new QueryStatsInfo(string.Empty, knownEndpointsView.Count)));
185186
}
186187

187-
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName)
188+
public Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
188189
{
189190
var results = messageViews
190191
.Where(m => m.ReceivingEndpoint.Name == endpointName && !m.IsSystemMessage)

src/ServiceControl.Audit.Persistence.RavenDB/RavenAttachmentsBodyStorage.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ public Task Store(string bodyId, string contentType, int bodySize, Stream bodySt
2323
.StoreAsync("body", bodyStream, contentType, cancellationToken);
2424
}
2525

26-
public async Task<StreamResult> TryFetch(string bodyId)
26+
public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
2727
{
28-
using var session = await sessionProvider.OpenSession();
29-
var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body");
28+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
29+
var result = await session.Advanced.Attachments.GetAsync($"MessageBodies/{bodyId}", "body", cancellationToken);
3030

3131
if (result == null)
3232
{

src/ServiceControl.Audit.Persistence.RavenDB/RavenAuditDataStore.cs

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Auditing.MessagesView;
89
using Extensions;
@@ -19,93 +20,93 @@
1920
class RavenAuditDataStore(IRavenSessionProvider sessionProvider, DatabaseConfiguration databaseConfiguration)
2021
: IAuditDataStore
2122
{
22-
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input)
23+
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
2324
{
24-
using var session = await sessionProvider.OpenSession();
25+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
2526
var sagaHistory = await
2627
session.Query<SagaHistory, SagaDetailsIndex>()
2728
.Statistics(out var stats)
28-
.SingleOrDefaultAsync(x => x.SagaId == input);
29+
.SingleOrDefaultAsync(x => x.SagaId == input, token: cancellationToken);
2930

3031
return sagaHistory == null ? QueryResult<SagaHistory>.Empty() : new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo($"{stats.ResultEtag}", stats.TotalResults));
3132
}
3233

33-
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo)
34+
public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSystemMessages, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
3435
{
35-
using var session = await sessionProvider.OpenSession();
36+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
3637
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
3738
.Statistics(out var stats)
3839
.IncludeSystemMessagesWhere(includeSystemMessages)
3940
.Sort(sortInfo)
4041
.Paging(pagingInfo)
4142
.ToMessagesView()
42-
.ToListAsync();
43+
.ToListAsync(token: cancellationToken);
4344

4445
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
4546
}
4647

47-
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo)
48+
public async Task<QueryResult<IList<MessagesView>>> QueryMessages(string searchParam, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
4849
{
49-
using var session = await sessionProvider.OpenSession();
50+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
5051
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
5152
.Statistics(out var stats)
5253
.Search(x => x.Query, searchParam)
5354
.Sort(sortInfo)
5455
.Paging(pagingInfo)
5556
.ToMessagesView()
56-
.ToListAsync();
57+
.ToListAsync(token: cancellationToken);
5758

5859
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
5960
}
6061

61-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo)
62+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAndKeyword(string endpoint, string keyword, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
6263
{
63-
using var session = await sessionProvider.OpenSession();
64+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
6465
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
6566
.Statistics(out var stats)
6667
.Search(x => x.Query, keyword)
6768
.Where(m => m.ReceivingEndpointName == endpoint)
6869
.Sort(sortInfo)
6970
.Paging(pagingInfo)
7071
.ToMessagesView()
71-
.ToListAsync();
72+
.ToListAsync(token: cancellationToken);
7273

7374
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
7475
}
7576

76-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo)
77+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpoint(bool includeSystemMessages, string endpointName, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
7778
{
78-
using var session = await sessionProvider.OpenSession();
79+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
7980
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
8081
.Statistics(out var stats)
8182
.IncludeSystemMessagesWhere(includeSystemMessages)
8283
.Where(m => m.ReceivingEndpointName == endpointName)
8384
.Sort(sortInfo)
8485
.Paging(pagingInfo)
8586
.ToMessagesView()
86-
.ToListAsync();
87+
.ToListAsync(token: cancellationToken);
8788

8889
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
8990
}
9091

91-
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo)
92+
public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationId(string conversationId, PagingInfo pagingInfo, SortInfo sortInfo, CancellationToken cancellationToken)
9293
{
93-
using var session = await sessionProvider.OpenSession();
94+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
9495
var results = await session.Query<MessagesViewIndex.SortAndFilterOptions>(GetIndexName(isFullTextSearchEnabled))
9596
.Statistics(out var stats)
9697
.Where(m => m.ConversationId == conversationId)
9798
.Sort(sortInfo)
9899
.Paging(pagingInfo)
99100
.ToMessagesView()
100-
.ToListAsync();
101+
.ToListAsync(token: cancellationToken);
101102

102103
return new QueryResult<IList<MessagesView>>(results, stats.ToQueryStatsInfo());
103104
}
104105

105-
public async Task<MessageBodyView> GetMessageBody(string messageId)
106+
public async Task<MessageBodyView> GetMessageBody(string messageId, CancellationToken cancellationToken)
106107
{
107-
using var session = await sessionProvider.OpenSession();
108-
var result = await session.Advanced.Attachments.GetAsync(messageId, "body");
108+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
109+
var result = await session.Advanced.Attachments.GetAsync(messageId, "body", cancellationToken);
109110

110111
if (result == null)
111112
{
@@ -120,10 +121,10 @@ public async Task<MessageBodyView> GetMessageBody(string messageId)
120121
);
121122
}
122123

123-
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
124+
public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints(CancellationToken cancellationToken)
124125
{
125-
using var session = await sessionProvider.OpenSession();
126-
var endpoints = await session.Advanced.LoadStartingWithAsync<KnownEndpoint>(KnownEndpoint.CollectionName, pageSize: 1024);
126+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
127+
var endpoints = await session.Advanced.LoadStartingWithAsync<KnownEndpoint>(KnownEndpoint.CollectionName, pageSize: 1024, token: cancellationToken);
127128

128129
var knownEndpoints = endpoints
129130
.Select(x => new KnownEndpointsView
@@ -142,11 +143,11 @@ public async Task<QueryResult<IList<KnownEndpointsView>>> QueryKnownEndpoints()
142143
return new QueryResult<IList<KnownEndpointsView>>(knownEndpoints, new QueryStatsInfo(string.Empty, knownEndpoints.Count));
143144
}
144145

145-
public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName)
146+
public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpointName, CancellationToken cancellationToken)
146147
{
147148
var indexName = GetIndexName(isFullTextSearchEnabled);
148149

149-
using var session = await sessionProvider.OpenSession();
150+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
150151
// Maximum should really be 31 queries if there are 30 days of audit data, but default limit is 30
151152
session.Advanced.MaxNumberOfRequestsPerSession = 40;
152153

@@ -155,7 +156,7 @@ public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpoi
155156
var oldestMsg = await session.Query<MessagesViewIndex.SortAndFilterOptions>(indexName)
156157
.Where(m => m.ReceivingEndpointName == endpointName)
157158
.OrderBy(m => m.ProcessedAt)
158-
.FirstOrDefaultAsync();
159+
.FirstOrDefaultAsync(token: cancellationToken);
159160

160161
if (oldestMsg != null)
161162
{
@@ -173,11 +174,15 @@ public async Task<QueryResult<IList<AuditCount>>> QueryAuditCounts(string endpoi
173174
.Statistics(out var stats)
174175
.Where(m => m.ReceivingEndpointName == endpointName && !m.IsSystemMessage && m.ProcessedAt >= date && m.ProcessedAt < nextDate)
175176
.Take(0)
176-
.ToArrayAsync();
177+
.ToArrayAsync(token: cancellationToken);
177178

178179
if (stats.TotalResults > 0)
179180
{
180-
results.Add(new AuditCount { UtcDate = date, Count = stats.TotalResults });
181+
results.Add(new AuditCount
182+
{
183+
UtcDate = date,
184+
Count = stats.TotalResults
185+
});
181186
}
182187
}
183188
}

src/ServiceControl.Audit.Persistence.Tests.RavenDB/EmbeddedLifecycleTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public override async Task Setup()
3535
[Test]
3636
public async Task Verify_embedded_database()
3737
{
38-
await DataStore.QueryKnownEndpoints();
38+
await DataStore.QueryKnownEndpoints(TestContext.CurrentContext.CancellationToken);
3939

4040
Assert.Multiple(() =>
4141
{

0 commit comments

Comments
 (0)