Skip to content

Commit 4151dd5

Browse files
authored
Merge pull request #166 from cschleiden/cschleiden/check-params
Check params when scheduling workflows or activities
2 parents ae76e98 + 7243ce4 commit 4151dd5

File tree

10 files changed

+256
-27
lines changed

10 files changed

+256
-27
lines changed

backend/options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ type Options struct {
2424

2525
StickyTimeout time.Duration
2626

27+
// WorkflowLockTimeout determines how long a workflow task can be locked for. If the workflow task is not completed
28+
// by that timeframe, it's considered abandoned and another worker might pick it up.
29+
//
30+
// For long running workflow tasks, combine this with heartbearts.
2731
WorkflowLockTimeout time.Duration
2832

33+
// ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed
34+
// by that timeframe, it's considered abandoned and another worker might pick it up
2935
ActivityLockTimeout time.Duration
3036
}
3137

backend/test/e2e.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,12 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
8282
}
8383
register(t, ctx, w, []interface{}{wf}, nil)
8484

85-
output, err := runWorkflowWithResult[int](t, ctx, c, wf)
85+
instance, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
86+
InstanceID: uuid.NewString(),
87+
}, wf)
8688

87-
require.Zero(t, output)
88-
require.ErrorContains(t, err, "converting workflow inputs: mismatched argument count: expected 1, got 0")
89+
require.Nil(t, instance)
90+
require.ErrorContains(t, err, "mismatched argument count: expected 1, got 0")
8991
},
9092
},
9193
{
@@ -123,7 +125,7 @@ func EndToEndBackendTest(t *testing.T, setup func() TestBackend, teardown func(b
123125
output, err := runWorkflowWithResult[int](t, ctx, c, wf)
124126

125127
require.Zero(t, output)
126-
require.ErrorContains(t, err, "converting activity inputs: mismatched argument count: expected 2, got 1")
128+
require.ErrorContains(t, err, "mismatched argument count: expected 2, got 1")
127129
},
128130
},
129131
{

client/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ func New(backend backend.Backend) Client {
5555
}
5656

5757
func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowInstanceOptions, wf workflow.Workflow, args ...interface{}) (*workflow.Instance, error) {
58+
// Check arguments
59+
if err := fn.ParamsMatch(wf, 1, args...); err != nil {
60+
return nil, err
61+
}
62+
5863
inputs, err := a.ArgsToInputs(c.backend.Converter(), args...)
5964
if err != nil {
6065
return nil, fmt.Errorf("converting arguments: %w", err)

client/client_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,33 @@ import (
1212
"github.com/cschleiden/go-workflows/internal/core"
1313
"github.com/cschleiden/go-workflows/internal/history"
1414
"github.com/cschleiden/go-workflows/internal/logger"
15+
"github.com/cschleiden/go-workflows/workflow"
1516
"github.com/google/uuid"
1617
"github.com/stretchr/testify/mock"
1718
"github.com/stretchr/testify/require"
1819
)
1920

21+
func Test_Client_CreateWorkflowInstance_ParamMismatch(t *testing.T) {
22+
wf := func(workflow.Context, int) (int, error) {
23+
return 0, nil
24+
}
25+
26+
ctx := context.Background()
27+
28+
b := &backend.MockBackend{}
29+
c := &client{
30+
backend: b,
31+
clock: clock.New(),
32+
}
33+
34+
result, err := c.CreateWorkflowInstance(ctx, WorkflowInstanceOptions{
35+
InstanceID: "id",
36+
}, wf, "foo")
37+
require.Zero(t, result)
38+
require.EqualError(t, err, "mismatched argument type: expected int, got string")
39+
b.AssertExpectations(t)
40+
}
41+
2042
func Test_Client_GetWorkflowResultTimeout(t *testing.T) {
2143
instance := core.NewWorkflowInstance(uuid.NewString(), "test")
2244

internal/fn/fn.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package fn
22

33
import (
4+
"errors"
5+
"fmt"
46
"reflect"
57
"runtime"
68
"strings"
@@ -16,16 +18,58 @@ func Name(i interface{}) string {
1618
return strings.TrimSuffix(fnName, "-fm")
1719
}
1820

19-
func ReturnTypeMatch[TResult any](fn interface{}) bool {
21+
func ReturnTypeMatch[TResult any](fn interface{}) error {
2022
fnType := reflect.TypeOf(fn)
2123
if fnType.Kind() != reflect.Func {
22-
return false
24+
return errors.New("not a function")
2325
}
2426

25-
if fnType.NumOut() == 1 {
26-
return true
27+
if fnType.NumOut() < 1 {
28+
return errors.New("function has no return value, must return at least (error) or (result, error)")
2729
}
2830

29-
t := *new(TResult)
30-
return fnType.Out(0) == reflect.TypeOf(t)
31+
if fnType.NumOut() > 2 {
32+
return errors.New("function has too many return values, must return at most (error) or (result, error)")
33+
}
34+
35+
errorPosition := 0
36+
if fnType.NumOut() == 2 {
37+
errorPosition = 1
38+
39+
t := *new(TResult)
40+
if fnType.Out(0) != reflect.TypeOf(t) {
41+
return fmt.Errorf("function must return %s, got %s", reflect.TypeOf(t), fnType.Out(0))
42+
}
43+
}
44+
45+
// Check if return is error
46+
if fnType.Out(errorPosition) != reflect.TypeOf((*error)(nil)).Elem() {
47+
return fmt.Errorf("function must return error, got %s", fnType.Out(errorPosition))
48+
}
49+
50+
return nil
51+
}
52+
53+
func ParamsMatch(fn interface{}, skip int, args ...interface{}) error {
54+
fnType := reflect.TypeOf(fn)
55+
if fnType.Kind() != reflect.Func {
56+
return errors.New("not a function")
57+
}
58+
59+
if fnType.NumIn() != skip+len(args) {
60+
return fmt.Errorf("mismatched argument count: expected %d, got %d", fnType.NumIn()-skip, len(args))
61+
}
62+
63+
for i, arg := range args {
64+
// if target is interface{} skip
65+
if fnType.In(skip+i).Kind() == reflect.Interface {
66+
continue
67+
}
68+
69+
if fnType.In(skip+i) != reflect.TypeOf(arg) {
70+
return fmt.Errorf("mismatched argument type: expected %s, got %s", fnType.In(skip+i), reflect.TypeOf(arg))
71+
}
72+
}
73+
74+
return nil
3175
}

internal/fn/fn_test.go

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,48 +68,135 @@ func errorReturn() error {
6868
func TestReturnTypeMatch(t *testing.T) {
6969
tests := []struct {
7070
name string
71-
fn func() bool
72-
want bool
71+
fn func() error
72+
want string
7373
}{
7474
{
7575
name: "int match",
76-
fn: func() bool {
76+
fn: func() error {
7777
return ReturnTypeMatch[int](intReturn)
7878
},
79-
want: true,
79+
want: "",
8080
},
8181
{
8282
name: "string match",
83-
fn: func() bool {
83+
fn: func() error {
8484
return ReturnTypeMatch[string](stringReturn)
8585
},
86-
want: true,
86+
want: "",
8787
},
8888
{
8989
name: "int mismatch",
90-
fn: func() bool {
90+
fn: func() error {
9191
return ReturnTypeMatch[string](intReturn)
9292
},
93+
want: "function must return string, got int",
9394
},
9495
{
9596
name: "no param",
96-
fn: func() bool {
97+
fn: func() error {
9798
return ReturnTypeMatch[any](errorReturn)
9899
},
99-
want: true,
100+
want: "",
100101
},
101102
{
102103
name: "no param mismatch",
103-
fn: func() bool {
104+
fn: func() error {
104105
return ReturnTypeMatch[int](errorReturn)
105106
},
106-
want: true,
107+
want: "",
107108
},
108109
}
109110
for _, tt := range tests {
110111
t.Run(tt.name, func(t *testing.T) {
111112
got := tt.fn()
112-
require.Equal(t, tt.want, got)
113+
if tt.want == "" {
114+
require.NoError(t, got)
115+
} else {
116+
require.Error(t, got)
117+
require.Equal(t, tt.want, got.Error())
118+
}
119+
})
120+
}
121+
}
122+
123+
func intParam(int) {
124+
}
125+
126+
func stringParam(string) {
127+
}
128+
129+
func interfaceParam(string, interface{}, int) {
130+
}
131+
132+
func mixedParams(context.Context, int, string) {
133+
}
134+
135+
func TestParamsMatch(t *testing.T) {
136+
tests := []struct {
137+
name string
138+
fn func() error
139+
want string
140+
}{
141+
{
142+
name: "int match",
143+
fn: func() error {
144+
return ParamsMatch(intParam, 0, 42)
145+
},
146+
want: "",
147+
},
148+
{
149+
name: "int mismatch",
150+
fn: func() error {
151+
return ParamsMatch(intParam, 0, "")
152+
},
153+
want: "mismatched argument type: expected int, got string",
154+
},
155+
{
156+
name: "string mismatch",
157+
fn: func() error {
158+
return ParamsMatch(stringParam, 0, 42)
159+
},
160+
want: "mismatched argument type: expected string, got int",
161+
},
162+
{
163+
name: "interface{} ignored",
164+
fn: func() error {
165+
return ParamsMatch(interfaceParam, 0, "", 23, 42)
166+
},
167+
want: "",
168+
},
169+
{
170+
name: "mixed params",
171+
fn: func() error {
172+
return ParamsMatch(mixedParams, 1, 42, "")
173+
},
174+
want: "",
175+
},
176+
{
177+
name: "mixed params - no skip",
178+
fn: func() error {
179+
return ParamsMatch(mixedParams, 0, 42, "")
180+
},
181+
want: "mismatched argument count: expected 3, got 2",
182+
},
183+
{
184+
name: "mixed params - wrong params",
185+
fn: func() error {
186+
return ParamsMatch(mixedParams, 1, "", 42)
187+
},
188+
want: "mismatched argument type: expected int, got string",
189+
},
190+
}
191+
for _, tt := range tests {
192+
t.Run(tt.name, func(t *testing.T) {
193+
got := tt.fn()
194+
if tt.want == "" {
195+
require.NoError(t, got)
196+
} else {
197+
require.Error(t, got)
198+
require.Equal(t, tt.want, got.Error())
199+
}
113200
})
114201
}
115202
}

workflow/activity.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,15 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
3838
return f
3939
}
4040

41-
if !fn.ReturnTypeMatch[TResult](activity) {
42-
f.Set(*new(TResult), fmt.Errorf("activity return type does not match expected type"))
41+
// Check return type
42+
if err := fn.ReturnTypeMatch[TResult](activity); err != nil {
43+
f.Set(*new(TResult), err)
44+
return f
45+
}
46+
47+
// Check arguments
48+
if err := fn.ParamsMatch(activity, 1, args...); err != nil {
49+
f.Set(*new(TResult), err)
4350
return f
4451
}
4552

workflow/activity_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"go.opentelemetry.io/otel/trace"
1515
)
1616

17-
func Test_executeActivity_ParamMismatch(t *testing.T) {
17+
func Test_executeActivity_ResultMismatch(t *testing.T) {
1818
a := func(ctx Context) (int, error) {
1919
return 42, nil
2020
}
@@ -38,3 +38,27 @@ func Test_executeActivity_ParamMismatch(t *testing.T) {
3838
c.Execute()
3939
require.True(t, c.Finished())
4040
}
41+
func Test_executeActivity_ParamMismatch(t *testing.T) {
42+
a := func(ctx Context, s string, n int) (int, error) {
43+
return 42, nil
44+
}
45+
46+
ctx := sync.Background()
47+
ctx = converter.WithConverter(ctx, converter.DefaultConverter)
48+
ctx = workflowstate.WithWorkflowState(
49+
ctx,
50+
workflowstate.NewWorkflowState(core.NewWorkflowInstance("a", ""), logger.NewDefaultLogger(), clock.New()),
51+
)
52+
ctx = workflowtracer.WithWorkflowTracer(ctx, workflowtracer.New(trace.NewNoopTracerProvider().Tracer("test")))
53+
54+
c := sync.NewCoroutine(ctx, func(ctx sync.Context) error {
55+
f := executeActivity[int](ctx, DefaultActivityOptions, 1, a)
56+
_, err := f.Get(ctx)
57+
require.Error(t, err)
58+
59+
return nil
60+
})
61+
62+
c.Execute()
63+
require.True(t, c.Finished())
64+
}

workflow/subworkflow.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,15 @@ func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
4848
return f
4949
}
5050

51-
if !fn.ReturnTypeMatch[TResult](wf) {
52-
f.Set(*new(TResult), fmt.Errorf("subworkflow return type does not match expected type"))
51+
// Check return type
52+
if err := fn.ReturnTypeMatch[TResult](wf); err != nil {
53+
f.Set(*new(TResult), err)
54+
return f
55+
}
56+
57+
// Check arguments
58+
if err := fn.ParamsMatch(wf, 1, args...); err != nil {
59+
f.Set(*new(TResult), err)
5360
return f
5461
}
5562

0 commit comments

Comments
 (0)