@@ -183,101 +183,128 @@ private static async Task ExecuteStepsAsTaskDag(
183183 // Validate no cycles exist in the dependency graph
184184 ValidateDependencyGraph ( steps , stepsByName ) ;
185185
186- // Create a TaskCompletionSource for each step
187- var stepCompletions = new Dictionary < string , TaskCompletionSource > ( steps . Count , StringComparer . Ordinal ) ;
188- foreach ( var step in steps )
189- {
190- stepCompletions [ step . Name ] = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
191- }
186+ // Create a linked CancellationTokenSource that will be cancelled when any step fails
187+ // or when the original context token is cancelled
188+ using var linkedCts = CancellationTokenSource . CreateLinkedTokenSource ( context . CancellationToken ) ;
189+
190+ // Store the original token and set the linked token on the context
191+ var originalToken = context . CancellationToken ;
192+ context . CancellationToken = linkedCts . Token ;
192193
193- // Execute a step after its dependencies complete
194- async Task ExecuteStepWithDependencies ( PipelineStep step )
194+ try
195195 {
196- var stepTcs = stepCompletions [ step . Name ] ;
196+ // Create a TaskCompletionSource for each step
197+ var stepCompletions = new Dictionary < string , TaskCompletionSource > ( steps . Count , StringComparer . Ordinal ) ;
198+ foreach ( var step in steps )
199+ {
200+ stepCompletions [ step . Name ] = new TaskCompletionSource ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
201+ }
197202
198- // Wait for all dependencies to complete (will throw if any dependency failed)
199- if ( step . DependsOnSteps . Count > 0 )
203+ // Execute a step after its dependencies complete
204+ async Task ExecuteStepWithDependencies ( PipelineStep step )
200205 {
206+ var stepTcs = stepCompletions [ step . Name ] ;
207+
208+ // Wait for all dependencies to complete (will throw if any dependency failed)
209+ if ( step . DependsOnSteps . Count > 0 )
210+ {
211+ try
212+ {
213+ var depTasks = step . DependsOnSteps . Select ( depName => stepCompletions [ depName ] . Task ) ;
214+ await Task . WhenAll ( depTasks ) . ConfigureAwait ( false ) ;
215+ }
216+ catch ( Exception ex )
217+ {
218+ // Find all dependencies that failed
219+ var failedDeps = step . DependsOnSteps
220+ . Where ( depName => stepCompletions [ depName ] . Task . IsFaulted )
221+ . ToList ( ) ;
222+
223+ var message = failedDeps . Count > 0
224+ ? $ "Step '{ step . Name } ' cannot run because { ( failedDeps . Count == 1 ? "dependency" : "dependencies" ) } { string . Join ( ", " , failedDeps . Select ( d => $ "'{ d } '") ) } failed"
225+ : $ "Step '{ step . Name } ' cannot run because a dependency failed";
226+
227+ // Wrap the dependency failure with context about this step
228+ var wrappedException = new InvalidOperationException ( message , ex ) ;
229+ stepTcs . TrySetException ( wrappedException ) ;
230+ return ;
231+ }
232+ }
233+
201234 try
202235 {
203- var depTasks = step . DependsOnSteps . Select ( depName => stepCompletions [ depName ] . Task ) ;
204- await Task . WhenAll ( depTasks ) . ConfigureAwait ( false ) ;
236+ await ExecuteStepAsync ( step , context ) . ConfigureAwait ( false ) ;
237+
238+ stepTcs . TrySetResult ( ) ;
205239 }
206240 catch ( Exception ex )
207241 {
208- // Find all dependencies that failed
209- var failedDeps = step . DependsOnSteps
210- . Where ( depName => stepCompletions [ depName ] . Task . IsFaulted )
211- . ToList ( ) ;
242+ // Execution failure - mark as failed, cancel all other work, and re-throw
243+ stepTcs . TrySetException ( ex ) ;
212244
213- var message = failedDeps . Count > 0
214- ? $ "Step '{ step . Name } ' cannot run because { ( failedDeps . Count == 1 ? "dependency" : "dependencies" ) } { string . Join ( ", " , failedDeps . Select ( d => $ "'{ d } '") ) } failed"
215- : $ "Step '{ step . Name } ' cannot run because a dependency failed";
245+ // Cancel all remaining work
246+ try
247+ {
248+ linkedCts . Cancel ( ) ;
249+ }
250+ catch ( ObjectDisposedException )
251+ {
252+ // Ignore cancellation errors
253+ }
216254
217- // Wrap the dependency failure with context about this step
218- var wrappedException = new InvalidOperationException ( message , ex ) ;
219- stepTcs . TrySetException ( wrappedException ) ;
220- return ;
255+ throw ;
221256 }
222257 }
223258
224- try
259+ // Start all steps (they'll wait on their dependencies internally)
260+ var allStepTasks = new Task [ steps . Count ] ;
261+ for ( var i = 0 ; i < steps . Count ; i ++ )
225262 {
226- await ExecuteStepAsync ( step , context ) . ConfigureAwait ( false ) ;
227-
228- stepTcs . TrySetResult ( ) ;
263+ var step = steps [ i ] ;
264+ allStepTasks [ i ] = Task . Run ( ( ) => ExecuteStepWithDependencies ( step ) ) ;
229265 }
230- catch ( Exception ex )
266+
267+ // Wait for all steps to complete (or fail)
268+ try
231269 {
232- // Execution failure - mark as failed and re-throw so it's counted
233- stepTcs . TrySetException ( ex ) ;
234- throw ;
270+ await Task . WhenAll ( allStepTasks ) . ConfigureAwait ( false ) ;
235271 }
236- }
237-
238- // Start all steps (they'll wait on their dependencies internally)
239- var allStepTasks = new Task [ steps . Count ] ;
240- for ( var i = 0 ; i < steps . Count ; i ++ )
241- {
242- var step = steps [ i ] ;
243- allStepTasks [ i ] = Task . Run ( ( ) => ExecuteStepWithDependencies ( step ) ) ;
244- }
245-
246- // Wait for all steps to complete (or fail)
247- try
248- {
249- await Task . WhenAll ( allStepTasks ) . ConfigureAwait ( false ) ;
250- }
251- catch
252- {
253- // Collect all failed steps and their names
254- var failures = allStepTasks
255- . Where ( t => t . IsFaulted )
256- . Select ( t => t . Exception ! )
257- . SelectMany ( ae => ae . InnerExceptions )
258- . ToList ( ) ;
259-
260- if ( failures . Count > 1 )
272+ catch
261273 {
262- // Match failures to steps to get their names
263- var failedStepNames = new List < string > ( ) ;
264- for ( var i = 0 ; i < allStepTasks . Length ; i ++ )
274+ // Collect all failed steps and their names
275+ var failures = allStepTasks
276+ . Where ( t => t . IsFaulted )
277+ . Select ( t => t . Exception ! )
278+ . SelectMany ( ae => ae . InnerExceptions )
279+ . ToList ( ) ;
280+
281+ if ( failures . Count > 1 )
265282 {
266- if ( allStepTasks [ i ] . IsFaulted )
283+ // Match failures to steps to get their names
284+ var failedStepNames = new List < string > ( ) ;
285+ for ( var i = 0 ; i < allStepTasks . Length ; i ++ )
267286 {
268- failedStepNames . Add ( steps [ i ] . Name ) ;
287+ if ( allStepTasks [ i ] . IsFaulted )
288+ {
289+ failedStepNames . Add ( steps [ i ] . Name ) ;
290+ }
269291 }
270- }
271292
272- var message = failedStepNames . Count > 0
273- ? $ "Multiple pipeline steps failed: { string . Join ( ", " , failedStepNames . Distinct ( ) ) } "
274- : "Multiple pipeline steps failed." ;
293+ var message = failedStepNames . Count > 0
294+ ? $ "Multiple pipeline steps failed: { string . Join ( ", " , failedStepNames . Distinct ( ) ) } "
295+ : "Multiple pipeline steps failed." ;
275296
276- throw new AggregateException ( message , failures ) ;
277- }
297+ throw new AggregateException ( message , failures ) ;
298+ }
278299
279- // Single failure - just rethrow
280- throw ;
300+ // Single failure - just rethrow
301+ throw ;
302+ }
303+ }
304+ finally
305+ {
306+ // Restore the original token
307+ context . CancellationToken = originalToken ;
281308 }
282309 }
283310
0 commit comments