@@ -78,65 +78,97 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
7878
7979 if ( ! unitOfWorkFactory . CanIngestMore ( ) )
8080 {
81- if ( queueIngestor != null )
82- {
83- var stoppable = queueIngestor ;
84- queueIngestor = null ;
85- logger . Info ( "Shutting down due to failed persistence health check. Infrastructure shut down commencing" ) ;
86- await stoppable . StopReceive ( cancellationToken ) ;
87- logger . Info ( "Shutting down due to failed persistence health check. Infrastructure shut down completed" ) ;
88- }
89-
81+ await StopAndTeardownInfrastructure ( cancellationToken ) ;
9082 return ;
9183 }
9284
93- if ( queueIngestor != null )
85+ await SetUpAndStartInfrastructure ( cancellationToken ) ;
86+ }
87+ catch ( Exception e )
88+ {
89+ try
9490 {
95- logger . Debug ( "Ensure started. Already started, skipping start up" ) ;
96- return ; //Already started
91+ await StopAndTeardownInfrastructure ( cancellationToken ) ;
9792 }
93+ catch ( Exception teardownException )
94+ {
95+ throw new AggregateException ( e , teardownException ) ;
96+ }
97+
98+ throw ;
99+ }
100+ finally
101+ {
102+ logger . Debug ( "Ensure started. Start/stop semaphore releasing" ) ;
103+ startStopSemaphore . Release ( ) ;
104+ logger . Debug ( "Ensure started. Start/stop semaphore released" ) ;
105+ }
106+ }
98107
99- logger . Info ( "Ensure started. Infrastructure starting" ) ;
108+ async Task SetUpAndStartInfrastructure ( CancellationToken cancellationToken )
109+ {
110+ if ( queueIngestor != null )
111+ {
112+ logger . Debug ( "Infrastructure already Started" ) ;
113+ return ;
114+ }
100115
116+ try
117+ {
118+ logger . Info ( "Starting infrastructure" ) ;
101119 transportInfrastructure = await transportCustomization . CreateTransportInfrastructure (
102120 inputEndpoint ,
103121 transportSettings ,
104122 OnMessage ,
105123 errorHandlingPolicy . OnError ,
106124 OnCriticalError ,
107- TransportTransactionMode . ReceiveOnly ) ;
125+ TransportTransactionMode . ReceiveOnly
126+ ) ;
108127
109128 queueIngestor = transportInfrastructure . Receivers [ inputEndpoint ] ;
110129
111130 await auditIngestor . VerifyCanReachForwardingAddress ( ) ;
112-
113131 await queueIngestor . StartReceive ( cancellationToken ) ;
114132
115- logger . Info ( "Ensure started. Infrastructure started " ) ;
133+ logger . Info ( "Started infrastructure " ) ;
116134 }
117- catch
135+ catch ( Exception e )
118136 {
119- if ( queueIngestor != null )
137+ logger . Error ( "Failed to start infrastructure" , e ) ;
138+ throw ;
139+ }
140+ }
141+
142+ async Task StopAndTeardownInfrastructure ( CancellationToken cancellationToken )
143+ {
144+ if ( transportInfrastructure == null )
145+ {
146+ logger . Debug ( "Infrastructure already Stopped" ) ;
147+ return ;
148+ }
149+
150+ try
151+ {
152+ logger . Info ( "Stopping infrastructure" ) ;
153+ try
120154 {
121- try
155+ if ( queueIngestor != null )
122156 {
123157 await queueIngestor . StopReceive ( cancellationToken ) ;
124158 }
125- catch ( OperationCanceledException e ) when ( e . CancellationToken == cancellationToken )
126- {
127- logger . Info ( "StopReceive cancelled" ) ;
128- }
159+ }
160+ finally
161+ {
162+ await transportInfrastructure . Shutdown ( cancellationToken ) ;
129163 }
130164
131- queueIngestor = null ; // Setting to null so that it doesn't exit when it retries in line 185
132-
133- throw ;
165+ queueIngestor = null ;
166+ logger . Info ( "Stopped infrastructure" ) ;
134167 }
135- finally
168+ catch ( Exception e )
136169 {
137- logger . Debug ( "Ensure started. Start/stop semaphore releasing" ) ;
138- startStopSemaphore . Release ( ) ;
139- logger . Debug ( "Ensure started. Start/stop semaphore released" ) ;
170+ logger . Error ( "Failed to stop infrastructure" , e ) ;
171+ throw ;
140172 }
141173 }
142174
0 commit comments