Skip to content

Commit 21ffe9c

Browse files
committed
New cancellation token allows further improvement to reduce shutdown duration to close within 30 seconds
1 parent 32d1c05 commit 21ffe9c

File tree

7 files changed

+25
-18
lines changed

7 files changed

+25
-18
lines changed

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace ServiceControl.Audit.Persistence.InMemory
22
{
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using ServiceControl.Audit.Auditing.BodyStorage;
56
using ServiceControl.Audit.Persistence.UnitOfWork;
@@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
1213
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
1314
}
1415

15-
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
16+
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
1617
{
1718
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
1819
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));

src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ class RavenAuditIngestionUnitOfWorkFactory(
1313
MinimumRequiredStorageState customCheckState)
1414
: IAuditIngestionUnitOfWorkFactory
1515
{
16-
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
16+
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
1717
{
18-
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
18+
var timedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
19+
timedCancellationSource.CancelAfter(databaseConfiguration.BulkInsertCommitTimeout);
1920
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))
2021
.BulkInsert(new BulkInsertOptions { SkipOverwriteIfUnchanged = true, }, timedCancellationSource.Token);
2122

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
namespace ServiceControl.Audit.Persistence.UnitOfWork
22
{
3+
using System.Threading;
34
using System.Threading.Tasks;
45

56
public interface IAuditIngestionUnitOfWorkFactory
67
{
7-
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
8+
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken = default); //Throws if not enough space or some other problem preventing from writing data
89
bool CanIngestMore();
910
}
1011
}

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
129129

130130
queueIngestor = transportInfrastructure.Receivers[inputEndpoint];
131131

132-
await auditIngestor.VerifyCanReachForwardingAddress();
132+
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
133133

134134
await queueIngestor.StartReceive(cancellationToken);
135135

@@ -230,7 +230,7 @@ async Task Loop(CancellationToken cancellationToken)
230230
auditBatchSize.Record(contexts.Count);
231231
var sw = Stopwatch.StartNew();
232232

233-
await auditIngestor.Ingest(contexts);
233+
await auditIngestor.Ingest(contexts, cancellationToken);
234234
auditBatchDuration.Record(sw.ElapsedMilliseconds);
235235
}
236236
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Infrastructure.Settings;
89
using Monitoring;
@@ -41,14 +42,14 @@ ITransportCustomization transportCustomization
4142
);
4243
}
4344

44-
public async Task Ingest(List<MessageContext> contexts)
45+
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
4546
{
4647
if (Log.IsDebugEnabled)
4748
{
4849
Log.Debug($"Ingesting {contexts.Count} message contexts");
4950
}
5051

51-
var stored = await auditPersister.Persist(contexts);
52+
var stored = await auditPersister.Persist(contexts, cancellationToken);
5253

5354
try
5455
{
@@ -59,7 +60,7 @@ public async Task Ingest(List<MessageContext> contexts)
5960
Log.Debug($"Forwarding {stored.Count} messages");
6061
}
6162

62-
await Forward(stored, logQueueAddress);
63+
await Forward(stored, logQueueAddress, cancellationToken);
6364
if (Log.IsDebugEnabled)
6465
{
6566
Log.Debug("Forwarded messages");
@@ -86,7 +87,7 @@ public async Task Ingest(List<MessageContext> contexts)
8687
}
8788
}
8889

89-
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress)
90+
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress, CancellationToken cancellationToken)
9091
{
9192
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
9293
var index = 0;
@@ -103,7 +104,8 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
103104
var outgoingMessage = new OutgoingMessage(
104105
messageContext.NativeMessageId,
105106
messageContext.Headers,
106-
messageContext.Body);
107+
messageContext.Body
108+
);
107109

108110
// Forwarded messages should last as long as possible
109111
outgoingMessage.Headers.Remove(Headers.TimeToBeReceived);
@@ -115,12 +117,13 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
115117
return anyContext != null
116118
? messageDispatcher.Value.Dispatch(
117119
new TransportOperations(transportOperations),
118-
anyContext.TransportTransaction
120+
anyContext.TransportTransaction,
121+
cancellationToken
119122
)
120123
: Task.CompletedTask;
121124
}
122125

123-
public async Task VerifyCanReachForwardingAddress()
126+
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
124127
{
125128
if (!settings.ForwardAuditMessages)
126129
{
@@ -137,7 +140,7 @@ public async Task VerifyCanReachForwardingAddress()
137140
)
138141
);
139142

140-
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
143+
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
141144
}
142145
catch (Exception e)
143146
{

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Diagnostics;
66
using System.Diagnostics.Metrics;
77
using System.Text.Json;
8+
using System.Threading;
89
using System.Threading.Tasks;
910
using Infrastructure;
1011
using Monitoring;
@@ -23,7 +24,7 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
2324
IMessageSession messageSession,
2425
Lazy<IMessageDispatcher> messageDispatcher)
2526
{
26-
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
27+
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts, CancellationToken cancellationToken)
2728
{
2829
var stopwatch = Stopwatch.StartNew();
2930

@@ -37,7 +38,7 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
3738
try
3839
{
3940
// deliberately not using the using statement because we dispose async explicitly
40-
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
41+
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken);
4142
var inserts = new List<Task>(contexts.Count);
4243
foreach (var context in contexts)
4344
{

src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public ImportFailedAudits(
2323

2424
public async Task Run(CancellationToken cancellationToken = default)
2525
{
26-
await auditIngestor.VerifyCanReachForwardingAddress();
26+
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
2727

2828
var succeeded = 0;
2929
var failed = 0;
@@ -37,7 +37,7 @@ await failedAuditStore.ProcessFailedMessages(
3737
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
3838
messageContext.SetTaskCompletionSource(taskCompletionSource);
3939

40-
await auditIngestor.Ingest([messageContext]);
40+
await auditIngestor.Ingest([messageContext], cancellationToken);
4141

4242
await taskCompletionSource.Task;
4343

0 commit comments

Comments
 (0)