Skip to content

Commit e2e28a2

Browse files
committed
Fix API's are rebase with forcing accepting all changes from master
# Conflicts: # src/ServiceControl.Audit/Auditing/AuditIngestion.cs # src/ServiceControl.Audit/Auditing/AuditIngestor.cs
1 parent 7fb0a01 commit e2e28a2

File tree

3 files changed

+15
-13
lines changed

3 files changed

+15
-13
lines changed

src/ServiceControl.Audit/Auditing/AuditIngestion.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ public AuditIngestion(
2727
AuditIngestor auditIngestor,
2828
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
2929
IHostApplicationLifetime applicationLifetime,
30-
IngestionMetrics metrics)
30+
IngestionMetrics metrics
31+
)
3132
{
3233
inputEndpoint = settings.AuditQueue;
3334
this.transportCustomization = transportCustomization;
@@ -132,7 +133,7 @@ async Task SetUpAndStartInfrastructure(CancellationToken cancellationToken)
132133

133134
messageReceiver = transportInfrastructure.Receivers[inputEndpoint];
134135

135-
await auditIngestor.VerifyCanReachForwardingAddress();
136+
await auditIngestor.VerifyCanReachForwardingAddress(cancellationToken);
136137
await messageReceiver.StartReceive(cancellationToken);
137138

138139
logger.Info(LogMessages.StartedInfrastructure);
@@ -236,7 +237,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
236237
contexts.Add(context);
237238
}
238239

239-
await auditIngestor.Ingest(contexts);
240+
await auditIngestor.Ingest(contexts, stoppingToken);
240241

241242
batchMetrics.Complete(contexts.Count);
242243
}

src/ServiceControl.Audit/Auditing/AuditIngestor.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Infrastructure.Settings;
89
using Monitoring;
@@ -42,15 +43,15 @@ ITransportCustomization transportCustomization
4243
);
4344
}
4445

45-
public async Task Ingest(List<MessageContext> contexts)
46+
public async Task Ingest(List<MessageContext> contexts, CancellationToken cancellationToken)
4647
{
47-
var stored = await auditPersister.Persist(contexts);
48+
var stored = await auditPersister.Persist(contexts, cancellationToken);
4849

4950
try
5051
{
5152
if (settings.ForwardAuditMessages)
5253
{
53-
await Forward(stored, logQueueAddress);
54+
await Forward(stored, logQueueAddress, cancellationToken);
5455
}
5556

5657
foreach (var context in contexts)
@@ -67,7 +68,7 @@ public async Task Ingest(List<MessageContext> contexts)
6768
}
6869
}
6970

70-
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress)
71+
Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forwardingAddress, CancellationToken cancellationToken)
7172
{
7273
var transportOperations = new TransportOperation[messageContexts.Count]; //We could allocate based on the actual number of ProcessedMessages but this should be OK
7374
var index = 0;
@@ -96,12 +97,11 @@ Task Forward(IReadOnlyCollection<MessageContext> messageContexts, string forward
9697
return anyContext != null
9798
? messageDispatcher.Value.Dispatch(
9899
new TransportOperations(transportOperations),
99-
anyContext.TransportTransaction
100-
)
100+
anyContext.TransportTransaction, cancellationToken)
101101
: Task.CompletedTask;
102102
}
103103

104-
public async Task VerifyCanReachForwardingAddress()
104+
public async Task VerifyCanReachForwardingAddress(CancellationToken cancellationToken)
105105
{
106106
if (!settings.ForwardAuditMessages)
107107
{
@@ -118,7 +118,7 @@ public async Task VerifyCanReachForwardingAddress()
118118
)
119119
);
120120

121-
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction());
121+
await messageDispatcher.Value.Dispatch(transportOperations, new TransportTransaction(), cancellationToken);
122122
}
123123
catch (Exception e)
124124
{

src/ServiceControl.Audit/Auditing/AuditPersister.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Text.Json;
6+
using System.Threading;
67
using System.Threading.Tasks;
78
using Infrastructure;
89
using Monitoring;
@@ -21,14 +22,14 @@ class AuditPersister(IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
2122
IMessageSession messageSession,
2223
Lazy<IMessageDispatcher> messageDispatcher)
2324
{
24-
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts)
25+
public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageContext> contexts, CancellationToken cancellationToken)
2526
{
2627
var storedContexts = new List<MessageContext>(contexts.Count);
2728
IAuditIngestionUnitOfWork unitOfWork = null;
2829
try
2930
{
3031
// deliberately not using the using statement because we dispose async explicitly
31-
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count);
32+
unitOfWork = await unitOfWorkFactory.StartNew(contexts.Count, cancellationToken);
3233
var inserts = new List<Task>(contexts.Count);
3334
foreach (var context in contexts)
3435
{

0 commit comments

Comments
 (0)