@@ -584,28 +584,43 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie
584
584
throw new InvalidOperationException ( "recordedEntitiesSemaphore must be held" ) ;
585
585
}
586
586
587
- var recoveredChannels = new List < AutorecoveringChannel > ( ) ;
587
+ var channelsToRecover = new List < AutorecoveringChannel > ( ) ;
588
588
await _channelsSemaphore . WaitAsync ( cancellationToken )
589
589
. ConfigureAwait ( false ) ;
590
590
try
591
591
{
592
- foreach ( AutorecoveringChannel channel in _channels )
592
+ channelsToRecover . AddRange ( _channels ) ;
593
+ }
594
+ finally
595
+ {
596
+ _channelsSemaphore . Release ( ) ;
597
+ }
598
+
599
+ var notRecoveredChannels = new List < AutorecoveringChannel > ( ) ;
600
+ foreach ( AutorecoveringChannel channel in channelsToRecover )
601
+ {
602
+ bool recovered = await channel . AutomaticallyRecoverAsync ( this , _config . TopologyRecoveryEnabled ,
603
+ recordedEntitiesSemaphoreHeld : recordedEntitiesSemaphoreHeld ,
604
+ cancellationToken : cancellationToken )
605
+ . ConfigureAwait ( false ) ;
606
+
607
+ if ( false == recovered )
593
608
{
594
- bool recovered = await channel . AutomaticallyRecoverAsync ( this , _config . TopologyRecoveryEnabled ,
595
- recordedEntitiesSemaphoreHeld : recordedEntitiesSemaphoreHeld ,
596
- cancellationToken : cancellationToken )
597
- . ConfigureAwait ( false ) ;
609
+ notRecoveredChannels . Add ( channel ) ;
610
+ }
611
+ }
598
612
599
- if ( recovered )
600
- {
601
- recoveredChannels . Add ( channel ) ;
602
- }
613
+ await _channelsSemaphore . WaitAsync ( cancellationToken )
614
+ . ConfigureAwait ( false ) ;
615
+ try
616
+ {
617
+ foreach ( AutorecoveringChannel channel in notRecoveredChannels )
618
+ {
619
+ _channels . Remove ( channel ) ;
603
620
}
604
621
}
605
622
finally
606
623
{
607
- _channels . Clear ( ) ;
608
- _channels = recoveredChannels ;
609
624
_channelsSemaphore . Release ( ) ;
610
625
}
611
626
}
0 commit comments