22{
33 using System ;
44 using System . Collections . Generic ;
5- using System . Diagnostics . Metrics ;
65 using System . Threading ;
76 using System . Threading . Channels ;
87 using System . Threading . Tasks ;
98 using Infrastructure . Settings ;
9+ using Metrics ;
1010 using Microsoft . Extensions . Hosting ;
1111 using NServiceBus ;
1212 using NServiceBus . Logging ;
@@ -26,7 +26,8 @@ public AuditIngestion(
2626 AuditIngestionCustomCheck . State ingestionState ,
2727 AuditIngestor auditIngestor ,
2828 IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ,
29- IHostApplicationLifetime applicationLifetime )
29+ IHostApplicationLifetime applicationLifetime ,
30+ IngestionMetrics metrics )
3031 {
3132 inputEndpoint = settings . AuditQueue ;
3233 this . transportCustomization = transportCustomization ;
@@ -35,21 +36,24 @@ public AuditIngestion(
3536 this . unitOfWorkFactory = unitOfWorkFactory ;
3637 this . settings = settings ;
3738 this . applicationLifetime = applicationLifetime ;
39+ this . metrics = metrics ;
3840
3941 if ( ! transportSettings . MaxConcurrency . HasValue )
4042 {
4143 throw new ArgumentException ( "MaxConcurrency is not set in TransportSettings" ) ;
4244 }
4345
44- channel = Channel . CreateBounded < MessageContext > ( new BoundedChannelOptions ( transportSettings . MaxConcurrency . Value )
46+ MaxBatchSize = transportSettings . MaxConcurrency . Value ;
47+
48+ channel = Channel . CreateBounded < MessageContext > ( new BoundedChannelOptions ( MaxBatchSize )
4549 {
4650 SingleReader = true ,
4751 SingleWriter = false ,
4852 AllowSynchronousContinuations = false ,
4953 FullMode = BoundedChannelFullMode . Wait
5054 } ) ;
5155
52- errorHandlingPolicy = new AuditIngestionFaultPolicy ( failedImportsStorage , settings . LoggingSettings , OnCriticalError ) ;
56+ errorHandlingPolicy = new AuditIngestionFaultPolicy ( failedImportsStorage , settings . LoggingSettings , OnCriticalError , metrics ) ;
5357
5458 watchdog = new Watchdog (
5559 "audit message ingestion" ,
@@ -190,22 +194,21 @@ async Task EnsureStopped(CancellationToken cancellationToken)
190194
191195 async Task OnMessage ( MessageContext messageContext , CancellationToken cancellationToken )
192196 {
193- var tags = Telemetry . GetIngestedMessageTags ( messageContext . Headers , messageContext . Body ) ;
194- using ( new DurationRecorder ( ingestionDuration , tags ) )
197+ using var messageIngestionMetrics = metrics . BeginIngestion ( messageContext ) ;
198+
199+ if ( settings . MessageFilter != null && settings . MessageFilter ( messageContext ) )
195200 {
196- if ( settings . MessageFilter != null && settings . MessageFilter ( messageContext ) )
197- {
198- return ;
199- }
201+ messageIngestionMetrics . Skipped ( ) ;
202+ return ;
203+ }
200204
201- var taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
202- messageContext . SetTaskCompletionSource ( taskCompletionSource ) ;
205+ var taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
206+ messageContext . SetTaskCompletionSource ( taskCompletionSource ) ;
203207
204- await channel . Writer . WriteAsync ( messageContext , cancellationToken ) ;
205- await taskCompletionSource . Task ;
208+ await channel . Writer . WriteAsync ( messageContext , cancellationToken ) ;
209+ _ = await taskCompletionSource . Task ;
206210
207- successfulMessagesCounter . Add ( 1 , tags ) ;
208- }
211+ messageIngestionMetrics . Success ( ) ;
209212 }
210213
211214 public override async Task StartAsync ( CancellationToken cancellationToken )
@@ -218,27 +221,24 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
218221 {
219222 try
220223 {
221- var contexts = new List < MessageContext > ( transportSettings . MaxConcurrency . Value ) ;
224+ var contexts = new List < MessageContext > ( MaxBatchSize ) ;
222225
223226 while ( await channel . Reader . WaitToReadAsync ( stoppingToken ) )
224227 {
225228 // will only enter here if there is something to read.
226229 try
227230 {
231+ using var batchMetrics = metrics . BeginBatch ( MaxBatchSize ) ;
232+
228233 // as long as there is something to read this will fetch up to MaximumConcurrency items
229- using ( var recorder = new DurationRecorder ( batchDuration ) )
234+ while ( channel . Reader . TryRead ( out var context ) )
230235 {
231- while ( channel . Reader . TryRead ( out var context ) )
232- {
233- contexts . Add ( context ) ;
234- }
235-
236- recorder . Tags . Add ( "ingestion.batch_size" , contexts . Count ) ;
237-
238- await auditIngestor . Ingest ( contexts ) ;
236+ contexts . Add ( context ) ;
239237 }
240238
241- consecutiveBatchFailuresCounter . Record ( 0 ) ;
239+ await auditIngestor . Ingest ( contexts ) ;
240+
241+ batchMetrics . Complete ( contexts . Count ) ;
242242 }
243243 catch ( Exception e )
244244 {
@@ -255,9 +255,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
255255 }
256256
257257 logger . Info ( "Ingesting messages failed" , e ) ;
258-
259- // no need to do interlocked increment since this is running sequential
260- consecutiveBatchFailuresCounter . Record ( consecutiveBatchFailures ++ ) ;
261258 }
262259 finally
263260 {
@@ -298,8 +295,8 @@ public override async Task StopAsync(CancellationToken cancellationToken)
298295
299296 TransportInfrastructure transportInfrastructure ;
300297 IMessageReceiver messageReceiver ;
301- long consecutiveBatchFailures = 0 ;
302298
299+ readonly int MaxBatchSize ;
303300 readonly SemaphoreSlim startStopSemaphore = new ( 1 ) ;
304301 readonly string inputEndpoint ;
305302 readonly ITransportCustomization transportCustomization ;
@@ -309,12 +306,9 @@ public override async Task StopAsync(CancellationToken cancellationToken)
309306 readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ;
310307 readonly Settings settings ;
311308 readonly Channel < MessageContext > channel ;
312- readonly Histogram < double > batchDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion" , "batch_duration" ) , unit : "ms" , "Average audit message batch processing duration" ) ;
313- readonly Counter < long > successfulMessagesCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "success" ) , description : "Successful ingested audit message count" ) ;
314- readonly Histogram < long > consecutiveBatchFailuresCounter = Telemetry . Meter . CreateHistogram < long > ( Telemetry . CreateInstrumentName ( "ingestion" , "consecutive_batch_failures" ) , unit : "count" , description : "Consecutive audit ingestion batch failure" ) ;
315- readonly Histogram < double > ingestionDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion" , "duration" ) , unit : "ms" , description : "Average incoming audit message processing duration" ) ;
316309 readonly Watchdog watchdog ;
317310 readonly IHostApplicationLifetime applicationLifetime ;
311+ readonly IngestionMetrics metrics ;
318312
319313 static readonly ILog logger = LogManager . GetLogger < AuditIngestion > ( ) ;
320314
0 commit comments