Skip to content

Commit 8f44821

Browse files
authored
Merge pull request #245 from lovromazgon/fn-full
Register workflow and activity with name
2 parents fd3024e + 4a9b41a commit 8f44821

File tree

7 files changed

+144
-27
lines changed

7 files changed

+144
-27
lines changed

internal/fn/fn_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ import (
77
"github.com/stretchr/testify/require"
88
)
99

10-
type foo struct {
11-
}
10+
type foo struct{}
1211

1312
func (f *foo) DoSomething(ctx context.Context) error {
1413
return nil
@@ -23,7 +22,7 @@ var f foo
2322
func bar(_ int) {
2423
}
2524

26-
func Test_GetFunctionName(t *testing.T) {
25+
func Test_FuncName(t *testing.T) {
2726
tests := []struct {
2827
name string
2928
i interface{}
@@ -44,6 +43,11 @@ func Test_GetFunctionName(t *testing.T) {
4443
i: f.DoSomething,
4544
want: "DoSomething",
4645
},
46+
{
47+
name: "anonymous function",
48+
i: func() {},
49+
want: "func1",
50+
},
4751
}
4852
for _, tt := range tests {
4953
t.Run(tt.name, func(t *testing.T) {

internal/workflow/registry.go

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package workflow
22

33
import (
44
"errors"
5+
"fmt"
56
"reflect"
67
"sync"
78

@@ -33,6 +34,14 @@ func (e *ErrInvalidWorkflow) Error() string {
3334
return e.msg
3435
}
3536

37+
type ErrWorkflowAlreadyRegistered struct {
38+
msg string
39+
}
40+
41+
func (e *ErrWorkflowAlreadyRegistered) Error() string {
42+
return e.msg
43+
}
44+
3645
type ErrInvalidActivity struct {
3746
msg string
3847
}
@@ -41,7 +50,25 @@ func (e *ErrInvalidActivity) Error() string {
4150
return e.msg
4251
}
4352

44-
func (r *Registry) RegisterWorkflowByName(name string, workflow wf.Workflow) error {
53+
type ErrActivityAlreadyRegistered struct {
54+
msg string
55+
}
56+
57+
func (e *ErrActivityAlreadyRegistered) Error() string {
58+
return e.msg
59+
}
60+
61+
type RegisterConfig struct {
62+
Name string
63+
}
64+
65+
func (r *Registry) RegisterWorkflow(workflow wf.Workflow, opts ...RegisterOption) error {
66+
cfg := registerOptions(opts).applyRegisterOptions(RegisterConfig{})
67+
name := cfg.Name
68+
if name == "" {
69+
name = fn.Name(workflow)
70+
}
71+
4572
wfType := reflect.TypeOf(workflow)
4673
if wfType.Kind() != reflect.Func {
4774
return &ErrInvalidWorkflow{"workflow is not a function"}
@@ -72,17 +99,17 @@ func (r *Registry) RegisterWorkflowByName(name string, workflow wf.Workflow) err
7299
r.Lock()
73100
defer r.Unlock()
74101

102+
if _, ok := r.workflowMap[name]; ok {
103+
return &ErrWorkflowAlreadyRegistered{fmt.Sprintf("workflow with name %q already registered", name)}
104+
}
75105
r.workflowMap[name] = workflow
76106

77107
return nil
78108
}
79109

80-
func (r *Registry) RegisterWorkflow(workflow wf.Workflow) error {
81-
name := fn.Name(workflow)
82-
return r.RegisterWorkflowByName(name, workflow)
83-
}
110+
func (r *Registry) RegisterActivity(activity wf.Activity, opts ...RegisterOption) error {
111+
cfg := registerOptions(opts).applyRegisterOptions(RegisterConfig{})
84112

85-
func (r *Registry) RegisterActivityByName(name string, activity interface{}) error {
86113
t := reflect.TypeOf(activity)
87114

88115
// Activities on struct
@@ -91,27 +118,34 @@ func (r *Registry) RegisterActivityByName(name string, activity interface{}) err
91118
}
92119

93120
// Activity as function
121+
name := cfg.Name
122+
if name == "" {
123+
name = fn.Name(activity)
124+
}
125+
94126
if err := checkActivity(reflect.TypeOf(activity)); err != nil {
95127
return err
96128
}
97129

98130
r.Lock()
99131
defer r.Unlock()
100132

133+
if _, ok := r.activityMap[name]; ok {
134+
return &ErrActivityAlreadyRegistered{fmt.Sprintf("activity with name %q already registered", name)}
135+
}
101136
r.activityMap[name] = activity
102137

103138
return nil
104139
}
105140

106-
func (r *Registry) RegisterActivity(activity interface{}) error {
107-
name := fn.Name(activity)
108-
return r.RegisterActivityByName(name, activity)
109-
}
110-
111141
func (r *Registry) registerActivitiesFromStruct(a interface{}) error {
112142
// Enumerate functions defined on a
113143
v := reflect.ValueOf(a)
114144
t := v.Type()
145+
146+
r.Lock()
147+
defer r.Unlock()
148+
115149
for i := 0; i < v.NumMethod(); i++ {
116150
mv := v.Method(i)
117151
mt := t.Method(i)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package workflow
2+
3+
type RegisterOption interface {
4+
applyRegisterOption(RegisterConfig) RegisterConfig
5+
}
6+
7+
type registerOptions []RegisterOption
8+
9+
func (opts registerOptions) applyRegisterOptions(cfg RegisterConfig) RegisterConfig {
10+
for _, opt := range opts {
11+
cfg = opt.applyRegisterOption(cfg)
12+
}
13+
return cfg
14+
}
15+
16+
type registerOptionFunc func(RegisterConfig) RegisterConfig
17+
18+
func (f registerOptionFunc) applyRegisterOption(cfg RegisterConfig) RegisterConfig {
19+
return f(cfg)
20+
}
21+
22+
func WithName(name string) RegisterOption {
23+
return registerOptionFunc(func(cfg RegisterConfig) RegisterConfig {
24+
cfg.Name = name
25+
return cfg
26+
})
27+
}

internal/workflow/registry_test.go

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,8 @@ func TestRegistry_RegisterWorkflow(t *testing.T) {
8484
for _, tt := range tests {
8585
t.Run(tt.name, func(t *testing.T) {
8686
r := NewRegistry()
87-
var err error
8887

89-
if tt.args.name != "" {
90-
err = r.RegisterWorkflowByName(tt.args.name, tt.args.workflow)
91-
} else {
92-
err = r.RegisterWorkflow(tt.args.workflow)
93-
}
88+
err := r.RegisterWorkflow(tt.args.workflow, WithName(tt.args.name))
9489

9590
if (err != nil) != tt.wantErr {
9691
t.Errorf("Registry.RegisterWorkflow() error = %v, wantErr %v", err, tt.wantErr)
@@ -106,6 +101,25 @@ func TestRegistry_RegisterWorkflow(t *testing.T) {
106101
}
107102
}
108103

104+
func Test_RegisterWorkflow_Conflict(t *testing.T) {
105+
r := NewRegistry()
106+
require.NotNil(t, r)
107+
108+
var wantErr *ErrWorkflowAlreadyRegistered
109+
110+
err := r.RegisterWorkflow(reg_workflow1)
111+
require.NoError(t, err)
112+
113+
err = r.RegisterWorkflow(reg_workflow1)
114+
require.ErrorAs(t, err, &wantErr)
115+
116+
err = r.RegisterWorkflow(reg_workflow1, WithName("CustomName"))
117+
require.NoError(t, err)
118+
119+
err = r.RegisterWorkflow(reg_workflow1, WithName("CustomName"))
120+
require.ErrorAs(t, err, &wantErr)
121+
}
122+
109123
func reg_activity(ctx context.Context) error {
110124
return nil
111125
}
@@ -127,7 +141,7 @@ func Test_ActivityRegistration(t *testing.T) {
127141
err = fn(context.Background())
128142
require.NoError(t, err)
129143

130-
err = r.RegisterActivityByName("CustomName", reg_activity)
144+
err = r.RegisterActivity(reg_activity, WithName("CustomName"))
131145
require.NoError(t, err)
132146

133147
x, err = r.GetActivity("CustomName")
@@ -185,6 +199,25 @@ func Test_ActivityRegistrationOnStruct(t *testing.T) {
185199
require.Equal(t, "test", v)
186200
}
187201

202+
func Test_RegisterActivity_Conflict(t *testing.T) {
203+
r := NewRegistry()
204+
require.NotNil(t, r)
205+
206+
var wantErr *ErrActivityAlreadyRegistered
207+
208+
err := r.RegisterActivity(reg_activity)
209+
require.NoError(t, err)
210+
211+
err = r.RegisterActivity(reg_activity)
212+
require.ErrorAs(t, err, &wantErr)
213+
214+
err = r.RegisterActivity(reg_activity, WithName("CustomName"))
215+
require.NoError(t, err)
216+
217+
err = r.RegisterActivity(reg_activity, WithName("CustomName"))
218+
require.ErrorAs(t, err, &wantErr)
219+
}
220+
188221
type reg_invalid_activities struct {
189222
SomeValue string
190223
}

tester/tester.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func (wt *workflowTester[TResult]) ListenSubWorkflow(listener func(*core.Workflo
252252

253253
func (wt *workflowTester[TResult]) OnActivityByName(name string, activity workflow.Activity, args ...any) *mock.Call {
254254
// Register activity so that we can correctly identify its arguments later
255-
wt.registry.RegisterActivityByName(name, activity)
255+
wt.registry.RegisterActivity(activity, wf.WithName(name))
256256

257257
wt.mockedActivities[name] = true
258258
return wt.ma.On(name, args...)
@@ -269,7 +269,7 @@ func (wt *workflowTester[TResult]) OnActivity(activity workflow.Activity, args .
269269

270270
func (wt *workflowTester[TResult]) OnSubWorkflowByName(name string, workflow workflow.Workflow, args ...any) *mock.Call {
271271
// Register workflow so that we can correctly identify its arguments later
272-
wt.registry.RegisterWorkflowByName(name, workflow)
272+
wt.registry.RegisterWorkflow(workflow, wf.WithName(name))
273273

274274
wt.mockedWorkflows[name] = true
275275
return wt.mw.On(name, args...)

worker/option.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package worker
2+
3+
import workflowinternal "github.com/cschleiden/go-workflows/internal/workflow"
4+
5+
type RegisterOption workflowinternal.RegisterOption
6+
7+
type registerOptions []RegisterOption
8+
9+
func (opts registerOptions) asInternalOptions() []workflowinternal.RegisterOption {
10+
repacked := make([]workflowinternal.RegisterOption, len(opts))
11+
for i, opt := range opts {
12+
repacked[i] = workflowinternal.RegisterOption(opt)
13+
}
14+
return repacked
15+
}
16+
17+
func WithName(name string) RegisterOption {
18+
return workflowinternal.WithName(name)
19+
}

worker/worker.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ func (w *Worker) WaitForCompletion() error {
9292
return nil
9393
}
9494

95-
func (w *Worker) RegisterWorkflow(wf workflow.Workflow) error {
96-
return w.registry.RegisterWorkflow(wf)
95+
func (w *Worker) RegisterWorkflow(wf workflow.Workflow, opts ...RegisterOption) error {
96+
return w.registry.RegisterWorkflow(wf, registerOptions(opts).asInternalOptions()...)
9797
}
9898

99-
func (w *Worker) RegisterActivity(a workflow.Activity) error {
100-
return w.registry.RegisterActivity(a)
99+
func (w *Worker) RegisterActivity(a workflow.Activity, opts ...RegisterOption) error {
100+
return w.registry.RegisterActivity(a, registerOptions(opts).asInternalOptions()...)
101101
}

0 commit comments

Comments
 (0)