1- namespace ServiceControl . Audit . Auditing
1+ namespace ServiceControl . Audit . Auditing ;
2+
3+ using System ;
4+ using System . Diagnostics ;
5+ using System . IO ;
6+ using System . Runtime . InteropServices ;
7+ using System . Runtime . Versioning ;
8+ using System . Threading ;
9+ using System . Threading . Tasks ;
10+ using Infrastructure ;
11+ using NServiceBus . Logging ;
12+ using NServiceBus . Transport ;
13+ using Persistence ;
14+ using Configuration ;
15+ using Metrics ;
16+ using ServiceControl . Infrastructure ;
17+
18+ class AuditIngestionFaultPolicy
219{
3- using System ;
4- using System . Diagnostics ;
5- using System . Diagnostics . Metrics ;
6- using System . IO ;
7- using System . Runtime . InteropServices ;
8- using System . Runtime . Versioning ;
9- using System . Threading ;
10- using System . Threading . Tasks ;
11- using Infrastructure ;
12- using NServiceBus . Logging ;
13- using NServiceBus . Transport ;
14- using Persistence ;
15- using Configuration ;
16- using ServiceControl . Infrastructure ;
17-
18- class AuditIngestionFaultPolicy
20+ public AuditIngestionFaultPolicy ( IFailedAuditStorage failedAuditStorage , LoggingSettings settings , Func < string , Exception , Task > onCriticalError , IngestionMetrics metrics )
1921 {
20- readonly IFailedAuditStorage failedAuditStorage ;
21- readonly string logPath ;
22- readonly ImportFailureCircuitBreaker failureCircuitBreaker ;
22+ failureCircuitBreaker = new ImportFailureCircuitBreaker ( onCriticalError ) ;
23+ this . failedAuditStorage = failedAuditStorage ;
24+ this . metrics = metrics ;
2325
24- public AuditIngestionFaultPolicy ( IFailedAuditStorage failedAuditStorage , LoggingSettings settings , Func < string , Exception , Task > onCriticalError )
26+ if ( ! AppEnvironment . RunningInContainer )
2527 {
26- failureCircuitBreaker = new ImportFailureCircuitBreaker ( onCriticalError ) ;
27- this . failedAuditStorage = failedAuditStorage ;
28-
29- if ( ! AppEnvironment . RunningInContainer )
30- {
31- logPath = Path . Combine ( settings . LogPath , @"FailedImports\Audit" ) ;
32- Directory . CreateDirectory ( logPath ) ;
33- }
28+ logPath = Path . Combine ( settings . LogPath , @"FailedImports\Audit" ) ;
29+ Directory . CreateDirectory ( logPath ) ;
3430 }
31+ }
3532
36- public async Task < ErrorHandleResult > OnError ( ErrorContext errorContext , CancellationToken cancellationToken = default )
37- {
38- var tags = Telemetry . GetIngestedMessageTags ( errorContext . Message . Headers , errorContext . Message . Body ) ;
33+ public async Task < ErrorHandleResult > OnError ( ErrorContext errorContext , CancellationToken cancellationToken = default )
34+ {
35+ using var errorMetrics = metrics . BeginErrorHandling ( errorContext ) ;
3936
40- //Same as recoverability policy in NServiceBusFactory
41- if ( errorContext . ImmediateProcessingFailures < 3 )
42- {
43- retryCounter . Add ( 1 , tags ) ;
44- return ErrorHandleResult . RetryRequired ;
45- }
37+ //Same as recoverability policy in NServiceBusFactory
38+ if ( errorContext . ImmediateProcessingFailures < 3 )
39+ {
40+ errorMetrics . Retry ( ) ;
41+ return ErrorHandleResult . RetryRequired ;
42+ }
4643
47- await StoreFailedMessageDocument ( errorContext , cancellationToken ) ;
44+ await StoreFailedMessageDocument ( errorContext , cancellationToken ) ;
4845
49- failedCounter . Add ( 1 , tags ) ;
46+ // failedCounter.Add(1, tags);
5047
51- return ErrorHandleResult . Handled ;
52- }
48+ return ErrorHandleResult . Handled ;
49+ }
5350
54- async Task StoreFailedMessageDocument ( ErrorContext errorContext , CancellationToken cancellationToken )
51+ async Task StoreFailedMessageDocument ( ErrorContext errorContext , CancellationToken cancellationToken )
52+ {
53+ var failure = new FailedAuditImport
5554 {
56- var failure = new FailedAuditImport
57- {
58- Id = Guid . NewGuid ( ) . ToString ( ) ,
59- Message = new FailedTransportMessage
60- {
61- Id = errorContext . Message . MessageId ,
62- Headers = errorContext . Message . Headers ,
63- // At the moment we are taking a defensive copy of the body to avoid issues with the message body
64- // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB
65- // handles byte[] to ReadOnlyMemory<byte> conversion we might be able to remove this.
66- Body = errorContext . Message . Body . ToArray ( )
67- } ,
68- ExceptionInfo = errorContext . Exception . ToFriendlyString ( )
69- } ;
70-
71- try
72- {
73- await DoLogging ( errorContext . Exception , failure , cancellationToken ) ;
74- }
75- finally
55+ Id = Guid . NewGuid ( ) . ToString ( ) ,
56+ Message = new FailedTransportMessage
7657 {
77- failureCircuitBreaker . Increment ( errorContext . Exception ) ;
78- }
58+ Id = errorContext . Message . MessageId ,
59+ Headers = errorContext . Message . Headers ,
60+ // At the moment we are taking a defensive copy of the body to avoid issues with the message body
61+ // buffers being returned to the pool and potentially being overwritten. Once we know how RavenDB
62+ // handles byte[] to ReadOnlyMemory<byte> conversion we might be able to remove this.
63+ Body = errorContext . Message . Body . ToArray ( )
64+ } ,
65+ ExceptionInfo = errorContext . Exception . ToFriendlyString ( )
66+ } ;
67+
68+ try
69+ {
70+ await DoLogging ( errorContext . Exception , failure , cancellationToken ) ;
7971 }
80-
81- async Task DoLogging ( Exception exception , FailedAuditImport failure , CancellationToken cancellationToken )
72+ finally
8273 {
83- log . Error ( "Failed importing error message" , exception ) ;
74+ failureCircuitBreaker . Increment ( errorContext . Exception ) ;
75+ }
76+ }
77+
78+ async Task DoLogging ( Exception exception , FailedAuditImport failure , CancellationToken cancellationToken )
79+ {
80+ log . Error ( "Failed importing error message" , exception ) ;
8481
85- // Write to storage
86- await failedAuditStorage . SaveFailedAuditImport ( failure ) ;
82+ // Write to storage
83+ await failedAuditStorage . SaveFailedAuditImport ( failure ) ;
8784
88- if ( ! AppEnvironment . RunningInContainer )
85+ if ( ! AppEnvironment . RunningInContainer )
86+ {
87+ // Write to Log Path
88+ var filePath = Path . Combine ( logPath , failure . Id + ".txt" ) ;
89+ await File . WriteAllTextAsync ( filePath , failure . ExceptionInfo , cancellationToken ) ;
90+
91+ if ( RuntimeInformation . IsOSPlatform ( OSPlatform . Windows ) )
8992 {
90- // Write to Log Path
91- var filePath = Path . Combine ( logPath , failure . Id + ".txt" ) ;
92- await File . WriteAllTextAsync ( filePath , failure . ExceptionInfo , cancellationToken ) ;
93-
94- if ( RuntimeInformation . IsOSPlatform ( OSPlatform . Windows ) )
95- {
96- WriteToEventLog ( "A message import has failed. A log file has been written to " + filePath ) ;
97- }
93+ WriteToEventLog ( "A message import has failed. A log file has been written to " + filePath ) ;
9894 }
9995 }
96+ }
10097
101- [ SupportedOSPlatform ( "windows" ) ]
102- void WriteToEventLog ( string message )
103- {
98+ [ SupportedOSPlatform ( "windows" ) ]
99+ void WriteToEventLog ( string message )
100+ {
104101#if DEBUG
105- EventSourceCreator . Create ( ) ;
102+ EventSourceCreator . Create ( ) ;
106103#endif
107- EventLog . WriteEntry ( EventSourceCreator . SourceName , message , EventLogEntryType . Error ) ;
108- }
104+ EventLog . WriteEntry ( EventSourceCreator . SourceName , message , EventLogEntryType . Error ) ;
105+ }
109106
110- readonly Counter < long > retryCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "retry" ) , description : "Audit ingestion retries count" ) ;
111- readonly Counter < long > failedCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "failed" ) , description : "Audit ingestion failure count" ) ;
107+ readonly IFailedAuditStorage failedAuditStorage ;
108+ readonly IngestionMetrics metrics ;
109+ readonly string logPath ;
110+ readonly ImportFailureCircuitBreaker failureCircuitBreaker ;
112111
113- static readonly ILog log = LogManager . GetLogger < AuditIngestionFaultPolicy > ( ) ;
114- }
112+ static readonly ILog log = LogManager . GetLogger < AuditIngestionFaultPolicy > ( ) ;
115113}
0 commit comments