22{
33 using System ;
44 using System . Collections . Generic ;
5- using System . Diagnostics . Metrics ;
65 using System . Linq ;
76 using System . Threading . Tasks ;
87 using Infrastructure . Settings ;
1413 using Persistence . UnitOfWork ;
1514 using Recoverability ;
1615 using SagaAudit ;
17- using ServiceControl . Infrastructure ;
18- using ServiceControl . Transports ;
16+ using Transports ;
1917
2018 public class AuditIngestor
2119 {
@@ -26,11 +24,13 @@ public AuditIngestor(
2624 IEnumerable < IEnrichImportedAuditMessages > auditEnrichers , // allows extending message enrichers with custom enrichers registered in the DI container
2725 IMessageSession messageSession ,
2826 Lazy < IMessageDispatcher > messageDispatcher ,
29- ITransportCustomization transportCustomization
27+ ITransportCustomization transportCustomization ,
28+ AuditIngestionMetrics metrics
3029 )
3130 {
3231 this . settings = settings ;
3332 this . messageDispatcher = messageDispatcher ;
33+ this . metrics = metrics ;
3434 var enrichers = new IEnrichImportedAuditMessages [ ] { new MessageTypeEnricher ( ) , new EnrichWithTrackingIds ( ) , new ProcessingStatisticsEnricher ( ) , new DetectNewEndpointsFromAuditImportsEnricher ( endpointInstanceMonitoring ) , new DetectSuccessfulRetriesEnricher ( ) , new SagaRelationshipsEnricher ( ) } . Concat ( auditEnrichers ) . ToArray ( ) ;
3535
3636 logQueueAddress = transportCustomization . ToTransportQualifiedQueueName ( settings . AuditLogQueue ) ;
@@ -52,7 +52,7 @@ public async Task Ingest(List<MessageContext> contexts)
5252 if ( settings . ForwardAuditMessages )
5353 {
5454 await Forward ( stored , logQueueAddress ) ;
55- forwardedMessagesCounter . Add ( stored . Count ) ;
55+ metrics . IncrementMessagesForwarded ( stored . Count ) ;
5656 }
5757
5858 foreach ( var context in contexts )
@@ -131,8 +131,8 @@ public async Task VerifyCanReachForwardingAddress()
131131 readonly AuditPersister auditPersister ;
132132 readonly Settings settings ;
133133 readonly Lazy < IMessageDispatcher > messageDispatcher ;
134+ readonly AuditIngestionMetrics metrics ;
134135 readonly string logQueueAddress ;
135- readonly Counter < long > forwardedMessagesCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "forwarded" ) , description : "Audit ingestion forwarded message count" ) ;
136136
137137 static readonly ILog Log = LogManager . GetLogger < AuditIngestor > ( ) ;
138138 }
0 commit comments