22{
33 using System ;
44 using System . Collections . Generic ;
5- using System . Diagnostics . Metrics ;
65 using System . Threading ;
76 using System . Threading . Channels ;
87 using System . Threading . Tasks ;
98 using Infrastructure . Settings ;
9+ using Metrics ;
1010 using Microsoft . Extensions . Hosting ;
1111 using NServiceBus ;
1212 using NServiceBus . Logging ;
@@ -26,7 +26,8 @@ public AuditIngestion(
2626 AuditIngestionCustomCheck . State ingestionState ,
2727 AuditIngestor auditIngestor ,
2828 IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ,
29- IHostApplicationLifetime applicationLifetime )
29+ IHostApplicationLifetime applicationLifetime ,
30+ AuditIngestionMetrics metrics )
3031 {
3132 inputEndpoint = settings . AuditQueue ;
3233 this . transportCustomization = transportCustomization ;
@@ -35,13 +36,16 @@ public AuditIngestion(
3536 this . unitOfWorkFactory = unitOfWorkFactory ;
3637 this . settings = settings ;
3738 this . applicationLifetime = applicationLifetime ;
39+ this . metrics = metrics ;
3840
3941 if ( ! transportSettings . MaxConcurrency . HasValue )
4042 {
4143 throw new ArgumentException ( "MaxConcurrency is not set in TransportSettings" ) ;
4244 }
4345
44- channel = Channel . CreateBounded < MessageContext > ( new BoundedChannelOptions ( transportSettings . MaxConcurrency . Value )
46+ MaxBatchSize = transportSettings . MaxConcurrency . Value ;
47+
48+ channel = Channel . CreateBounded < MessageContext > ( new BoundedChannelOptions ( MaxBatchSize )
4549 {
4650 SingleReader = true ,
4751 SingleWriter = false ,
@@ -190,22 +194,21 @@ async Task EnsureStopped(CancellationToken cancellationToken)
190194
191195 async Task OnMessage ( MessageContext messageContext , CancellationToken cancellationToken )
192196 {
193- var tags = Telemetry . GetIngestedMessageTags ( messageContext . Headers , messageContext . Body ) ;
194- using ( new DurationRecorder ( ingestionDuration , tags ) )
197+ using var messageIngestionMetrics = metrics . BeginIngestion ( messageContext ) ;
198+
199+ if ( settings . MessageFilter != null && settings . MessageFilter ( messageContext ) )
195200 {
196- if ( settings . MessageFilter != null && settings . MessageFilter ( messageContext ) )
197- {
198- return ;
199- }
201+ messageIngestionMetrics . Skipped ( ) ;
202+ return ;
203+ }
200204
201- var taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
202- messageContext . SetTaskCompletionSource ( taskCompletionSource ) ;
205+ var taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
206+ messageContext . SetTaskCompletionSource ( taskCompletionSource ) ;
203207
204- await channel . Writer . WriteAsync ( messageContext , cancellationToken ) ;
205- await taskCompletionSource . Task ;
208+ await channel . Writer . WriteAsync ( messageContext , cancellationToken ) ;
209+ _ = await taskCompletionSource . Task ;
206210
207- successfulMessagesCounter . Add ( 1 , tags ) ;
208- }
211+ messageIngestionMetrics . Success ( ) ;
209212 }
210213
211214 public override async Task StartAsync ( CancellationToken cancellationToken )
@@ -218,27 +221,27 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
218221 {
219222 try
220223 {
221- var contexts = new List < MessageContext > ( transportSettings . MaxConcurrency . Value ) ;
224+ var contexts = new List < MessageContext > ( MaxBatchSize ) ;
222225
223226 while ( await channel . Reader . WaitToReadAsync ( stoppingToken ) )
224227 {
225228 // will only enter here if there is something to read.
226229 try
227230 {
228231 // as long as there is something to read this will fetch up to MaximumConcurrency items
229- using ( var recorder = new DurationRecorder ( batchDuration ) )
232+ using ( var batchMetrics = metrics . BeginBatch ( MaxBatchSize ) )
230233 {
231234 while ( channel . Reader . TryRead ( out var context ) )
232235 {
233236 contexts . Add ( context ) ;
234237 }
235238
236- recorder . Tags . Add ( "ingestion.batch_size" , contexts . Count ) ;
237-
238239 await auditIngestor . Ingest ( contexts ) ;
240+
241+ batchMetrics . Complete ( contexts . Count ) ;
239242 }
240243
241- consecutiveBatchFailuresCounter . Record ( 0 ) ;
244+ //metrics.ClearB .Record(0);
242245 }
243246 catch ( Exception e )
244247 {
@@ -257,7 +260,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
257260 logger . Info ( "Ingesting messages failed" , e ) ;
258261
259262 // no need to do interlocked increment since this is running sequential
260- consecutiveBatchFailuresCounter . Record ( consecutiveBatchFailures ++ ) ;
263+ // consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
261264 }
262265 finally
263266 {
@@ -297,9 +300,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
297300 }
298301
299302 TransportInfrastructure transportInfrastructure ;
300- IMessageReceiver messageReceiver ;
301- long consecutiveBatchFailures = 0 ;
303+ IMessageReceiver queueIngestor ;
302304
305+ readonly int MaxBatchSize ;
303306 readonly SemaphoreSlim startStopSemaphore = new ( 1 ) ;
304307 readonly string inputEndpoint ;
305308 readonly ITransportCustomization transportCustomization ;
@@ -309,12 +312,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
309312 readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ;
310313 readonly Settings settings ;
311314 readonly Channel < MessageContext > channel ;
312- readonly Histogram < double > batchDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion" , "batch_duration" ) , unit : "ms" , "Average audit message batch processing duration" ) ;
313- readonly Counter < long > successfulMessagesCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "success" ) , description : "Successful ingested audit message count" ) ;
314- readonly Histogram < long > consecutiveBatchFailuresCounter = Telemetry . Meter . CreateHistogram < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "consecutive_batch_failures" ) , unit : "count" , description : "Consecutive audit ingestion batch failure" ) ;
315- readonly Histogram < double > ingestionDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion" , "duration" ) , unit : "ms" , description : "Average incoming audit message processing duration" ) ;
316315 readonly Watchdog watchdog ;
317316 readonly IHostApplicationLifetime applicationLifetime ;
317+ readonly AuditIngestionMetrics metrics ;
318318
319319 static readonly ILog logger = LogManager . GetLogger < AuditIngestion > ( ) ;
320320
0 commit comments