Skip to content

Commit 4ab1e10

Browse files
authored
refactor: move shared retry and concurrency handling (#195)
1 parent 6f324a0 commit 4ab1e10

File tree

29 files changed

+851
-456
lines changed

29 files changed

+851
-456
lines changed

cmd/internal/exec.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/jahvon/flow/internal/context"
1616
"github.com/jahvon/flow/internal/io"
1717
"github.com/jahvon/flow/internal/runner"
18+
"github.com/jahvon/flow/internal/runner/engine"
1819
"github.com/jahvon/flow/internal/runner/exec"
1920
"github.com/jahvon/flow/internal/runner/launch"
2021
"github.com/jahvon/flow/internal/runner/parallel"
@@ -132,7 +133,8 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
132133
}
133134
}
134135
startTime := time.Now()
135-
if err := runner.Exec(ctx, e, envMap); err != nil {
136+
eng := engine.NewExecEngine()
137+
if err := runner.Exec(ctx, e, eng, envMap); err != nil {
136138
logger.FatalErr(err)
137139
}
138140
dur := time.Since(startTime)

codecov.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ coverage:
1919
only_pulls: true
2020
ignore:
2121
- "**/*.gen.go" # ignore generated code
22+
- "**/*_md.go" # ignore markdown generators
2223
- "tools/**" # ignore tools directory
2324
- "tests/utils/**" # ignore test utilities
2425
- "**/testdata/**" # ignore test data

docs/schemas/flowfile_schema.json

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,8 @@
217217
"description": "A list of executables to run in parallel.\nEach executable can be a command or a reference to another executable.\n"
218218
},
219219
"failFast": {
220-
"description": "If set to true, the parallel executable will fail if any of the sub-executables fail.",
221-
"type": "boolean",
222-
"default": false
220+
"description": "End the parallel execution as soon as an exec exits with a non-zero status. This is the default behavior.\nWhen set to false, all execs will be run regardless of the exit status of parallel execs.\n",
221+
"type": "boolean"
223222
},
224223
"maxThreads": {
225224
"description": "The maximum number of threads to use when executing the parallel executables.",
@@ -451,9 +450,8 @@
451450
"description": "A list of executables to run in serial.\nEach executable can be a command or a reference to another executable.\n"
452451
},
453452
"failFast": {
454-
"description": "If set to true, the serial executable will fail if any of the sub-executables fail.",
455-
"type": "boolean",
456-
"default": false
453+
"description": "End the serial execution as soon as an exec exits with a non-zero status. This is the default behavior.\nWhen set to false, all execs will be run regardless of the exit status of the previous exec.\n",
454+
"type": "boolean"
457455
},
458456
"params": {
459457
"$ref": "#/definitions/ExecutableParameterList"

docs/types/flowfile.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ Launches an application or opens a URI.
188188
| ----- | ----------- | ---- | ------- | :--------: |
189189
| `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | <no value> | |
190190
| `execs` | A list of executables to run in parallel. Each executable can be a command or a reference to another executable. | [ExecutableParallelRefConfigList](#ExecutableParallelRefConfigList) | <no value> ||
191-
| `failFast` | If set to true, the parallel executable will fail if any of the sub-executables fail. | `boolean` | false | |
191+
| `failFast` | End the parallel execution as soon as an exec exits with a non-zero status. This is the default behavior. When set to false, all execs will be run regardless of the exit status of parallel execs. | `boolean` | <no value> | |
192192
| `maxThreads` | The maximum number of threads to use when executing the parallel executables. | `integer` | 5 | |
193193
| `params` | | [ExecutableParameterList](#ExecutableParameterList) | <no value> | |
194194

@@ -335,7 +335,7 @@ Executes a list of executables in serial.
335335
| ----- | ----------- | ---- | ------- | :--------: |
336336
| `args` | | [ExecutableArgumentList](#ExecutableArgumentList) | <no value> | |
337337
| `execs` | A list of executables to run in serial. Each executable can be a command or a reference to another executable. | [ExecutableSerialRefConfigList](#ExecutableSerialRefConfigList) | <no value> ||
338-
| `failFast` | If set to true, the serial executable will fail if any of the sub-executables fail. | `boolean` | false | |
338+
| `failFast` | End the serial execution as soon as an exec exits with a non-zero status. This is the default behavior. When set to false, all execs will be run regardless of the exit status of the previous exec. | `boolean` | <no value> | |
339339
| `params` | | [ExecutableParameterList](#ExecutableParameterList) | <no value> | |
340340

341341
### ExecutableSerialRefConfig

internal/runner/engine/engine.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package engine
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"golang.org/x/sync/errgroup"
8+
9+
"github.com/jahvon/flow/internal/runner/engine/retry"
10+
)
11+
12+
//go:generate mockgen -destination=mocks/mock_engine.go -package=mocks github.com/jahvon/flow/internal/runner/engine Engine
13+
14+
// Result records the outcome of running a single Exec.
type Result struct {
	// ID is copied from the Exec that produced this result.
	ID string
	// Error is the error left after the exec (and any retries) finished; nil on success.
	Error error
	// Retries is the number of attempts made beyond the first one.
	Retries int
}

// ResultSummary groups the results produced by one Engine.Execute call.
type ResultSummary struct {
	Results []Result
}

// HasErrors reports whether at least one result carries a non-nil error.
func (rs ResultSummary) HasErrors() bool {
	for _, r := range rs.Results {
		if r.Error != nil {
			return true
		}
	}
	return false
}

// String renders the failed results as a human-readable list. It returns the
// empty string when no result has an error. Successful results are omitted.
func (rs ResultSummary) String() string {
	if !rs.HasErrors() {
		return ""
	}

	res := "execution error encountered\n\n"
	for _, r := range rs.Results {
		if r.Error == nil {
			continue
		}
		// An entry only ends with a newline when it reports retries. Without
		// this separator the next entry would be glued onto the previous
		// entry's error text.
		if res[len(res)-1] != '\n' {
			res += "\n"
		}
		res += fmt.Sprintf("- Executable: %s\n Error: %v", r.ID, r.Error)
		if r.Retries > 0 {
			res += fmt.Sprintf("\n Retries: %d\n", r.Retries)
		}
	}
	return res
}
49+
50+
// Exec describes one unit of work handed to the engine.
type Exec struct {
	// ID labels the exec; it is copied onto the Result produced for it.
	ID string
	// Function is the work to run; a non-nil return value marks a failure.
	Function func() error
	// MaxRetries is passed to the retry handler that wraps Function.
	MaxRetries int
}
55+
56+
// ExecutionMode selects how the engine schedules a list of execs.
type ExecutionMode int

// Supported execution modes.
const (
	// Parallel runs the execs concurrently.
	Parallel ExecutionMode = iota
	// Serial runs the execs one after another, in order.
	Serial
)
62+
63+
type Options struct {
64+
MaxThreads int
65+
ExecutionMode ExecutionMode
66+
FailFast *bool
67+
}
68+
69+
type OptionFunc func(*Options)
70+
71+
type Engine interface {
72+
Execute(ctx context.Context, execs []Exec, opts ...OptionFunc) ResultSummary
73+
}
74+
75+
type execEngine struct{}
76+
77+
func NewExecEngine() Engine {
78+
return &execEngine{}
79+
}
80+
81+
func WithMaxThreads(maxThreads int) OptionFunc {
82+
return func(o *Options) {
83+
o.MaxThreads = maxThreads
84+
}
85+
}
86+
87+
func WithFailFast(failFast *bool) OptionFunc {
88+
return func(o *Options) {
89+
o.FailFast = failFast
90+
}
91+
}
92+
93+
func WithMode(mode ExecutionMode) OptionFunc {
94+
return func(o *Options) {
95+
o.ExecutionMode = mode
96+
}
97+
}
98+
99+
func (e *execEngine) Execute(ctx context.Context, execs []Exec, opts ...OptionFunc) ResultSummary {
100+
options := Options{MaxThreads: 0, ExecutionMode: Serial}
101+
for _, opt := range opts {
102+
opt(&options)
103+
}
104+
var results []Result
105+
switch options.ExecutionMode {
106+
case Parallel:
107+
results = e.executeParallel(ctx, execs, options)
108+
case Serial:
109+
results = e.executeSerial(ctx, execs, options)
110+
default:
111+
results = []Result{{Error: fmt.Errorf("invalid execution mode")}}
112+
}
113+
return ResultSummary{Results: results}
114+
}
115+
116+
func (e *execEngine) executeParallel(ctx context.Context, execs []Exec, opts Options) []Result {
117+
results := make([]Result, len(execs))
118+
119+
groupCtx, groupCancel := context.WithCancel(ctx)
120+
defer groupCancel()
121+
group, _ := errgroup.WithContext(groupCtx)
122+
limit := opts.MaxThreads
123+
if limit == 0 {
124+
limit = len(execs)
125+
}
126+
group.SetLimit(limit)
127+
128+
for i, exec := range execs {
129+
runExec := func() error {
130+
rh := retry.NewRetryHandler(exec.MaxRetries, 0)
131+
err := rh.Execute(exec.Function)
132+
results[i] = Result{
133+
ID: exec.ID,
134+
Error: err,
135+
Retries: rh.GetStats().Attempts - 1,
136+
}
137+
ff := opts.FailFast == nil || *opts.FailFast
138+
if err != nil && ff {
139+
return err
140+
}
141+
return nil
142+
}
143+
group.Go(runExec)
144+
}
145+
146+
if err := group.Wait(); err != nil {
147+
if len(results) > 0 {
148+
return results
149+
}
150+
return []Result{{Error: err}}
151+
}
152+
return results
153+
}
154+
155+
func (e *execEngine) executeSerial(ctx context.Context, execs []Exec, opts Options) []Result {
156+
results := make([]Result, len(execs))
157+
for i, exec := range execs {
158+
select {
159+
case <-ctx.Done():
160+
results[i] = Result{
161+
ID: exec.ID,
162+
Error: ctx.Err(),
163+
}
164+
return results
165+
default:
166+
rh := retry.NewRetryHandler(exec.MaxRetries, 0)
167+
err := rh.Execute(exec.Function)
168+
results[i] = Result{
169+
ID: exec.ID,
170+
Error: err,
171+
Retries: rh.GetStats().Attempts - 1,
172+
}
173+
174+
ff := opts.FailFast == nil || *opts.FailFast
175+
if err != nil && ff {
176+
return results[:i+1]
177+
}
178+
}
179+
}
180+
181+
return results
182+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package engine_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
. "github.com/onsi/ginkgo/v2"
10+
. "github.com/onsi/gomega"
11+
12+
"github.com/jahvon/flow/internal/runner/engine"
13+
)
14+
15+
func TestEngine_Execute(t *testing.T) {
16+
RegisterFailHandler(Fail)
17+
RunSpecs(t, "Execute Engine Suite")
18+
}
19+
20+
var _ = Describe("e.Execute", func() {
21+
var (
22+
eng engine.Engine
23+
ctx context.Context
24+
cancel context.CancelFunc
25+
)
26+
27+
BeforeEach(func() {
28+
eng = engine.NewExecEngine()
29+
ctx, cancel = context.WithCancel(context.Background())
30+
})
31+
32+
AfterEach(func() {
33+
cancel()
34+
})
35+
36+
Context("Parallel execution", func() {
37+
It("should execute execs in parallel", func() {
38+
execs := []engine.Exec{
39+
{ID: "exec1", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
40+
{ID: "exec2", Function: func() error { return nil }},
41+
}
42+
43+
start := time.Now()
44+
ff := false
45+
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel), engine.WithFailFast(&ff))
46+
duration := time.Since(start)
47+
48+
Expect(summary.Results).To(HaveLen(2))
49+
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
50+
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
51+
Expect(duration).To(BeNumerically("<", 200*time.Millisecond))
52+
})
53+
54+
It("should handle exec failures with fail fast", func() {
55+
execs := []engine.Exec{
56+
{ID: "exec1", Function: func() error { return errors.New("error") }},
57+
{ID: "exec2", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
58+
}
59+
60+
ff := true
61+
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel), engine.WithFailFast(&ff))
62+
63+
Expect(summary.Results).To(HaveLen(2))
64+
Expect(summary.Results[0].Error).To(HaveOccurred())
65+
Expect(summary.Results[1].Error).ToNot(HaveOccurred())
66+
Expect(summary.HasErrors()).To(BeTrue())
67+
})
68+
69+
It("should limit the number of concurrent execs", func() {
70+
execs := []engine.Exec{
71+
{ID: "exec1", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
72+
{ID: "exec2", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
73+
{ID: "exec3", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
74+
{ID: "exec4", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
75+
{ID: "exec5", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
76+
}
77+
78+
start := time.Now()
79+
ff := false
80+
summary := eng.Execute(ctx, execs,
81+
engine.WithMode(engine.Parallel), engine.WithFailFast(&ff), engine.WithMaxThreads(2))
82+
duration := time.Since(start)
83+
84+
Expect(summary.Results).To(HaveLen(5))
85+
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
86+
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
87+
Expect(summary.Results[2].Error).NotTo(HaveOccurred())
88+
Expect(summary.Results[3].Error).NotTo(HaveOccurred())
89+
Expect(summary.Results[4].Error).NotTo(HaveOccurred())
90+
Expect(duration).To(BeNumerically(">=", 250*time.Millisecond))
91+
})
92+
})
93+
94+
Context("Serial execution", func() {
95+
It("should execute execs serially", func() {
96+
execs := []engine.Exec{
97+
{ID: "exec1", Function: func() error { time.Sleep(100 * time.Millisecond); return nil }},
98+
{ID: "exec2", Function: func() error { time.Sleep(110 * time.Millisecond); return nil }},
99+
}
100+
101+
start := time.Now()
102+
ff := false
103+
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff))
104+
duration := time.Since(start)
105+
106+
Expect(summary.Results).To(HaveLen(2))
107+
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
108+
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
109+
Expect(duration).To(BeNumerically(">=", 200*time.Millisecond))
110+
})
111+
112+
It("should handle exec failures with fail fast", func() {
113+
execs := []engine.Exec{
114+
{ID: "exec1", Function: func() error { return errors.New("error") }},
115+
{ID: "exec2", Function: func() error { return nil }},
116+
}
117+
118+
ff := true
119+
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff))
120+
121+
Expect(summary.Results).To(HaveLen(1))
122+
Expect(summary.Results[0].Error).To(HaveOccurred())
123+
Expect(summary.HasErrors()).To(BeTrue())
124+
})
125+
})
126+
})

0 commit comments

Comments
 (0)