Skip to content

Commit 825bc8f

Browse files
committed
Fix API's are rebase with forcing accepting all changes from master
1 parent bf99b69 commit 825bc8f

File tree

3 files changed

+13
-12
lines changed

3 files changed

+13
-12
lines changed

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
128128

129129
messageReceiver = transportInfrastructure.Receivers[inputEndpoint];
130130

131-
await auditIngestor.VerifyCanReachForwardingAddress();
131+
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
132132
await messageReceiver.StartReceive(cancellationToken);
133133

134134
logger.Info(LogMessages.StartedInfrastructure);
@@ -235,7 +235,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
235235

236236
recorder.Tags.Add("ingestion.batch_size", contexts.Count);
237237

238-
await auditIngestor.Ingest(contexts);
238+
await auditIngestor.Ingest(contexts, stoppingToken);
239239
}
240240

241241
consecutiveBatchFailuresCounter.Record(0);

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Collections.Generic;
55
using System.Diagnostics.Metrics;
66
using System.Linq;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using Infrastructure.Settings;
910
using Monitoring;
@@ -43,15 +44,15 @@ ITransportCustomization transportCustomization
4344
);
4445
}
4546

46-
public async Task Ingest(List<MessageContext> contexts)
47+
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
4748
{
48-
var stored = await auditPersister.Persist(contexts);
49+
var stored = await auditPersister.Persist(contexts, cancellationToken);
4950

5051
try
5152
{
5253
if (settings.ForwardAuditMessages)
5354
{
54-
await Forward(stored, logQueueAddress);
55+
await Forward(stored, logQueueAddress, cancellationToken);
5556
forwardedMessagesCounter.Add(stored.Count);
5657
}
5758

@@ -69,7 +70,7 @@ public async Task Ingest(List<MessageContext> contexts)
6970
}
7071
}
7172

72-
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress)
73+
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress, CancellationToken cancellationToken)
7374
{
7475
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
7576
var index = 0;
@@ -98,12 +99,11 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
9899
return anyContext != null
99100
? messageDispatcher.Value.Dispatch(
100101
new TransportOperations(transportOperations),
101-
anyContext.TransportTransaction
102-
)
102+
anyContext.TransportTransaction, cancellationToken)
103103
: Task.CompletedTask;
104104
}
105105

106-
public async Task VerifyCanReachForwardingAddress()
106+
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
107107
{
108108
if (!settings.ForwardAuditMessages)
109109
{
@@ -120,7 +120,7 @@ public async Task VerifyCanReachForwardingAddress()
120120
)
121121
);
122122

123-
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
123+
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
124124
}
125125
catch (Exception e)
126126
{

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Text.Json;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Infrastructure;
89
using Monitoring;
@@ -21,14 +22,14 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
2122
IMessageSession messageSession,
2223
Lazy<IMessageDispatcher> messageDispatcher)
2324
{
24-
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
25+
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts, CancellationToken cancellationToken)
2526
{
2627
var storedContexts = new List<MessageContext>(contexts.Count);
2728
IAuditIngestionUnitOfWork unitOfWork = null;
2829
try
2930
{
3031
// deliberately not using the using statement because we dispose async explicitly
31-
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
32+
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken);
3233
var inserts = new List<Task>(contexts.Count);
3334
foreach (var context in contexts)
3435
{

0 commit comments

Comments
 (0)