Skip to content

Commit 87b7d15

Browse files
committed
Correctly handle fail-fast
1 parent 00dc0be commit 87b7d15

File tree

6 files changed

+379
-16
lines changed

6 files changed

+379
-16
lines changed

pkg/common/executor.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,87 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
131131
}
132132
}
133133

134+
// NewFailFastParallelExecutor creates a parallel executor that respects fail-fast semantics
135+
// When fail-fast is enabled via context, it will cancel remaining work on first error
136+
func NewFailFastParallelExecutor(parallel int, executors ...Executor) Executor {
137+
return func(ctx context.Context) error {
138+
failFast := IsFailFast(ctx)
139+
140+
// If fail-fast is disabled, use the standard parallel executor
141+
if !failFast {
142+
return NewParallelExecutor(parallel, executors...)(ctx)
143+
}
144+
145+
// Fail-fast mode: create a cancellable context for workers
146+
workCtx, cancelWork := context.WithCancel(ctx)
147+
defer cancelWork()
148+
149+
work := make(chan Executor, len(executors))
150+
errs := make(chan error, len(executors))
151+
152+
if 1 > parallel {
153+
log.Debugf("Parallel tasks (%d) below minimum, setting to 1", parallel)
154+
parallel = 1
155+
}
156+
157+
// Start worker goroutines
158+
for i := 0; i < parallel; i++ {
159+
go func(work <-chan Executor, errs chan<- error) {
160+
for executor := range work {
161+
// Check if work context was cancelled (fail-fast triggered)
162+
if workCtx.Err() != nil {
163+
errs <- workCtx.Err()
164+
continue
165+
}
166+
errs <- executor(workCtx)
167+
}
168+
}(work, errs)
169+
}
170+
171+
// Queue work and monitor for failures
172+
go func() {
173+
defer close(work)
174+
for i := 0; i < len(executors); i++ {
175+
// Check if we should stop queuing due to failure
176+
if workCtx.Err() != nil {
177+
// Don't queue remaining work, but send cancelled errors for remaining executors
178+
for j := i; j < len(executors); j++ {
179+
errs <- workCtx.Err()
180+
}
181+
return
182+
}
183+
work <- executors[i]
184+
}
185+
}()
186+
187+
// Collect results and trigger fail-fast on first error
188+
var firstErr error
189+
for i := 0; i < len(executors); i++ {
190+
err := <-errs
191+
192+
// Track the first non-Warning error
193+
if err != nil && firstErr == nil {
194+
switch err.(type) {
195+
case Warning:
196+
// Warnings don't trigger fail-fast
197+
log.Warning(err.Error())
198+
default:
199+
firstErr = err
200+
// Cancel remaining work on first real error
201+
cancelWork()
202+
}
203+
}
204+
}
205+
206+
// Check if parent context was cancelled
207+
if err := ctx.Err(); err != nil {
208+
return err
209+
}
210+
211+
return firstErr
212+
}
213+
}
214+
134215
func NewFieldExecutor(name string, value interface{}, exec Executor) Executor {
135216
return func(ctx context.Context) error {
136217
return exec(WithLogger(ctx, Logger(ctx).WithField(name, value)))

pkg/common/executor_test.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package common
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"testing"
78
"time"
89

@@ -150,3 +151,240 @@ func TestNewParallelExecutorCanceled(t *testing.T) {
150151
assert.Equal(3, count)
151152
assert.Error(errExpected, err)
152153
}
154+
155+
func TestNewFailFastParallelExecutorWithFailFastTrue(t *testing.T) {
156+
assert := assert.New(t)
157+
158+
ctx := WithFailFast(context.Background(), true)
159+
160+
executedCount := 0
161+
var mu sync.Mutex
162+
163+
// Create executors: some succeed, one fails, rest should be cancelled
164+
executors := []Executor{
165+
func(ctx context.Context) error {
166+
mu.Lock()
167+
executedCount++
168+
mu.Unlock()
169+
time.Sleep(100 * time.Millisecond)
170+
return nil
171+
},
172+
func(ctx context.Context) error {
173+
mu.Lock()
174+
executedCount++
175+
mu.Unlock()
176+
time.Sleep(100 * time.Millisecond)
177+
return fmt.Errorf("intentional failure")
178+
},
179+
func(ctx context.Context) error {
180+
mu.Lock()
181+
executedCount++
182+
mu.Unlock()
183+
time.Sleep(2 * time.Second) // Should be cancelled
184+
return nil
185+
},
186+
func(ctx context.Context) error {
187+
mu.Lock()
188+
executedCount++
189+
mu.Unlock()
190+
time.Sleep(2 * time.Second) // Should be cancelled
191+
return nil
192+
},
193+
}
194+
195+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
196+
197+
assert.Error(err)
198+
assert.Contains(err.Error(), "intentional failure")
199+
}
200+
201+
func TestNewFailFastParallelExecutorWithFailFastFalse(t *testing.T) {
202+
assert := assert.New(t)
203+
204+
ctx := WithFailFast(context.Background(), false)
205+
206+
executedCount := 0
207+
var mu sync.Mutex
208+
209+
executors := []Executor{
210+
func(ctx context.Context) error {
211+
mu.Lock()
212+
executedCount++
213+
mu.Unlock()
214+
return nil
215+
},
216+
func(ctx context.Context) error {
217+
mu.Lock()
218+
executedCount++
219+
mu.Unlock()
220+
return fmt.Errorf("intentional failure")
221+
},
222+
func(ctx context.Context) error {
223+
mu.Lock()
224+
executedCount++
225+
mu.Unlock()
226+
return nil
227+
},
228+
}
229+
230+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
231+
232+
assert.Error(err)
233+
mu.Lock()
234+
assert.Equal(3, executedCount, "all executors should run when fail-fast is false")
235+
mu.Unlock()
236+
}
237+
238+
func TestNewFailFastParallelExecutorNoFailFastInContext(t *testing.T) {
239+
assert := assert.New(t)
240+
241+
ctx := context.Background()
242+
243+
executedCount := 0
244+
var mu sync.Mutex
245+
246+
executors := []Executor{
247+
func(ctx context.Context) error {
248+
mu.Lock()
249+
executedCount++
250+
mu.Unlock()
251+
return nil
252+
},
253+
func(ctx context.Context) error {
254+
mu.Lock()
255+
executedCount++
256+
mu.Unlock()
257+
return fmt.Errorf("intentional failure")
258+
},
259+
func(ctx context.Context) error {
260+
mu.Lock()
261+
executedCount++
262+
mu.Unlock()
263+
return nil
264+
},
265+
}
266+
267+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
268+
269+
assert.Error(err)
270+
mu.Lock()
271+
assert.Equal(3, executedCount, "all executors should run when fail-fast not in context")
272+
mu.Unlock()
273+
}
274+
275+
func TestNewFailFastParallelExecutorWithWarnings(t *testing.T) {
276+
assert := assert.New(t)
277+
278+
ctx := WithFailFast(context.Background(), true)
279+
280+
executedCount := 0
281+
var mu sync.Mutex
282+
283+
// Warnings should not trigger fail-fast
284+
executors := []Executor{
285+
func(ctx context.Context) error {
286+
mu.Lock()
287+
executedCount++
288+
mu.Unlock()
289+
return Warningf("this is a warning")
290+
},
291+
func(ctx context.Context) error {
292+
mu.Lock()
293+
executedCount++
294+
mu.Unlock()
295+
return nil
296+
},
297+
func(ctx context.Context) error {
298+
mu.Lock()
299+
executedCount++
300+
mu.Unlock()
301+
return nil
302+
},
303+
}
304+
305+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
306+
307+
// Warnings don't cause executor to fail
308+
assert.NoError(err)
309+
mu.Lock()
310+
assert.Equal(3, executedCount, "all executors should run when only warnings occur")
311+
mu.Unlock()
312+
}
313+
314+
func TestNewFailFastParallelExecutorParentContextCanceled(t *testing.T) {
315+
assert := assert.New(t)
316+
317+
ctx, cancel := context.WithCancel(context.Background())
318+
ctx = WithFailFast(ctx, true)
319+
320+
executedCount := 0
321+
var mu sync.Mutex
322+
323+
executors := []Executor{
324+
func(ctx context.Context) error {
325+
mu.Lock()
326+
executedCount++
327+
mu.Unlock()
328+
time.Sleep(100 * time.Millisecond)
329+
return nil
330+
},
331+
func(ctx context.Context) error {
332+
mu.Lock()
333+
executedCount++
334+
mu.Unlock()
335+
// Cancel parent context
336+
cancel()
337+
time.Sleep(100 * time.Millisecond)
338+
return nil
339+
},
340+
func(ctx context.Context) error {
341+
mu.Lock()
342+
executedCount++
343+
mu.Unlock()
344+
return nil
345+
},
346+
}
347+
348+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
349+
350+
// Should return context.Canceled from parent
351+
assert.ErrorIs(err, context.Canceled)
352+
}
353+
354+
func TestNewFailFastParallelExecutorAllSuccess(t *testing.T) {
355+
assert := assert.New(t)
356+
357+
ctx := WithFailFast(context.Background(), true)
358+
359+
executedCount := 0
360+
var mu sync.Mutex
361+
362+
// All executors succeed - fail-fast shouldn't interfere
363+
executors := []Executor{
364+
func(ctx context.Context) error {
365+
mu.Lock()
366+
executedCount++
367+
mu.Unlock()
368+
return nil
369+
},
370+
func(ctx context.Context) error {
371+
mu.Lock()
372+
executedCount++
373+
mu.Unlock()
374+
return nil
375+
},
376+
func(ctx context.Context) error {
377+
mu.Lock()
378+
executedCount++
379+
mu.Unlock()
380+
return nil
381+
},
382+
}
383+
384+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
385+
386+
assert.NoError(err)
387+
mu.Lock()
388+
assert.Equal(3, executedCount, "all executors should run when all succeed")
389+
mu.Unlock()
390+
}

pkg/common/job_error.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,26 @@ type jobCancelCtx string
1212

1313
const JobCancelCtxVal = jobCancelCtx("job.cancel")
1414

15+
type failFastContextKey string
16+
17+
const FailFastContextKeyVal = failFastContextKey("job.failfast")
18+
19+
// WithFailFast adds fail-fast configuration to the context
20+
func WithFailFast(ctx context.Context, failFast bool) context.Context {
21+
return context.WithValue(ctx, FailFastContextKeyVal, failFast)
22+
}
23+
24+
// IsFailFast returns whether fail-fast is enabled for this context
25+
func IsFailFast(ctx context.Context) bool {
26+
val := ctx.Value(FailFastContextKeyVal)
27+
if val != nil {
28+
if ff, ok := val.(bool); ok {
29+
return ff
30+
}
31+
}
32+
return false
33+
}
34+
1535
// JobError returns the job error for current context if any
1636
func JobError(ctx context.Context) error {
1737
val := ctx.Value(jobErrorContextKeyVal)

pkg/model/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ type Job struct {
212212
With map[string]interface{} `yaml:"with"`
213213
RawSecrets yaml.Node `yaml:"secrets"`
214214
Result string
215-
outputsMu sync.Mutex // Protects concurrent access to Outputs from parallel matrix jobs
215+
outputsMu sync.Mutex // Protects concurrent access to Outputs from parallel matrix jobs
216216
}
217217

218218
// Lock locks the job's outputs mutex to allow safe concurrent access from parallel matrix jobs

0 commit comments

Comments
 (0)