Skip to content

Commit d7edac9

Browse files
authored
Merge pull request #294 from cschleiden/workflow-activity-by-name
Execute workflows by (custom) name
2 parents b9d0260 + 0d6a2e4 commit d7edac9

File tree

8 files changed

+222
-20
lines changed

8 files changed

+222
-20
lines changed

.vscode/launch.json

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,25 @@
33
// Hover to view descriptions of existing attributes.
44
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
55
"version": "0.2.0",
6+
"inputs": [
7+
{
8+
"id": "backend",
9+
"type": "pickString",
10+
"description": "Choose a backend",
11+
"default": "memory",
12+
"options": ["mysql", "sqlite", "redis", "memory"]
13+
}
14+
],
615
"configurations": [
7-
816
{
917
"name": "Current sample",
1018
"type": "go",
1119
"request": "launch",
1220
"mode": "debug",
13-
"program": "${fileDirname}"
21+
"program": "${fileDirname}",
22+
"args": [
23+
"-backend", "${input:backend}",
24+
]
1425
},
1526
{
1627
"name": "Launch simple sample",

client/client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,17 @@ func New(backend backend.Backend) *Client {
4444

4545
// CreateWorkflowInstance creates a new workflow instance of the given workflow.
4646
func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowInstanceOptions, wf workflow.Workflow, args ...any) (*workflow.Instance, error) {
47-
// Check arguments
48-
if err := a.ParamsMatch(wf, args...); err != nil {
49-
return nil, err
47+
var workflowName string
48+
49+
if name, ok := wf.(string); ok {
50+
workflowName = name
51+
} else {
52+
workflowName = fn.Name(wf)
53+
54+
// Check arguments if actual workflow function given here
55+
if err := a.ParamsMatch(wf, args...); err != nil {
56+
return nil, err
57+
}
5058
}
5159

5260
inputs, err := a.ArgsToInputs(c.backend.Converter(), args...)
@@ -57,8 +65,6 @@ func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
5765
wfi := core.NewWorkflowInstance(options.InstanceID, uuid.NewString())
5866
metadata := &workflow.Metadata{}
5967

60-
workflowName := fn.Name(wf)
61-
6268
// Start new span for the workflow instance
6369
ctx, span := c.backend.Tracer().Start(ctx, fmt.Sprintf("CreateWorkflowInstance: %s", workflowName), trace.WithAttributes(
6470
attribute.String(log.InstanceIDKey, wfi.InstanceID),

client/client_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cschleiden/go-workflows/backend/converter"
1313
"github.com/cschleiden/go-workflows/backend/history"
1414
"github.com/cschleiden/go-workflows/core"
15+
"github.com/cschleiden/go-workflows/internal/metrics"
1516
"github.com/cschleiden/go-workflows/workflow"
1617
"github.com/google/uuid"
1718
"github.com/stretchr/testify/mock"
@@ -40,6 +41,38 @@ func Test_Client_CreateWorkflowInstance_ParamMismatch(t *testing.T) {
4041
b.AssertExpectations(t)
4142
}
4243

44+
func Test_Client_CreateWorkflowInstance_NameGiven(t *testing.T) {
45+
ctx := context.Background()
46+
47+
b := &backend.MockBackend{}
48+
b.On("Converter").Return(converter.DefaultConverter)
49+
b.On("Logger").Return(slog.Default())
50+
b.On("Tracer").Return(trace.NewNoopTracerProvider().Tracer("test"))
51+
b.On("Metrics").Return(metrics.NewNoopMetricsClient())
52+
b.On("ContextPropagators").Return(nil)
53+
b.On("CreateWorkflowInstance", mock.Anything, mock.Anything, mock.MatchedBy(func(event *history.Event) bool {
54+
if event.Type != history.EventType_WorkflowExecutionStarted {
55+
return false
56+
}
57+
58+
a := event.Attributes.(*history.ExecutionStartedAttributes)
59+
60+
return a.Name == "workflowName"
61+
})).Return(nil, nil)
62+
63+
c := &Client{
64+
backend: b,
65+
clock: clock.New(),
66+
}
67+
68+
result, err := c.CreateWorkflowInstance(ctx, WorkflowInstanceOptions{
69+
InstanceID: "id",
70+
}, "workflowName", "foo")
71+
require.NotZero(t, result)
72+
require.NoError(t, err)
73+
b.AssertExpectations(t)
74+
}
75+
4376
func Test_Client_GetWorkflowResultTimeout(t *testing.T) {
4477
instance := core.NewWorkflowInstance(uuid.NewString(), "test")
4578

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"github.com/cschleiden/go-workflows/activity"
7+
"github.com/cschleiden/go-workflows/workflow"
8+
)
9+
10+
func Workflow1(ctx workflow.Context, msg string) (int, error) {
11+
logger := workflow.Logger(ctx)
12+
logger.Debug("Entering Workflow1", "msg", msg)
13+
defer logger.Debug("Leaving Workflow1")
14+
15+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
16+
if err != nil {
17+
panic("error getting activity 1 result")
18+
}
19+
logger.Debug("R1 result", "r1", r1)
20+
21+
return r1, nil
22+
}
23+
24+
func SubWorkflow1(ctx workflow.Context, msg string) (int, error) {
25+
return 42, nil
26+
}
27+
28+
func Activity1(ctx context.Context, a, b int) (int, error) {
29+
logger := activity.Logger(ctx)
30+
logger.Debug("Entering Activity1")
31+
defer logger.Debug("Leaving Activity1")
32+
33+
return a + b, nil
34+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/backend"
9+
"github.com/cschleiden/go-workflows/client"
10+
"github.com/cschleiden/go-workflows/internal/workflow"
11+
"github.com/cschleiden/go-workflows/samples"
12+
"github.com/cschleiden/go-workflows/worker"
13+
14+
"github.com/google/uuid"
15+
)
16+
17+
func main() {
18+
ctx, cancel := context.WithCancel(context.Background())
19+
20+
b := samples.GetBackend("simple")
21+
22+
// Run worker
23+
w := RunWorker(ctx, b)
24+
25+
// Start workflow via client
26+
c := client.New(b)
27+
28+
runWorkflow(ctx, c)
29+
30+
cancel()
31+
32+
if err := w.WaitForCompletion(); err != nil {
33+
panic("could not stop worker" + err.Error())
34+
}
35+
}
36+
37+
func runWorkflow(ctx context.Context, c *client.Client) {
38+
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
39+
InstanceID: uuid.NewString(),
40+
}, "WorkflowName", "Hello world")
41+
if err != nil {
42+
log.Fatal(err)
43+
panic("could not start workflow")
44+
}
45+
46+
result, err := client.GetWorkflowResult[int](ctx, c, wf, time.Second*10)
47+
if err != nil {
48+
log.Fatal(err)
49+
}
50+
51+
log.Println("Workflow finished. Result:", result)
52+
}
53+
54+
func RunWorker(ctx context.Context, mb backend.Backend) *worker.Worker {
55+
w := worker.New(mb, nil)
56+
57+
w.RegisterWorkflow(Workflow1, workflow.WithName("WorkflowName"))
58+
w.RegisterWorkflow(SubWorkflow1, workflow.WithName("SubWorkflowName"))
59+
60+
w.RegisterActivity(Activity1)
61+
62+
if err := w.Start(ctx); err != nil {
63+
panic("could not start worker")
64+
}
65+
66+
return w
67+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/cschleiden/go-workflows/tester"
8+
"github.com/stretchr/testify/mock"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func Test_Workflow(t *testing.T) {
13+
tester := tester.NewWorkflowTester[int](Workflow1)
14+
15+
tester.OnSubWorkflowByName("SubWorkflowName", mock.Anything, "Hello world").Return(42, nil)
16+
17+
tester.OnActivity(Activity1, mock.Anything, 35, 12).Return(47, nil)
18+
19+
tester.Execute(context.Background(), "Hello world")
20+
21+
require.True(t, tester.WorkflowFinished())
22+
23+
wr, werr := tester.WorkflowResult()
24+
require.Equal(t, 47, wr)
25+
require.Empty(t, werr)
26+
tester.AssertExpectations(t)
27+
}

workflow/subworkflow.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,24 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
5050
}
5151

5252
// Check return type
53-
if err := a.ReturnTypeMatch[TResult](wf); err != nil {
54-
f.Set(*new(TResult), err)
55-
return f
56-
}
53+
var workflowName string
54+
if name, ok := wf.(string); ok {
55+
workflowName = name
56+
} else {
57+
workflowName = fn.Name(wf)
58+
59+
if err := a.ReturnTypeMatch[TResult](wf); err != nil {
60+
f.Set(*new(TResult), err)
61+
return f
62+
}
5763

58-
// Check arguments
59-
if err := a.ParamsMatch(wf, args...); err != nil {
60-
f.Set(*new(TResult), err)
61-
return f
64+
// Check arguments
65+
if err := a.ParamsMatch(wf, args...); err != nil {
66+
f.Set(*new(TResult), err)
67+
return f
68+
}
6269
}
6370

64-
name := fn.Name(wf)
65-
6671
cv := contextvalue.Converter(ctx)
6772
inputs, err := a.ArgsToInputs(cv, args...)
6873
if err != nil {
@@ -74,9 +79,9 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
7479
scheduleEventID := wfState.GetNextScheduleEventID()
7580

7681
ctx, span := workflowtracer.Tracer(ctx).Start(ctx,
77-
fmt.Sprintf("CreateSubworkflowInstance: %s", name),
82+
fmt.Sprintf("CreateSubworkflowInstance: %s", workflowName),
7883
trace.WithAttributes(
79-
attribute.String(log.WorkflowNameKey, name),
84+
attribute.String(log.WorkflowNameKey, workflowName),
8085
attribute.Int64(log.ScheduleEventIDKey, scheduleEventID),
8186
attribute.Int(log.AttemptKey, attempt),
8287
))
@@ -90,7 +95,7 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
9095
return f
9196
}
9297

93-
cmd := command.NewScheduleSubWorkflowCommand(scheduleEventID, wfState.Instance(), options.InstanceID, name, inputs, metadata)
98+
cmd := command.NewScheduleSubWorkflowCommand(scheduleEventID, wfState.Instance(), options.InstanceID, workflowName, inputs, metadata)
9499

95100
wfState.AddCommand(cmd)
96101
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, f))

workflow/subworkflow_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,25 @@ import (
1515
"go.opentelemetry.io/otel/trace"
1616
)
1717

18+
func Test_createSubWorkflowInstance_NameAsString(t *testing.T) {
19+
ctx := sync.Background()
20+
ctx = contextvalue.WithConverter(ctx, converter.DefaultConverter)
21+
ctx = workflowstate.WithWorkflowState(
22+
ctx,
23+
workflowstate.NewWorkflowState(core.NewWorkflowInstance("a", ""), slog.Default(), clock.New()),
24+
)
25+
ctx = workflowtracer.WithWorkflowTracer(ctx, workflowtracer.New(trace.NewNoopTracerProvider().Tracer("test")))
26+
27+
c := sync.NewCoroutine(ctx, func(ctx Context) error {
28+
createSubWorkflowInstance[int](ctx, DefaultSubWorkflowOptions, 1, "workflowName", "foo")
29+
return nil
30+
})
31+
32+
c.Execute()
33+
require.NoError(t, c.Error())
34+
require.True(t, c.Finished())
35+
}
36+
1837
func Test_createSubWorkflowInstance_ParamMismatch(t *testing.T) {
1938
wf := func(Context, int) (int, error) {
2039
return 42, nil

0 commit comments

Comments
 (0)