Skip to content

Commit af90cc9

Browse files
committed
Correctly handle fail-fast
1 parent 00dc0be commit af90cc9

File tree

7 files changed

+417
-16
lines changed

7 files changed

+417
-16
lines changed

pkg/common/executor.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ func Warningf(format string, args ...interface{}) Warning {
2626
return w
2727
}
2828

29+
// FailFastError wraps a context cancellation error with a more informative
// message. It marks work that was skipped or cancelled because fail-fast was
// triggered, as opposed to work that failed on its own.
type FailFastError struct {
	// Err is the underlying cancellation error; it is exposed via Unwrap.
	Err error
}

// Compile-time check that FailFastError satisfies the error interface.
var _ error = FailFastError{}

// Error returns the error message. The text is fixed and intentionally does
// not include the wrapped error; use Unwrap (or errors.Is / errors.As) to
// inspect the cause.
func (e FailFastError) Error() string {
	return "Job cancelled (fail-fast)"
}

// Unwrap returns the wrapped error so that errors.Is and errors.As work.
// It returns nil when no underlying error was recorded.
func (e FailFastError) Unwrap() error {
	return e.Err
}
43+
2944
// Executor defines the contract for the steps of a workflow
3045
type Executor func(ctx context.Context) error
3146

@@ -131,6 +146,101 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
131146
}
132147
}
133148

