Skip to content

Commit bb515dc

Browse files
committed
Support cancellation for error ingestion
1 parent 477f2aa commit bb515dc

File tree

11 files changed

+48
-34
lines changed

11 files changed

+48
-34
lines changed

src/ServiceControl.Persistence.RavenDB/UnitOfWork/RavenIngestionUnitOfWork.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
namespace ServiceControl.Persistence.RavenDB
22
{
33
using System.Collections.Concurrent;
4+
using System.Threading;
45
using System.Threading.Tasks;
5-
using Raven.Client.Documents;
66
using Raven.Client.Documents.Commands.Batches;
77
using ServiceControl.Persistence.UnitOfWork;
88

@@ -22,12 +22,12 @@ public RavenIngestionUnitOfWork(IRavenSessionProvider sessionProvider, Expiratio
2222

2323
internal void AddCommand(ICommandData command) => commands.Enqueue(command);
2424

25-
public override async Task Complete()
25+
public override async Task Complete(CancellationToken cancellationToken)
2626
{
27-
using var session = await sessionProvider.OpenSession();
27+
using var session = await sessionProvider.OpenSession(cancellationToken: cancellationToken);
2828
// not really interested in the batch results since a batch is atomic
2929
session.Advanced.Defer(commands.ToArray());
30-
await session.SaveChangesAsync();
30+
await session.SaveChangesAsync(cancellationToken);
3131
}
3232
}
3333
}

src/ServiceControl.Persistence.Tests.RavenDB/Expiration/MessageExpiryTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task SingleMessageMarkedAsArchiveShouldExpire()
3838
{
3939
await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, []);
4040

41-
await uow.Complete();
41+
await uow.Complete(TestContext.CurrentContext.CancellationToken);
4242
}
4343

4444
CompleteDatabaseOperation();
@@ -68,7 +68,7 @@ public async Task AllMessagesInUnArchivedGroupShouldNotExpire()
6868
await uow.Recoverability.RecordFailedProcessingAttempt(contextA, attemptA, [new FailedMessage.FailureGroup { Id = groupIdA }]);
6969
await uow.Recoverability.RecordFailedProcessingAttempt(contextB, attemptB, [new FailedMessage.FailureGroup { Id = groupIdB }]);
7070

71-
await uow.Complete();
71+
await uow.Complete(TestContext.CurrentContext.CancellationToken);
7272
}
7373

7474
CompleteDatabaseOperation();
@@ -99,7 +99,7 @@ public async Task AllMessagesInArchivedGroupShouldExpire()
9999
{
100100
await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, [new FailedMessage.FailureGroup { Id = groupId }]);
101101

102-
await uow.Complete();
102+
await uow.Complete(TestContext.CurrentContext.CancellationToken);
103103
}
104104

105105
CompleteDatabaseOperation();
@@ -122,7 +122,7 @@ public async Task SingleMessageMarkedAsResolvedShouldExpire()
122122
{
123123
await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, []);
124124

125-
await uow.Complete();
125+
await uow.Complete(TestContext.CurrentContext.CancellationToken);
126126
}
127127

128128
CompleteDatabaseOperation();
@@ -145,7 +145,7 @@ public async Task RetryConfirmationProcessingShouldTriggerExpiration()
145145
{
146146
await uow.Recoverability.RecordFailedProcessingAttempt(context, attempt, []);
147147

148-
await uow.Complete();
148+
await uow.Complete(TestContext.CurrentContext.CancellationToken);
149149
}
150150

151151
CompleteDatabaseOperation();
@@ -158,7 +158,7 @@ public async Task RetryConfirmationProcessingShouldTriggerExpiration()
158158
{
159159
await uow.Recoverability.RecordSuccessfulRetry(errors.Results.First().Id);
160160

161-
await uow.Complete();
161+
await uow.Complete(TestContext.CurrentContext.CancellationToken);
162162
}
163163

164164
await WaitUntil(async () => (await GetAllMessages()).Results.Count == 0, "Retry confirmation should cause message removal.");

src/ServiceControl.Persistence.Tests/BodyStorage/AttachmentsBodyStorageTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async Task RunTest(Func<Dictionary<string, string>, string> getIdToQuery)
7070
var groups = new List<FailedMessage.FailureGroup>();
7171

