@@ -206,49 +206,59 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
206206
207207 async Task Loop ( )
208208 {
209- var contexts = new List < MessageContext > ( transportSettings . MaxConcurrency . Value ) ;
210-
211- while ( await channel . Reader . WaitToReadAsync ( ) )
209+ try
212210 {
213- // will only enter here if there is something to read.
214- try
211+ var contexts = new List < MessageContext > ( transportSettings . MaxConcurrency . Value ) ;
212+
213+ while ( await channel . Reader . WaitToReadAsync ( ) )
215214 {
216- // as long as there is something to read this will fetch up to MaximumConcurrency items
217- while ( channel . Reader . TryRead ( out var context ) )
215+ // will only enter here if there is something to read.
216+ try
218217 {
219- contexts . Add ( context ) ;
218+ // as long as there is something to read this will fetch up to MaximumConcurrency items
219+ while ( channel . Reader . TryRead ( out var context ) )
220+ {
221+ contexts . Add ( context ) ;
222+ }
223+
224+ batchSizeMeter . Mark ( contexts . Count ) ;
225+ using ( batchDurationMeter . Measure ( ) )
226+ {
227+ await auditIngestor . Ingest ( contexts ) ;
228+ }
220229 }
221-
222- batchSizeMeter . Mark ( contexts . Count ) ;
223- using ( batchDurationMeter . Measure ( ) )
230+ catch ( OperationCanceledException e )
224231 {
225- await auditIngestor . Ingest ( contexts ) ;
232+ logger . Info ( "Ingesting messages failed" , e ) ;
233+ // continue loop, do nothing as we are shutting down
234+ // TODO: Assumption here is that OCE equals a shutdown which is definitely not the case.
226235 }
227- }
228- catch ( OperationCanceledException )
229- {
230- //Do nothing as we are shutting down
231- continue ;
232- }
233- catch ( Exception e ) // show must go on
234- {
235- if ( logger . IsInfoEnabled )
236+ catch ( Exception e ) // show must go on
236237 {
237- logger . Info ( "Ingesting messages failed" , e ) ;
238+ logger . Warn ( "Ingesting messages failed" , e ) ;
239+
240+ // signal all message handling tasks to terminate
241+ foreach ( var context in contexts )
242+ {
243+ if ( ! context . GetTaskCompletionSource ( ) . TrySetException ( e ) )
244+ {
245+ logger . Error ( "Loop TrySetException failed" ) ;
246+ }
247+ }
238248 }
239-
240- // signal all message handling tasks to terminate
241- foreach ( var context in contexts )
249+ finally
242250 {
243- context . GetTaskCompletionSource ( ) . TrySetException ( e ) ;
251+ contexts . Clear ( ) ;
244252 }
245253 }
246- finally
247- {
248- contexts . Clear ( ) ;
249- }
254+ // will fall out here when writer is completed
255+ }
256+ catch ( Exception e )
257+ {
258+ logger . Fatal ( "Loop interrupted" , e ) ;
259+ applicationLifetime . StopApplication ( ) ;
260+ throw ;
250261 }
251- // will fall out here when writer is completed
252262 }
253263
254264 TransportInfrastructure transportInfrastructure ;
0 commit comments