Skip to content

Commit 477f2aa

Browse files
committed
Support cancellation of audit ingestion
1 parent fca1ba5 commit 477f2aa

File tree

5 files changed

+16
-7
lines changed

5 files changed

+16
-7
lines changed

src/ServiceControl.Audit.Persistence.InMemory/InMemoryAuditIngestionUnitOfWorkFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace ServiceControl.Audit.Persistence.InMemory
22
{
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using ServiceControl.Audit.Auditing.BodyStorage;
56
using ServiceControl.Audit.Persistence.UnitOfWork;
@@ -12,7 +13,7 @@ public InMemoryAuditIngestionUnitOfWorkFactory(InMemoryAuditDataStore dataStore,
1213
bodyStorageEnricher = new BodyStorageEnricher(bodyStorage, settings);
1314
}
1415

15-
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
16+
public ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
1617
{
1718
//The batchSize argument is ignored: the in-memory storage implementation doesn't support batching.
1819
return new ValueTask<IAuditIngestionUnitOfWork>(new InMemoryAuditIngestionUnitOfWork(dataStore, bodyStorageEnricher));

src/ServiceControl.Audit.Persistence.RavenDB/UnitOfWork/RavenAuditUnitOfWorkFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class RavenAuditIngestionUnitOfWorkFactory(
1313
MinimumRequiredStorageState customCheckState)
1414
: IAuditIngestionUnitOfWorkFactory
1515
{
16-
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize)
16+
public async ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken)
1717
{
1818
var timedCancellationSource = new CancellationTokenSource(databaseConfiguration.BulkInsertCommitTimeout);
1919
var bulkInsert = (await documentStoreProvider.GetDocumentStore(timedCancellationSource.Token))

src/ServiceControl.Audit.Persistence.Tests/PersistenceTestFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ protected string GetManifestPath()
6464
configuration.AuditIngestionUnitOfWorkFactory;
6565

6666
protected ValueTask<IAuditIngestionUnitOfWork> StartAuditUnitOfWork(int batchSize) =>
67-
AuditIngestionUnitOfWorkFactory.StartNew(batchSize);
67+
AuditIngestionUnitOfWorkFactory.StartNew(batchSize, TestContext.CurrentContext.CancellationToken);
6868

6969
protected IServiceProvider ServiceProvider => configuration.ServiceProvider;
7070

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
namespace ServiceControl.Audit.Persistence.UnitOfWork
22
{
3+
using System.Threading;
34
using System.Threading.Tasks;
45

56
public interface IAuditIngestionUnitOfWorkFactory
67
{
7-
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize); //Throws if not enough space or some other problem preventing from writing data
8+
ValueTask<IAuditIngestionUnitOfWork> StartNew(int batchSize, CancellationToken cancellationToken); //Throws if not enough space or some other problem preventing from writing data
89
bool CanIngestMore();
910
}
1011
}

src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public ImportFailedAudits(
2323

2424
public async Task Run(CancellationToken cancellationToken = default)
2525
{
26-
await auditIngestor.VerifyCanReachForwardingAddress();
26+
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
2727

2828
var succeeded = 0;
2929
var failed = 0;
@@ -33,11 +33,18 @@ await failedAuditStore.ProcessFailedMessages(
3333
{
3434
try
3535
{
36-
var messageContext = new MessageContext(transportMessage.Id, transportMessage.Headers, transportMessage.Body, EmptyTransaction, settings.AuditQueue, EmptyContextBag);
36+
var messageContext = new MessageContext(
37+
transportMessage.Id,
38+
transportMessage.Headers,
39+
transportMessage.Body,
40+
EmptyTransaction,
41+
settings.AuditQueue,
42+
EmptyContextBag
43+
);
3744
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
3845
messageContext.SetTaskCompletionSource(taskCompletionSource);
3946

40-
await auditIngestor.Ingest([messageContext]);
47+
await auditIngestor.Ingest([messageContext], cancellationToken);
4148

4249
await taskCompletionSource.Task;
4350

0 commit comments

Comments
 (0)