@@ -182,19 +182,22 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
182182
183183 async Task OnMessage ( MessageContext messageContext , CancellationToken cancellationToken )
184184 {
185- if ( settings . MessageFilter != null && settings . MessageFilter ( messageContext ) )
185+ using ( new DurationRecorder ( ingestionDuration ) )
186186 {
187- return ;
188- }
187+ if ( settings . MessageFilter != null && settings . MessageFilter ( messageContext ) )
188+ {
189+ return ;
190+ }
189191
190- var taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
191- messageContext . SetTaskCompletionSource ( taskCompletionSource ) ;
192+ var taskCompletionSource = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
193+ messageContext . SetTaskCompletionSource ( taskCompletionSource ) ;
192194
193- await channel . Writer . WriteAsync ( messageContext , cancellationToken ) ;
194- await taskCompletionSource . Task ;
195+ await channel . Writer . WriteAsync ( messageContext , cancellationToken ) ;
196+ await taskCompletionSource . Task ;
195197
196- ingestedMessagesCounter . Add ( 1 ) ;
197- messageSize . Record ( messageContext . Body . Length / 1024.0 ) ;
198+ ingestedMessagesCounter . Add ( 1 ) ;
199+ messageSize . Record ( messageContext . Body . Length / 1024.0 ) ;
200+ }
198201 }
199202
200203 async Task Loop ( )
@@ -261,6 +264,7 @@ async Task Loop()
261264 readonly Histogram < double > auditBatchDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion." , "batch_duration" ) , unit : "ms" ) ;
262265 readonly Histogram < double > messageSize = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion." , "message_size" ) , unit : "kilobytes" ) ;
263266 readonly Counter < long > ingestedMessagesCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion." , "count" ) ) ;
267+ readonly Histogram < double > ingestionDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion." , "duration" ) , unit : "ms" ) ;
264268 readonly Watchdog watchdog ;
265269 readonly Task ingestionWorker ;
266270 readonly IHostApplicationLifetime applicationLifetime ;
0 commit comments