Skip to content

Commit 12cf263

Browse files
committed
Correctly handle fail-fast
1 parent 00dc0be commit 12cf263

File tree

4 files changed

+241
-1
lines changed

4 files changed

+241
-1
lines changed

pkg/common/executor.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,87 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
131131
}
132132
}
133133

134+
// NewFailFastParallelExecutor creates a parallel executor that respects fail-fast semantics
135+
// When fail-fast is enabled via context, it will cancel remaining work on first error
136+
func NewFailFastParallelExecutor(parallel int, executors ...Executor) Executor {
137+
return func(ctx context.Context) error {
138+
failFast := IsFailFast(ctx)
139+
140+
// If fail-fast is disabled, use the standard parallel executor
141+
if !failFast {
142+
return NewParallelExecutor(parallel, executors...)(ctx)
143+
}
144+
145+
// Fail-fast mode: create a cancellable context for workers
146+
workCtx, cancelWork := context.WithCancel(ctx)
147+
defer cancelWork()
148+
149+
work := make(chan Executor, len(executors))
150+
errs := make(chan error, len(executors))
151+
152+
if 1 > parallel {
153+
log.Debugf("Parallel tasks (%d) below minimum, setting to 1", parallel)
154+
parallel = 1
155+
}
156+
157+
// Start worker goroutines
158+
for i := 0; i < parallel; i++ {
159+
go func(work <-chan Executor, errs chan<- error) {
160+
for executor := range work {
161+
// Check if work context was cancelled (fail-fast triggered)
162+
if workCtx.Err() != nil {
163+
errs <- workCtx.Err()
164+
continue
165+
}
166+
errs <- executor(workCtx)
167+
}
168+
}(work, errs)
169+
}
170+
171+
// Queue work and monitor for failures
172+
go func() {
173+
defer close(work)
174+
for i := 0; i < len(executors); i++ {
175+
// Check if we should stop queuing due to failure
176+
if workCtx.Err() != nil {
177+
// Don't queue remaining work, but send cancelled errors for remaining executors
178+
for j := i; j < len(executors); j++ {
179+
errs <- workCtx.Err()
180+
}
181+
return
182+
}
183+
work <- executors[i]
184+
}
185+
}()
186+
187+
// Collect results and trigger fail-fast on first error
188+
var firstErr error
189+
for i := 0; i < len(executors); i++ {
190+
err := <-errs
191+
192+
// Track the first non-Warning error
193+
if err != nil && firstErr == nil {
194+
switch err.(type) {
195+
case Warning:
196+
// Warnings don't trigger fail-fast
197+
log.Warning(err.Error())
198+
default:
199+
firstErr = err
200+
// Cancel remaining work on first real error
201+
cancelWork()
202+
}
203+
}
204+
}
205+
206+
// Check if parent context was cancelled
207+
if err := ctx.Err(); err != nil {
208+
return err
209+
}
210+
211+
return firstErr
212+
}
213+
}
214+
134215
func NewFieldExecutor(name string, value interface{}, exec Executor) Executor {
135216
return func(ctx context.Context) error {
136217
return exec(WithLogger(ctx, Logger(ctx).WithField(name, value)))

pkg/common/executor_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package common
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"testing"
78
"time"
89

@@ -150,3 +151,123 @@ func TestNewParallelExecutorCanceled(t *testing.T) {
150151
assert.Equal(3, count)
151152
assert.Error(errExpected, err)
152153
}
154+
155+
func TestNewFailFastParallelExecutorWithFailFastTrue(t *testing.T) {
156+
assert := assert.New(t)
157+
158+
ctx := WithFailFast(context.Background(), true)
159+
160+
executedCount := 0
161+
var mu sync.Mutex
162+
163+
// Create executors: some succeed, one fails, rest should be cancelled
164+
executors := []Executor{
165+
func(ctx context.Context) error {
166+
mu.Lock()
167+
executedCount++
168+
mu.Unlock()
169+
time.Sleep(100 * time.Millisecond)
170+
return nil
171+
},
172+
func(ctx context.Context) error {
173+
mu.Lock()
174+
executedCount++
175+
mu.Unlock()
176+
time.Sleep(100 * time.Millisecond)
177+
return fmt.Errorf("intentional failure")
178+
},
179+
func(ctx context.Context) error {
180+
mu.Lock()
181+
executedCount++
182+
mu.Unlock()
183+
time.Sleep(2 * time.Second) // Should be cancelled
184+
return nil
185+
},
186+
func(ctx context.Context) error {
187+
mu.Lock()
188+
executedCount++
189+
mu.Unlock()
190+
time.Sleep(2 * time.Second) // Should be cancelled
191+
return nil
192+
},
193+
}
194+
195+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
196+
197+
assert.Error(err)
198+
assert.Contains(err.Error(), "intentional failure")
199+
}
200+
201+
func TestNewFailFastParallelExecutorWithFailFastFalse(t *testing.T) {
202+
assert := assert.New(t)
203+
204+
ctx := WithFailFast(context.Background(), false)
205+
206+
executedCount := 0
207+
var mu sync.Mutex
208+
209+
executors := []Executor{
210+
func(ctx context.Context) error {
211+
mu.Lock()
212+
executedCount++
213+
mu.Unlock()
214+
return nil
215+
},
216+
func(ctx context.Context) error {
217+
mu.Lock()
218+
executedCount++
219+
mu.Unlock()
220+
return fmt.Errorf("intentional failure")
221+
},
222+
func(ctx context.Context) error {
223+
mu.Lock()
224+
executedCount++
225+
mu.Unlock()
226+
return nil
227+
},
228+
}
229+
230+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
231+
232+
assert.Error(err)
233+
mu.Lock()
234+
assert.Equal(3, executedCount, "all executors should run when fail-fast is false")
235+
mu.Unlock()
236+
}
237+
238+
func TestNewFailFastParallelExecutorNoFailFastInContext(t *testing.T) {
239+
assert := assert.New(t)
240+
241+
ctx := context.Background()
242+
243+
executedCount := 0
244+
var mu sync.Mutex
245+
246+
executors := []Executor{
247+
func(ctx context.Context) error {
248+
mu.Lock()
249+
executedCount++
250+
mu.Unlock()
251+
return nil
252+
},
253+
func(ctx context.Context) error {
254+
mu.Lock()
255+
executedCount++
256+
mu.Unlock()
257+
return fmt.Errorf("intentional failure")
258+
},
259+
func(ctx context.Context) error {
260+
mu.Lock()
261+
executedCount++
262+
mu.Unlock()
263+
return nil
264+
},
265+
}
266+
267+
err := NewFailFastParallelExecutor(2, executors...)(ctx)
268+
269+
assert.Error(err)
270+
mu.Lock()
271+
assert.Equal(3, executedCount, "all executors should run when fail-fast not in context")
272+
mu.Unlock()
273+
}

pkg/common/job_error.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,26 @@ type jobCancelCtx string
1212

1313
const JobCancelCtxVal = jobCancelCtx("job.cancel")
1414

15+
type failFastContextKey string
16+
17+
const FailFastContextKeyVal = failFastContextKey("job.failfast")
18+
19+
// WithFailFast adds fail-fast configuration to the context
20+
func WithFailFast(ctx context.Context, failFast bool) context.Context {
21+
return context.WithValue(ctx, FailFastContextKeyVal, failFast)
22+
}
23+
24+
// IsFailFast returns whether fail-fast is enabled for this context
25+
func IsFailFast(ctx context.Context) bool {
26+
val := ctx.Value(FailFastContextKeyVal)
27+
if val != nil {
28+
if ff, ok := val.(bool); ok {
29+
return ff
30+
}
31+
}
32+
return false
33+
}
34+
1535
// JobError returns the job error for current context if any
1636
func JobError(ctx context.Context) error {
1737
val := ctx.Value(jobErrorContextKeyVal)

pkg/runner/runner.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,20 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
207207
return executor(common.WithJobErrorContainer(WithJobLogger(ctx, rc.Run.JobID, jobName, rc.Config, &rc.Masks, matrix)))
208208
})
209209
}
210-
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
210+
211+
// Use fail-fast executor for matrix jobs
212+
matrixExecutor := common.NewFailFastParallelExecutor(maxParallel, stageExecutor...)
213+
214+
// Wrap with fail-fast context based on strategy
215+
pipeline = append(pipeline, func(ctx context.Context) error {
216+
// Inject fail-fast setting into context
217+
failFast := false
218+
if job.Strategy != nil {
219+
failFast = job.Strategy.FailFast
220+
}
221+
ctx = common.WithFailFast(ctx, failFast)
222+
return matrixExecutor(ctx)
223+
})
211224
}
212225

213226
log.Debugf("PlanExecutor concurrency: %d", runner.config.GetConcurrentJobs())
@@ -223,6 +236,11 @@ func handleFailure(plan *model.Plan) common.Executor {
223236
for _, stage := range plan.Stages {
224237
for _, run := range stage.Runs {
225238
if run.Job().Result == "failure" {
239+
job := run.Job()
240+
// Check if this was a matrix job with fail-fast
241+
if job.Strategy != nil && job.Strategy.FailFast {
242+
return fmt.Errorf("Job '%s' failed (fail-fast enabled, remaining matrix jobs may have been cancelled)", run.String())
243+
}
226244
return fmt.Errorf("Job '%s' failed", run.String())
227245
}
228246
}

0 commit comments

Comments
 (0)