7272
await uow.Recoverability.RecordFailedProcessingAttempt(context, processingAttempt, groups);
73-
await uow.Complete();
73+
await uow.Complete(cancellationSource.Token);
7474
}
7575

7676
CompleteDatabaseOperation();

src/ServiceControl.Persistence.Tests/MonitoringDataStoreTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public async Task Unit_of_work_detects_endpoint()
134134
{
135135
await unitOfWork.Monitoring.RecordKnownEndpoint(knownEndpoint);
136136

137-
await unitOfWork.Complete();
137+
await unitOfWork.Complete(TestContext.CurrentContext.CancellationToken);
138138
}
139139

140140
CompleteDatabaseOperation();

src/ServiceControl.Persistence.Tests/Recoverability/RetryConfirmationProcessorTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public async Task Should_handle_multiple_retry_confirmations_in_the_error_ingest
4949
var unitOfWork = await UnitOfWorkFactory.StartNew();
5050
await Processor.Process(messageContexts, unitOfWork);
5151

52-
Assert.DoesNotThrowAsync(() => unitOfWork.Complete());
52+
Assert.DoesNotThrowAsync(() => unitOfWork.Complete(TestContext.CurrentContext.CancellationToken));
5353
}
5454

5555
[Test]
@@ -71,7 +71,7 @@ public async Task Should_handle_retry_confirmation_followed_by_legacy_command()
7171

7272
var unitOfWork = await UnitOfWorkFactory.StartNew();
7373
await Processor.Process(messageContexts, unitOfWork);
74-
await unitOfWork.Complete();
74+
await unitOfWork.Complete(TestContext.CurrentContext.CancellationToken);
7575

7676
Assert.DoesNotThrowAsync(
7777
() => Handler.Handle(CreateLegacyRetryConfirmationCommand(), new TestableInvokeHandlerContext()));
@@ -89,7 +89,7 @@ public async Task Should_handle_legacy_retry_confirmation_command_followed_by_ne
8989

9090
var unitOfWork = await UnitOfWorkFactory.StartNew();
9191
await Processor.Process(messageContexts, unitOfWork);
92-
Assert.DoesNotThrowAsync(() => unitOfWork.Complete());
92+
Assert.DoesNotThrowAsync(() => unitOfWork.Complete(TestContext.CurrentContext.CancellationToken));
9393
}
9494

9595
static MarkMessageFailureResolvedByRetry CreateLegacyRetryConfirmationCommand()

src/ServiceControl.Persistence/UnitOfWork/FallbackIngestionUnitOfWork.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace ServiceControl.Persistence.UnitOfWork
22
{
3+
using System.Threading;
34
using System.Threading.Tasks;
45

56
// HINT: This allows an implementor to provide only part of the implementation and allow the other part
@@ -18,7 +19,11 @@ public FallbackIngestionUnitOfWork(IIngestionUnitOfWork primary, IIngestionUnitO
1819
Recoverability = primary.Recoverability ?? fallback.Recoverability;
1920
}
2021

21-
public override Task Complete() => Task.WhenAll(primary.Complete(), fallback.Complete());
22+
public override Task Complete(CancellationToken cancellationToken)
23+
=> Task.WhenAll(
24+
primary.Complete(cancellationToken),
25+
fallback.Complete(cancellationToken)
26+
);
2227

2328
protected override void Dispose(bool disposing)
2429
{
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
namespace ServiceControl.Persistence.UnitOfWork
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
public interface IIngestionUnitOfWork : IDisposable
78
{
89
IMonitoringIngestionUnitOfWork Monitoring { get; }
910
IRecoverabilityIngestionUnitOfWork Recoverability { get; }
10-
Task Complete();
11+
Task Complete(CancellationToken cancellationToken);
1112
}
1213
}

src/ServiceControl.Persistence/UnitOfWork/IngestionUnitOfWorkBase.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Persistence.UnitOfWork
22
{
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
public abstract class IngestionUnitOfWorkBase : IIngestionUnitOfWork
@@ -19,6 +20,6 @@ public void Dispose()
1920

2021
public IMonitoringIngestionUnitOfWork Monitoring { get; protected set; }
2122
public IRecoverabilityIngestionUnitOfWork Recoverability { get; protected set; }
22-
public virtual Task Complete() => Task.CompletedTask;
23+
public virtual Task Complete(CancellationToken cancellationToken) => Task.CompletedTask;
2324
}
2425
}

