Skip to content

Commit a004357

Browse files
committed
Allow starting workflows by name
1 parent df5cdca commit a004357

File tree

4 files changed

+81
-18
lines changed

4 files changed

+81
-18
lines changed

client/client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,17 @@ func New(backend backend.Backend) *Client {
4444

4545
// CreateWorkflowInstance creates a new workflow instance of the given workflow.
4646
func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowInstanceOptions, wf workflow.Workflow, args ...any) (*workflow.Instance, error) {
47-
// Check arguments
48-
if err := a.ParamsMatch(wf, args...); err != nil {
49-
return nil, err
47+
var workflowName string
48+
49+
if name, ok := wf.(string); ok {
50+
workflowName = name
51+
} else {
52+
workflowName = fn.Name(wf)
53+
54+
// Check arguments if actual workflow function given here
55+
if err := a.ParamsMatch(wf, args...); err != nil {
56+
return nil, err
57+
}
5058
}
5159

5260
inputs, err := a.ArgsToInputs(c.backend.Converter(), args...)
@@ -57,8 +65,6 @@ func (c *Client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
5765
wfi := core.NewWorkflowInstance(options.InstanceID, uuid.NewString())
5866
metadata := &workflow.Metadata{}
5967

60-
workflowName := fn.Name(wf)
61-
6268
// Start new span for the workflow instance
6369
ctx, span := c.backend.Tracer().Start(ctx, fmt.Sprintf("CreateWorkflowInstance: %s", workflowName), trace.WithAttributes(
6470
attribute.String(log.InstanceIDKey, wfi.InstanceID),

client/client_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cschleiden/go-workflows/backend/converter"
1313
"github.com/cschleiden/go-workflows/backend/history"
1414
"github.com/cschleiden/go-workflows/core"
15+
"github.com/cschleiden/go-workflows/internal/metrics"
1516
"github.com/cschleiden/go-workflows/workflow"
1617
"github.com/google/uuid"
1718
"github.com/stretchr/testify/mock"
@@ -40,6 +41,38 @@ func Test_Client_CreateWorkflowInstance_ParamMismatch(t *testing.T) {
4041
b.AssertExpectations(t)
4142
}
4243

44+
func Test_Client_CreateWorkflowInstance_NameGiven(t *testing.T) {
45+
ctx := context.Background()
46+
47+
b := &backend.MockBackend{}
48+
b.On("Converter").Return(converter.DefaultConverter)
49+
b.On("Logger").Return(slog.Default())
50+
b.On("Tracer").Return(trace.NewNoopTracerProvider().Tracer("test"))
51+
b.On("Metrics").Return(metrics.NewNoopMetricsClient())
52+
b.On("ContextPropagators").Return(nil)
53+
b.On("CreateWorkflowInstance", mock.Anything, mock.Anything, mock.MatchedBy(func(event *history.Event) bool {
54+
if event.Type != history.EventType_WorkflowExecutionStarted {
55+
return false
56+
}
57+
58+
a := event.Attributes.(*history.ExecutionStartedAttributes)
59+
60+
return a.Name == "workflowName"
61+
})).Return(nil, nil)
62+
63+
c := &Client{
64+
backend: b,
65+
clock: clock.New(),
66+
}
67+
68+
result, err := c.CreateWorkflowInstance(ctx, WorkflowInstanceOptions{
69+
InstanceID: "id",
70+
}, "workflowName", "foo")
71+
require.NotZero(t, result)
72+
require.NoError(t, err)
73+
b.AssertExpectations(t)
74+
}
75+
4376
func Test_Client_GetWorkflowResultTimeout(t *testing.T) {
4477
instance := core.NewWorkflowInstance(uuid.NewString(), "test")
4578

workflow/subworkflow.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,24 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
5050
}
5151

5252
// Check return type
53-
if err := a.ReturnTypeMatch[TResult](wf); err != nil {
54-
f.Set(*new(TResult), err)
55-
return f
56-
}
53+
var workflowName string
54+
if name, ok := wf.(string); ok {
55+
workflowName = name
56+
} else {
57+
workflowName = fn.Name(wf)
58+
59+
if err := a.ReturnTypeMatch[TResult](wf); err != nil {
60+
f.Set(*new(TResult), err)
61+
return f
62+
}
5763

58-
// Check arguments
59-
if err := a.ParamsMatch(wf, args...); err != nil {
60-
f.Set(*new(TResult), err)
61-
return f
64+
// Check arguments
65+
if err := a.ParamsMatch(wf, args...); err != nil {
66+
f.Set(*new(TResult), err)
67+
return f
68+
}
6269
}
6370

64-
name := fn.Name(wf)
65-
6671
cv := contextvalue.Converter(ctx)
6772
inputs, err := a.ArgsToInputs(cv, args...)
6873
if err != nil {
@@ -74,9 +79,9 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
7479
scheduleEventID := wfState.GetNextScheduleEventID()
7580

7681
ctx, span := workflowtracer.Tracer(ctx).Start(ctx,
77-
fmt.Sprintf("CreateSubworkflowInstance: %s", name),
82+
fmt.Sprintf("CreateSubworkflowInstance: %s", workflowName),
7883
trace.WithAttributes(
79-
attribute.String(log.WorkflowNameKey, name),
84+
attribute.String(log.WorkflowNameKey, workflowName),
8085
attribute.Int64(log.ScheduleEventIDKey, scheduleEventID),
8186
attribute.Int(log.AttemptKey, attempt),
8287
))
@@ -90,7 +95,7 @@ func createSubWorkflowInstance[TResult any](ctx Context, options SubWorkflowOpti
9095
return f
9196
}
9297

93-
cmd := command.NewScheduleSubWorkflowCommand(scheduleEventID, wfState.Instance(), options.InstanceID, name, inputs, metadata)
98+
cmd := command.NewScheduleSubWorkflowCommand(scheduleEventID, wfState.Instance(), options.InstanceID, workflowName, inputs, metadata)
9499

95100
wfState.AddCommand(cmd)
96101
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(cv, f))

workflow/subworkflow_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,25 @@ import (
1515
"go.opentelemetry.io/otel/trace"
1616
)
1717

18+
func Test_createSubWorkflowInstance_NameAsString(t *testing.T) {
19+
ctx := sync.Background()
20+
ctx = contextvalue.WithConverter(ctx, converter.DefaultConverter)
21+
ctx = workflowstate.WithWorkflowState(
22+
ctx,
23+
workflowstate.NewWorkflowState(core.NewWorkflowInstance("a", ""), slog.Default(), clock.New()),
24+
)
25+
ctx = workflowtracer.WithWorkflowTracer(ctx, workflowtracer.New(trace.NewNoopTracerProvider().Tracer("test")))
26+
27+
c := sync.NewCoroutine(ctx, func(ctx Context) error {
28+
createSubWorkflowInstance[int](ctx, DefaultSubWorkflowOptions, 1, "workflowName", "foo")
29+
return nil
30+
})
31+
32+
c.Execute()
33+
require.NoError(t, c.Error())
34+
require.True(t, c.Finished())
35+
}
36+
1837
func Test_createSubWorkflowInstance_ParamMismatch(t *testing.T) {
1938
wf := func(Context, int) (int, error) {
2039
return 42, nil

0 commit comments

Comments
 (0)