Skip to content

Commit facf4b0

Browse files
authored
Merge pull request #62 from cschleiden/check-activity-input-count
Validate input counts for workflows and activities
2 parents 046cfd6 + 72741d1 commit facf4b0

File tree

6 files changed

+212
-10
lines changed

6 files changed

+212
-10
lines changed

backend/test/e2e.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ func EndToEndBackendTest(t *testing.T, setup func() backend.Backend, teardown fu
4747
require.ErrorContains(t, err, "workflow 1 not found")
4848
},
4949
},
50+
{
51+
name: "WorkflowArgumentMismatch",
52+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker) {
53+
wf := func(ctx workflow.Context, p1 int) (int, error) {
54+
return 42, nil
55+
}
56+
register(t, ctx, w, []interface{}{wf}, nil)
57+
58+
output, err := runWorkflowWithResult[int](t, ctx, c, wf)
59+
60+
require.Zero(t, output)
61+
require.ErrorContains(t, err, "converting workflow inputs: mismatched argument count: expected 1, got 0")
62+
},
63+
},
5064
{
5165
name: "UnregisteredActivity",
5266
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker) {
@@ -62,6 +76,21 @@ func EndToEndBackendTest(t *testing.T, setup func() backend.Backend, teardown fu
6276
require.ErrorContains(t, err, "activity not found")
6377
},
6478
},
79+
{
80+
name: "ActivityArgumentMismatch",
81+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker) {
82+
a := func(context.Context, int, int) error { return nil }
83+
wf := func(ctx workflow.Context) (int, error) {
84+
return workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a, 42).Get(ctx)
85+
}
86+
register(t, ctx, w, []interface{}{wf}, []interface{}{a})
87+
88+
output, err := runWorkflowWithResult[int](t, ctx, c, wf)
89+
90+
require.Zero(t, output)
91+
require.ErrorContains(t, err, "converting activity inputs: mismatched argument count: expected 2, got 1")
92+
},
93+
},
6594
}
6695

6796
for _, tt := range tests {

internal/activity/executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
3131

3232
activity, err := e.r.GetActivity(a.Name)
3333
if err != nil {
34-
return nil, fmt.Errorf("finding activity in registry: %w", err)
34+
return nil, err
3535
}
3636

3737
activityFn := reflect.ValueOf(activity)

internal/activity/executor_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package activity
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/internal/core"
9+
"github.com/cschleiden/go-workflows/internal/fn"
10+
"github.com/cschleiden/go-workflows/internal/history"
11+
"github.com/cschleiden/go-workflows/internal/logger"
12+
"github.com/cschleiden/go-workflows/internal/payload"
13+
"github.com/cschleiden/go-workflows/internal/task"
14+
"github.com/cschleiden/go-workflows/internal/workflow"
15+
"github.com/google/uuid"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestExecutor_ExecuteActivity(t *testing.T) {
20+
tests := []struct {
21+
name string
22+
setup func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes
23+
result func(t *testing.T, result payload.Payload, err error)
24+
}{
25+
{
26+
name: "unknown activity",
27+
setup: func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes {
28+
return &history.ActivityScheduledAttributes{
29+
Name: "unknown",
30+
}
31+
},
32+
result: func(t *testing.T, result payload.Payload, err error) {
33+
require.Nil(t, result)
34+
require.Error(t, err)
35+
require.EqualError(t, err, "activity not found")
36+
},
37+
},
38+
{
39+
name: "mismatched argument count",
40+
setup: func(t *testing.T, r *workflow.Registry) *history.ActivityScheduledAttributes {
41+
a := func(context.Context, int, int) error { return nil }
42+
require.NoError(t, r.RegisterActivity(a))
43+
44+
return &history.ActivityScheduledAttributes{
45+
Name: fn.Name(a),
46+
}
47+
},
48+
result: func(t *testing.T, result payload.Payload, err error) {
49+
require.Nil(t, result)
50+
require.Error(t, err)
51+
require.EqualError(t, err, "converting activity inputs: mismatched argument count: expected 2, got 0")
52+
},
53+
},
54+
}
55+
for _, tt := range tests {
56+
t.Run(tt.name, func(t *testing.T) {
57+
r := workflow.NewRegistry()
58+
attr := tt.setup(t, r)
59+
60+
e := &Executor{
61+
logger: logger.NewDefaultLogger(),
62+
r: r,
63+
}
64+
got, err := e.ExecuteActivity(context.Background(), &task.Activity{
65+
ID: uuid.NewString(),
66+
WorkflowInstance: core.NewWorkflowInstance("instanceID", "executionID"),
67+
Event: history.NewHistoryEvent(1, time.Now(), history.EventType_ActivityScheduled, attr),
68+
})
69+
tt.result(t, got, err)
70+
})
71+
}
72+
}

internal/args/args.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ func InputsToArgs(c converter.Converter, fn reflect.Value, inputs []payload.Payl
2828
addContext := false
2929

3030
activityFnT := fn.Type()
31-
3231
numArgs := activityFnT.NumIn()
32+
3333
args := make([]reflect.Value, numArgs)
3434

3535
input := 0
@@ -42,17 +42,27 @@ func InputsToArgs(c converter.Converter, fn reflect.Value, inputs []payload.Payl
4242
continue
4343
}
4444

45-
arg := reflect.New(argT).Interface()
46-
err := c.From(inputs[input], arg)
47-
if err != nil {
48-
return nil, false, fmt.Errorf("converting inputs: %w", err)
49-
}
45+
if input < len(inputs) {
46+
arg := reflect.New(argT).Interface()
47+
err := c.From(inputs[input], arg)
48+
if err != nil {
49+
return nil, false, fmt.Errorf("converting inputs: %w", err)
50+
}
5051

51-
args[i] = reflect.ValueOf(arg).Elem()
52+
args[i] = reflect.ValueOf(arg).Elem()
53+
}
5254

5355
input++
5456
}
5557

58+
if addContext {
59+
if (numArgs - 1) != len(inputs) {
60+
return nil, false, fmt.Errorf("mismatched argument count: expected %d, got %d", numArgs-1, len(inputs))
61+
}
62+
} else if numArgs != len(inputs) {
63+
return nil, false, fmt.Errorf("mismatched argument count: expected %d, got %d", numArgs, len(inputs))
64+
}
65+
5666
return args, addContext, nil
5767
}
5868