149+
// NewFailFastParallelExecutor creates a parallel executor that respects fail-fast semantics
150+
// When fail-fast is enabled via context, it will cancel remaining work on first error
151+
func NewFailFastParallelExecutor(parallel int, executors ...Executor) Executor {
152+
return func(ctx context.Context) error {
153+
failFast := IsFailFast(ctx)
154+
155+
// If fail-fast is disabled, use the standard parallel executor
156+
if !failFast {
157+
return NewParallelExecutor(parallel, executors...)(ctx)
158+
}
159+
160+
// Fail-fast mode: create a cancellable context for workers
161+
workCtx, cancelWork := context.WithCancel(ctx)
162+
defer cancelWork()
163+
164+
work := make(chan Executor, len(executors))
165+
errs := make(chan error, len(executors))
166+
167+
if 1 > parallel {
168+
log.Debugf("Parallel tasks (%d) below minimum, setting to 1", parallel)
169+
parallel = 1
170+
}
171+
172+
// Start worker goroutines
173+
for i := 0; i < parallel; i++ {
174+
go func(work <-chan Executor, errs chan<- error) {
175+
for executor := range work {
176+
// Check if work context was cancelled (fail-fast triggered)
177+
if workCtx.Err() != nil {
178+
errs <- FailFastError{Err: workCtx.Err()}
179+
continue
180+
}
181+
errs <- executor(workCtx)
182+
}
183+
}(work, errs)
184+
}
185+
186+
// Queue work and monitor for failures
187+
go func() {
188+
defer close(work)
189+
for i := 0; i < len(executors); i++ {
190+
// Check if we should stop queuing due to failure
191+
if workCtx.Err() != nil {
192+
// Don't queue remaining work, but send cancelled errors for remaining executors
193+
for j := i; j < len(executors); j++ {
194+
errs <- FailFastError{Err: workCtx.Err()}
195+
}
196+
return
197+
}
198+
work <- executors[i]
199+
}
200+
}()
201+
202+
// Collect results and trigger fail-fast on first error
203+
var firstErr error
204+
var firstFailFastErr error
205+
for i := 0; i < len(executors); i++ {
206+
err := <-errs
207+
208+
if err != nil {
209+
switch err.(type) {
210+
case Warning:
211+
// Warnings don't trigger fail-fast
212+
log.Warning(err.Error())
213+
case FailFastError:
214+
// FailFastErrors are just cancellation notifications, not the root cause
215+
// Keep the first one for returning if no real error is found
216+
if firstFailFastErr == nil {
217+
firstFailFastErr = err
218+
}
219+
default:
220+
// First real error triggers fail-fast
221+
if firstErr == nil {
222+
firstErr = err
223+
// Cancel remaining work on first real error
224+
cancelWork()
225+
}
226+
}
227+
}
228+
}
229+
230+
// If we only have FailFastErrors (all jobs were cancelled), return that
231+
if firstErr == nil && firstFailFastErr != nil {
232+
firstErr = firstFailFastErr
233+
}
234+
235+
// Check if parent context was cancelled
236+
if err := ctx.Err(); err != nil {
237+
return err
238+
}
239+
240+
return firstErr
241+
}
242+
}
243+
134244
func NewFieldExecutor(name string, value interface{}, exec Executor) Executor {
135245
return func(ctx context.Context) error {
136246
return exec(WithLogger(ctx, Logger(ctx).WithField(name, value)))

pkg/common/executor_test.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package common
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"testing"
78
"time"
89

@@ -150,3 +151,240 @@ func TestNewParallelExecutorCanceled(t *testing.T) {
150151
assert.Equal(3, count)
151152
assert.Error(errExpected, err)
152153
}
154+
155+
func TestNewFailFastParallelExecutorWithFailFastTrue(t *testing.T) {
156+
assert := assert.New(t)
157+
158+
ctx := WithFailFast(context.Background(), true)
159+
160+
executedCount := 0
161+
var mu sync.Mutex
162+
163+
// Create executors: some succeed, one fails, rest should be cancelled
164+
executors := []Executor{
165+
func(ctx context.Context) error {
166+
mu.Lock()
167+
executedCount++
168+
mu.Unlock()
169+
time.Sleep(100 * time.Millisecond)
170+
return nil
171+
},
172+
func(ctx context.Context) error {
173+
mu.Lock()
174+
executedCount++
175+
mu.Unlock()
176+
time.Sleep(100 * time.Millisecond)
177+
return fmt.Errorf("intentional failure")
178+
},
179+
func(ctx context.Context) error {
180+
mu.Lock()
181+
executedCount++
182+
mu.Unlock()
183+
time.Sleep(2 * time.Second) // Should be cancelled
184+
return nil
185+
},
186+
func(ctx context.Context) error {
187+
mu.Lock()
188+
executedCount++
189+
mu.Unlock()
190+
time.Sleep(2 * time.Second) // Should be cancelled
191+
return nil
192+
},
193+
}
194+
195+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
196+
197+
assert.Error(err)
198+
assert.Contains(err.Error(), "intentional failure")
199+
}
200+
201+
func TestNewFailFastParallelExecutorWithFailFastFalse(t *testing.T) {
202+
assert := assert.New(t)
203+
204+
ctx := WithFailFast(context.Background(), false)
205+
206+
executedCount := 0
207+
var mu sync.Mutex
208+
209+
executors := []Executor{
210+
func(ctx context.Context) error {
211+
mu.Lock()
212+
executedCount++
213+
mu.Unlock()
214+
return nil
215+
},
216+
func(ctx context.Context) error {
217+
mu.Lock()
218+
executedCount++
219+
mu.Unlock()
220+
return fmt.Errorf("intentional failure")
221+
},
222+
func(ctx context.Context) error {
223+
mu.Lock()
224+
executedCount++
225+
mu.Unlock()
226+
return nil
227+
},
228+
}
229+
230+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
231+
232+
assert.Error(err)
233+
mu.Lock()
234+
assert.Equal(3, executedCount, "all executors should run when fail-fast is false")
235+
mu.Unlock()
236+
}
237+
238+
func TestNewFailFastParallelExecutorNoFailFastInContext(t *testing.T) {
239+
assert := assert.New(t)
240+
241+
ctx := context.Background()
242+
243+
executedCount := 0
244+
var mu sync.Mutex
245+
246+
executors := []Executor{
247+
func(ctx context.Context) error {
248+
mu.Lock()
249+
executedCount++
250+
mu.Unlock()
251+
return nil
252+
},
253+
func(ctx context.Context) error {
254+
mu.Lock()
255+
executedCount++
256+
mu.Unlock()
257+
return fmt.Errorf("intentional failure")
258+
},
259+
func(ctx context.Context) error {
260+
mu.Lock()
261+
executedCount++
262+
mu.Unlock()
263+
return nil
264+
},
265+
}
266+
267+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
268+
269+
assert.Error(err)
270+
mu.Lock()
271+
assert.Equal(3, executedCount, "all executors should run when fail-fast not in context")
272+
mu.Unlock()
273+
}
274+
275+
func TestNewFailFastParallelExecutorWithWarnings(t *testing.T) {
276+
assert := assert.New(t)
277+
278+
ctx := WithFailFast(context.Background(), true)
279+
280+
executedCount := 0
281+
var mu sync.Mutex
282+
283+
// Warnings should not trigger fail-fast
284+
executors := []Executor{
285+
func(ctx context.Context) error {
286+
mu.Lock()
287+
executedCount++
288+
mu.Unlock()
289+
return Warningf("this is a warning")
290+
},
291+
func(ctx context.Context) error {
292+
mu.Lock()
293+
executedCount++
294+
mu.Unlock()
295+
return nil
296+
},
297+
func(ctx context.Context) error {
298+
mu.Lock()
299+
executedCount++
300+
mu.Unlock()
301+
return nil
302+
},
303+
}
304+
305+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
306+
307+
// Warnings don't cause executor to fail
308+
assert.NoError(err)
309+
mu.Lock()
310+
assert.Equal(3, executedCount, "all executors should run when only warnings occur")
311+
mu.Unlock()
312+
}
313+
314+
func TestNewFailFastParallelExecutorParentContextCanceled(t *testing.T) {
315+
assert := assert.New(t)
316+
317+
ctx, cancel := context.WithCancel(context.Background())
318+
ctx = WithFailFast(ctx, true)
319+
320+
executedCount := 0
321+
var mu sync.Mutex
322+
323+
executors := []Executor{
324+
func(ctx context.Context) error {
325+
mu.Lock()
326+
executedCount++
327+
mu.Unlock()
328+
time.Sleep(100 * time.Millisecond)
329+
return nil
330+
},
331+
func(ctx context.Context) error {
332+
mu.Lock()
333+
executedCount++
334+
mu.Unlock()
335+
// Cancel parent context
336+
cancel()
337+
time.Sleep(100 * time.Millisecond)
338+
return nil
339+
},
340+
func(ctx context.Context) error {
341+
mu.Lock()
342+
executedCount++
343+
mu.Unlock()
344+
return nil
345+
},
346+
}
347+
348+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
349+
350+
// Should return context.Canceled from parent
351+
assert.ErrorIs(err, context.Canceled)
352+
}
353+
354+
func TestNewFailFastParallelExecutorAllSuccess(t *testing.T) {
355+
assert := assert.New(t)
356+
357+
ctx := WithFailFast(context.Background(), true)
358+
359+
executedCount := 0
360+
var mu sync.Mutex
361+
362+
// All executors succeed - fail-fast shouldn't interfere
363+
executors := []Executor{
364+
func(ctx context.Context) error {
365+
mu.Lock()
366+
executedCount++
367+
mu.Unlock()
368+
return nil
369+
},
370+
func(ctx context.Context) error {
371+
mu.Lock()
372+
executedCount++
373+
mu.Unlock()
374+
return nil
375+
},
376+
func(ctx context.Context) error {
377+
mu.Lock()
378+
executedCount++
379+
mu.Unlock()
380+
return nil
381+
},
382+
}
383+
384+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
385+
386+
assert.NoError(err)
387+
mu.Lock()
388+
assert.Equal(3, executedCount, "all executors should run when all succeed")
389+
mu.Unlock()
390+
}

pkg/common/job_error.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,26 @@ type jobCancelCtx string
1212

1313
const JobCancelCtxVal = jobCancelCtx("job.cancel")
1414

15+
// failFastContextKey is the private key type for the fail-fast setting, so the
// key cannot collide with context values stored by other packages.
type failFastContextKey string

// FailFastContextKeyVal is the context key under which the fail-fast flag is stored.
const FailFastContextKeyVal = failFastContextKey("job.failfast")

// WithFailFast adds fail-fast configuration to the context and returns the
// derived context; the parent context is left untouched.
func WithFailFast(ctx context.Context, failFast bool) context.Context {
	return context.WithValue(ctx, FailFastContextKeyVal, failFast)
}

// IsFailFast returns whether fail-fast is enabled for this context. It reports
// false when the setting is absent or when the stored value is not a bool.
func IsFailFast(ctx context.Context) bool {
	// A type assertion on a nil interface value yields ok == false, so the
	// "missing" and "wrong type" cases are both covered here.
	enabled, ok := ctx.Value(FailFastContextKeyVal).(bool)
	return ok && enabled
}
34+
1535
// JobError returns the job error for current context if any
1636
func JobError(ctx context.Context) error {
1737
val := ctx.Value(jobErrorContextKeyVal)

0 commit comments

Comments
 (0)