src/ServiceControl/Operations/ErrorIngestion.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
9696
batchSizeMeter.Mark(contexts.Count);
9797
using (batchDurationMeter.Measure())
9898
{
99-
await ingestor.Ingest(contexts);
99+
await ingestor.Ingest(contexts, stoppingToken);
100100
}
101101
}
102102
catch (Exception e)
@@ -214,7 +214,7 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
214214

215215
if (settings.ForwardErrorMessages)
216216
{
217-
await ingestor.VerifyCanReachForwardingAddress();
217+
await ingestor.VerifyCanReachForwardingAddress(cancellationToken);
218218
}
219219

220220
await messageReceiver.StartReceive(cancellationToken);

src/ServiceControl/Operations/ErrorIngestor.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Collections.Generic;
55
using System.Diagnostics;
66
using System.Linq;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using Contracts.Operations;
910
using Infrastructure.DomainEvents;
@@ -49,7 +50,7 @@ public ErrorIngestor(Metrics metrics,
4950
logQueueAddress = new UnicastAddressTag(transportCustomization.ToTransportQualifiedQueueName(this.settings.ErrorLogQueue));
5051
}
5152

52-
public async Task Ingest(List<MessageContext> contexts)
53+
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
5354
{
5455
var failedMessages = new List<MessageContext>(contexts.Count);
5556
var retriedMessages = new List<MessageContext>(contexts.Count);
@@ -67,7 +68,7 @@ public async Task Ingest(List<MessageContext> contexts)
6768
}
6869

6970

70-
var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages);
71+
var storedFailed = await PersistFailedMessages(failedMessages, retriedMessages, cancellationToken);
7172

7273
try
7374
{
@@ -89,7 +90,7 @@ public async Task Ingest(List<MessageContext> contexts)
8990
{
9091
Logger.Debug($"Forwarding {storedFailed.Count} messages");
9192
}
92-
await Forward(storedFailed);
93+
await Forward(storedFailed, cancellationToken);
9394
if (Logger.IsDebugEnabled)
9495
{
9596
Logger.Debug("Forwarded messages");
@@ -113,7 +114,7 @@ public async Task Ingest(List<MessageContext> contexts)
113114
}
114115
}
115116

116-
async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageContext> failedMessageContexts, List<MessageContext> retriedMessageContexts)
117+
async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageContext> failedMessageContexts, List<MessageContext> retriedMessageContexts, CancellationToken cancellationToken)
117118
{
118119
var stopwatch = Stopwatch.StartNew();
119120

@@ -130,7 +131,7 @@ async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageCont
130131

131132
using (bulkInsertDurationMeter.Measure())
132133
{
133-
await unitOfWork.Complete();
134+
await unitOfWork.Complete(cancellationToken);
134135
}
135136
return storedFailedMessageContexts;
136137
}
@@ -154,7 +155,7 @@ async Task<IReadOnlyList<MessageContext>> PersistFailedMessages(List<MessageCont
154155
}
155156
}
156157

157-
Task Forward(IReadOnlyCollection<MessageContext> messageContexts)
158+
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, CancellationToken cancellationToken)
158159
{
159160
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
160161
var index = 0;
@@ -177,11 +178,11 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts)
177178
return anyContext != null
178179
? messageDispatcher.Value.Dispatch(
179180
new TransportOperations(transportOperations),
180-
anyContext.TransportTransaction)
181+
anyContext.TransportTransaction, cancellationToken)
181182
: Task.CompletedTask;
182183
}
183184

184-
public async Task VerifyCanReachForwardingAddress()
185+
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
185186
{
186187
try
187188
{
@@ -193,7 +194,7 @@ public async Task VerifyCanReachForwardingAddress()
193194
)
194195
);
195196

196-
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
197+
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
197198
}
198199
catch (Exception e)
199200
{

0 commit comments

Comments
 (0)