-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathImportFailedAudits.cs
More file actions
79 lines (67 loc) · 3.22 KB
/
ImportFailedAudits.cs
File metadata and controls
79 lines (67 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
namespace ServiceControl.Audit.Auditing
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Infrastructure.Settings;
using NServiceBus.Extensibility;
using NServiceBus.Logging;
using NServiceBus.Transport;
using Persistence;
public class ImportFailedAudits
{
public ImportFailedAudits(
IFailedAuditStorage failedAuditStore,
AuditIngestor auditIngestor,
Settings settings)
{
this.settings = settings;
this.failedAuditStore = failedAuditStore;
this.auditIngestor = auditIngestor;
}
public async Task Run(CancellationToken cancellationToken = default)
{
await auditIngestor.VerifyCanReachForwardingAddress();
var succeeded = 0;
var failed = 0;
await failedAuditStore.ProcessFailedMessages(
async (transportMessage, markComplete, token) =>
{
try
{
var messageContext = new MessageContext(transportMessage.Id, transportMessage.Headers, transportMessage.Body, EmptyTransaction, settings.AuditQueue, EmptyContextBag);
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
messageContext.SetTaskCompletionSource(taskCompletionSource);
await auditIngestor.Ingest([messageContext]);
await taskCompletionSource.Task;
await markComplete(token);
succeeded++;
if (Logger.IsDebugEnabled)
{
Logger.Debug($"Successfully re-imported failed audit message {transportMessage.Id}.");
}
}
catch (OperationCanceledException e) when (token.IsCancellationRequested)
{
Logger.Info("Cancelled", e);
}
catch (Exception e)
{
Logger.Error($"Error while attempting to re-import failed audit message {transportMessage.Id}.", e);
failed++;
}
}, cancellationToken);
Logger.Info($"Done re-importing failed audits. Successfully re-imported {succeeded} messages. Failed re-importing {failed} messages.");
if (failed > 0)
{
Logger.Warn($"{failed} messages could not be re-imported. This could indicate a problem with the data. Contact Particular support if you need help with recovering the messages.");
}
}
readonly IFailedAuditStorage failedAuditStore;
readonly AuditIngestor auditIngestor;
readonly Settings settings;
static readonly TransportTransaction EmptyTransaction = new TransportTransaction();
static readonly ContextBag EmptyContextBag = new ContextBag();
static readonly ILog Logger = LogManager.GetLogger(typeof(ImportFailedAudits));
}
}