@@ -221,6 +221,8 @@ async Task Loop()
221221 {
222222 await auditIngestor . Ingest ( contexts ) ;
223223 }
224+
225+ consecutiveBatchFailuresCounter . Record ( Interlocked . Exchange ( ref consecutiveBatchFailures , 0 ) ) ;
224226 }
225227 catch ( OperationCanceledException )
226228 {
@@ -239,6 +241,8 @@ async Task Loop()
239241 {
240242 context . GetTaskCompletionSource ( ) . TrySetException ( e ) ;
241243 }
244+
245+ consecutiveBatchFailuresCounter . Record ( Interlocked . Increment ( ref consecutiveBatchFailures ) ) ;
242246 }
243247 finally
244248 {
@@ -250,6 +254,7 @@ async Task Loop()
250254
251255 TransportInfrastructure transportInfrastructure ;
252256 IMessageReceiver queueIngestor ;
257+ long consecutiveBatchFailures = 0 ;
253258
254259 readonly SemaphoreSlim startStopSemaphore = new ( 1 ) ;
255260 readonly string inputEndpoint ;
@@ -264,6 +269,7 @@ async Task Loop()
264269 readonly Histogram < double > auditBatchDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion." , "batch_duration" ) , unit : "ms" ) ;
265270 readonly Histogram < double > messageSize = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion." , "message_size" ) , unit : "kilobytes" ) ;
266271 readonly Counter < long > ingestedMessagesCounter = Telemetry . Meter . CreateCounter < long > ( Telemetry . CreateInstrumentName ( "ingestion." , "count" ) ) ;
272+ readonly Histogram < long > consecutiveBatchFailuresCounter = Telemetry . Meter . CreateHistogram < long > ( Telemetry . CreateInstrumentName ( "ingestion." , "consecutive_batch_failures" ) , unit : "count" ) ;
267273 readonly Histogram < double > ingestionDuration = Telemetry . Meter . CreateHistogram < double > ( Telemetry . CreateInstrumentName ( "ingestion." , "duration" ) , unit : "ms" ) ;
268274 readonly Watchdog watchdog ;
269275 readonly Task ingestionWorker ;
0 commit comments