internal/args/args_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package args
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"testing"
7+
8+
"github.com/cschleiden/go-workflows/internal/converter"
9+
"github.com/cschleiden/go-workflows/internal/payload"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestInputsToArgs(t *testing.T) {
14+
type args struct {
15+
fn interface{}
16+
inputs []interface{}
17+
}
18+
tests := []struct {
19+
name string
20+
args args
21+
addContext bool
22+
wantErr bool
23+
err string
24+
}{
25+
{
26+
name: "just context",
27+
args: args{
28+
fn: func(context.Context) error { return nil },
29+
inputs: []interface{}{},
30+
},
31+
addContext: true,
32+
},
33+
{
34+
name: "arguments with context",
35+
args: args{
36+
fn: func(context.Context, int, string) error { return nil },
37+
inputs: []interface{}{42, ""},
38+
},
39+
addContext: true,
40+
},
41+
{
42+
name: "mismatched argument count - too many",
43+
args: args{
44+
fn: func(int, string) error { return nil },
45+
inputs: []interface{}{42, "", 13},
46+
},
47+
wantErr: true,
48+
err: "mismatched argument count: expected 2, got 3",
49+
},
50+
{
51+
name: "mismatched argument count - too few",
52+
args: args{
53+
fn: func(int, string) error { return nil },
54+
inputs: []interface{}{42},
55+
},
56+
wantErr: true,
57+
err: "mismatched argument count: expected 2, got 1",
58+
},
59+
}
60+
for _, tt := range tests {
61+
t.Run(tt.name, func(t *testing.T) {
62+
inputs := make([]payload.Payload, 0)
63+
for _, input := range tt.args.inputs {
64+
p, err := converter.DefaultConverter.To(input)
65+
require.NoError(t, err)
66+
67+
inputs = append(inputs, p)
68+
}
69+
70+
args, addContext, err := InputsToArgs(converter.DefaultConverter, reflect.ValueOf(tt.args.fn), inputs)
71+
if (err != nil) != tt.wantErr {
72+
t.Errorf("InputsToArgs() error = %v, wantErr %v", err, tt.wantErr)
73+
return
74+
}
75+
if tt.wantErr {
76+
require.EqualError(t, err, tt.err)
77+
require.Equal(t, tt.addContext, addContext)
78+
} else {
79+
if addContext {
80+
// Skip the first argument, it will be filled with the context later
81+
args = args[1:]
82+
}
83+
84+
argValues := make([]interface{}, 0)
85+
for _, arg := range args {
86+
argValues = append(argValues, arg.Interface())
87+
}
88+
}
89+
})
90+
}
91+
}

internal/workflow/executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func Test_ReplayWorkflowWithActivityResult(t *testing.T) {
133133
history.EventType_WorkflowExecutionStarted,
134134
&history.ExecutionStartedAttributes{
135135
Name: fn.Name(workflowWithActivity),
136-
Inputs: []payload.Payload{inputs},
136+
Inputs: []payload.Payload{},
137137
},
138138
),
139139
history.NewHistoryEvent(
@@ -332,7 +332,7 @@ func Test_ExecuteNewEvents(t *testing.T) {
332332
history.EventType_WorkflowExecutionStarted,
333333
&history.ExecutionStartedAttributes{
334334
Name: fn.Name(workflowWithActivity),
335-
Inputs: []payload.Payload{inputs},
335+
Inputs: []payload.Payload{},
336336
},
337337
),
338338
history.NewPendingEvent(

0 commit comments

Comments
 (0)