Skip to content

Commit 04a4da0

Browse files
authored
Merge pull request #383 from cschleiden/inline
Add single worker mode
2 parents 5cb923b + 84f4f50 commit 04a4da0

File tree

20 files changed

+400
-50
lines changed

20 files changed

+400
-50
lines changed

.vscode/settings.json

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,10 @@
88
"**/CVS": true,
99
"**/.DS_Store": true,
1010
"**/Thumbs.db": true,
11-
"**/vendor": true,
11+
"**/vendor": true
1212
},
13-
"github-actions.workflows.pinned.workflows": [
14-
".github/workflows/go.yml"
15-
],
13+
"github-actions.workflows.pinned.workflows": [],
1614
"go.testExplorer.showDynamicSubtestsInEditor": true,
1715
"go.lintTool": "golangci-lint",
18-
"Lua.diagnostics.globals": [
19-
"KEYS",
20-
"ARGV",
21-
"redis",
22-
"cjson"
23-
]
16+
"Lua.diagnostics.globals": ["KEYS", "ARGV", "redis", "cjson"]
2417
}

docs/source/index.html.md

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func Workflow1(ctx workflow.Context, input string) error {
5858
}
5959
```
6060

61-
Let's first write a simple workflows. Our workflow executes two _activities_ in sequence waiting for each result. Both workflows and activities are written in plain Go. Workflows can be long-running and have to be deterministic so that they can be interrupted and resumed. Activities are functions that can have side-effects and don't have to be deterministic.
61+
Let's first write a simple workflow. Our workflow executes two _activities_ in sequence waiting for each result. Both workflows and activities are written in plain Go. Workflows can be long-running and have to be deterministic so that they can be interrupted and resumed. Activities are functions that can have side-effects and don't have to be deterministic.
6262

6363
Both workflows and activities support arbitrary inputs and outputs as long as those are serializable.
6464

@@ -135,3 +135,71 @@ func main() {
135135
To finish the example, we create the backend, start a worker in a separate go-routine. We then create a `Client` instance which we then use to create a new _workflow instance_. A workflow instance is just one running instance of a previously registered workflow.
136136

137137
With the exception of the in-memory backend, we do not have to start the workflow from the same process the worker runs in, we could create the client from another process and create/wait for/cancel/... workflow instances from there.
138+
139+
## Using the WorkflowOrchestrator
140+
141+
For simpler scenarios where you don't need the separation between client and worker, you can use the `WorkflowOrchestrator` which combines both:
142+
143+
```go
144+
// Define the workflow with activities that will be auto-registered
145+
func AutoRegisteredWorkflow(ctx workflow.Context, input string) (string, error) {
146+
// When using WorkflowOrchestrator, activities are automatically registered
147+
// You can directly use any activity function without registering it first
148+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1).Get(ctx)
149+
if err != nil {
150+
return "", err
151+
}
152+
153+
// Another activity that will be auto-registered
154+
r2, err := workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, FormatResult, input, r1).Get(ctx)
155+
if err != nil {
156+
return "", err
157+
}
158+
159+
return r2, nil
160+
}
161+
162+
// Activities are automatically registered when using WorkflowOrchestrator
163+
func Activity1(ctx context.Context) (int, error) {
164+
return 42, nil
165+
}
166+
167+
func FormatResult(ctx context.Context, input string, value int) (string, error) {
168+
return fmt.Sprintf("Processed %s with result %d", input, value), nil
169+
}
170+
171+
func main() {
172+
ctx := context.Background()
173+
174+
b := sqlite.NewSqliteBackend("simple.sqlite")
175+
176+
// Create orchestrator instead of separate client and worker
177+
orchestrator := worker.NewWorkflowOrchestrator(b, nil)
178+
179+
// Workflows are automatically registered
180+
// Activities defined with InlineActivity don't need registration
181+
182+
// Start the orchestrator
183+
if err := orchestrator.Start(ctx); err != nil {
184+
panic("could not start orchestrator")
185+
}
186+
187+
// Create workflow instance
188+
wf, err := orchestrator.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
189+
InstanceID: uuid.NewString(),
190+
}, AutoRegisteredWorkflow, "input-for-workflow")
191+
if err != nil {
192+
panic("could not start workflow")
193+
}
194+
195+
// Get result directly
196+
result, err := client.GetWorkflowResult[string](ctx, orchestrator.Client, wf, 5*time.Second)
197+
if err != nil {
198+
panic("error getting workflow result: " + err.Error())
199+
}
200+
201+
fmt.Println("Workflow completed with result:", result)
202+
}
203+
```
204+
205+
The `WorkflowOrchestrator` provides automatic registration of workflows when passing workflow functions directly to `CreateWorkflowInstance`. For activities, the orchestrator automatically registers any activities used in the workflows via `ExecuteActivity` without explicit registration. Similarly, any sub-workflows created with `CreateSubWorkflowInstance` are registered automatically. This approach offers a unified API for workflow creation and execution in a single component, simplifying the development experience for simpler scenarios.

internal/contextvalue/registry.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package contextvalue
2+
3+
import (
4+
"github.com/cschleiden/go-workflows/internal/sync"
5+
"github.com/cschleiden/go-workflows/registry"
6+
)
7+
8+
type registryKey struct{}
9+
10+
func WithRegistry(ctx sync.Context, r *registry.Registry) sync.Context {
11+
return sync.WithValue(ctx, registryKey{}, r)
12+
}
13+
14+
func GetRegistry(ctx sync.Context) *registry.Registry {
15+
if v := ctx.Value(registryKey{}); v != nil {
16+
return v.(*registry.Registry)
17+
}
18+
19+
return nil
20+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package contextvalue
2+
3+
import (
4+
"github.com/cschleiden/go-workflows/internal/sync"
5+
)
6+
7+
type singleWorkerModeKey struct{}
8+
9+
// WithSingleWorkerMode sets the single worker mode flag in the context.
10+
// When this flag is set, activities and workflows will be automatically registered when needed.
11+
func WithSingleWorkerMode(ctx sync.Context) sync.Context {
12+
return sync.WithValue(ctx, singleWorkerModeKey{}, true)
13+
}
14+
15+
// IsSingleWorkerMode checks if the context is in single worker mode.
16+
func IsSingleWorkerMode(ctx sync.Context) bool {
17+
if v := ctx.Value(singleWorkerModeKey{}); v != nil {
18+
return v.(bool)
19+
}
20+
return false
21+
}

internal/worker/workflow.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type WorkflowWorkerOptions struct {
2727
WorkflowExecutorCache executor.Cache
2828
WorkflowExecutorCacheSize int
2929
WorkflowExecutorCacheTTL time.Duration
30+
SingleWorkerMode bool
3031
}
3132

3233
func NewWorkflowWorker(
@@ -39,20 +40,22 @@ func NewWorkflowWorker(
3940
}
4041

4142
tw := &WorkflowTaskWorker{
42-
backend: b,
43-
registry: registry,
44-
cache: options.WorkflowExecutorCache,
45-
logger: b.Options().Logger,
43+
backend: b,
44+
registry: registry,
45+
cache: options.WorkflowExecutorCache,
46+
logger: b.Options().Logger,
47+
singleWorkerMode: options.SingleWorkerMode,
4648
}
4749

4850
return NewWorker(b, tw, &options.WorkerOptions)
4951
}
5052

5153
type WorkflowTaskWorker struct {
52-
backend backend.Backend
53-
registry *registry.Registry
54-
cache executor.Cache
55-
logger *slog.Logger
54+
backend backend.Backend
55+
registry *registry.Registry
56+
cache executor.Cache
57+
logger *slog.Logger
58+
singleWorkerMode bool
5659
}
5760

5861
func (wtw *WorkflowTaskWorker) Start(ctx context.Context, queues []workflow.Queue) error {
@@ -181,6 +184,7 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf
181184
t.Metadata,
182185
clock.New(),
183186
wtw.backend.Options().MaxHistorySize,
187+
wtw.singleWorkerMode,
184188
)
185189
if err != nil {
186190
return nil, fmt.Errorf("creating workflow task executor: %w", err)

registry/registry.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,30 @@ import (
88

99
"github.com/cschleiden/go-workflows/internal/args"
1010
"github.com/cschleiden/go-workflows/internal/fn"
11-
wf "github.com/cschleiden/go-workflows/workflow"
1211
)
1312

1413
type Registry struct {
14+
sealed bool
15+
1516
sync.Mutex
1617

17-
workflowMap map[string]wf.Workflow
18-
activityMap map[string]interface{}
18+
workflowMap map[string]any
19+
activityMap map[string]any
1920
}
2021

2122
// New creates a new registry instance.
2223
func New() *Registry {
2324
return &Registry{
24-
workflowMap: make(map[string]wf.Workflow),
25-
activityMap: make(map[string]interface{}),
25+
workflowMap: make(map[string]any),
26+
activityMap: make(map[string]any),
2627
}
2728
}
2829

2930
type registerConfig struct {
3031
Name string
3132
}
3233

33-
func (r *Registry) RegisterWorkflow(workflow wf.Workflow, opts ...RegisterOption) error {
34+
func (r *Registry) RegisterWorkflow(workflow any, opts ...RegisterOption) error {
3435
cfg := registerOptions(opts).applyRegisterOptions(registerConfig{})
3536
name := cfg.Name
3637
if name == "" {
@@ -75,7 +76,7 @@ func (r *Registry) RegisterWorkflow(workflow wf.Workflow, opts ...RegisterOption
7576
return nil
7677
}
7778

78-
func (r *Registry) RegisterActivity(activity wf.Activity, opts ...RegisterOption) error {
79+
func (r *Registry) RegisterActivity(activity any, opts ...RegisterOption) error {
7980
cfg := registerOptions(opts).applyRegisterOptions(registerConfig{})
8081

8182
t := reflect.TypeOf(activity)
@@ -151,7 +152,7 @@ func checkActivity(actType reflect.Type) error {
151152
return nil
152153
}
153154

154-
func (r *Registry) GetWorkflow(name string) (wf.Workflow, error) {
155+
func (r *Registry) GetWorkflow(name string) (any, error) {
155156
r.Lock()
156157
defer r.Unlock()
157158

registry/registry_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
"github.com/cschleiden/go-workflows/internal/fn"
88
"github.com/cschleiden/go-workflows/internal/sync"
9-
wf "github.com/cschleiden/go-workflows/workflow"
109
"github.com/stretchr/testify/require"
1110
)
1211

@@ -17,7 +16,7 @@ func reg_workflow1(ctx sync.Context) error {
1716
func TestRegistry_RegisterWorkflow(t *testing.T) {
1817
type args struct {
1918
name string
20-
workflow wf.Workflow
19+
workflow any
2120
}
2221
tests := []struct {
2322
name string

samples/orchestrator/main.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/client"
9+
"github.com/cschleiden/go-workflows/samples"
10+
"github.com/cschleiden/go-workflows/worker"
11+
"github.com/cschleiden/go-workflows/workflow"
12+
13+
"github.com/google/uuid"
14+
)
15+
16+
func main() {
17+
ctx, cancel := context.WithCancel(context.Background())
18+
defer cancel()
19+
20+
backend := samples.GetBackend("orchestrator")
21+
22+
orchestrator := worker.NewWorkflowOrchestrator(
23+
backend,
24+
nil,
25+
)
26+
27+
if err := orchestrator.Start(ctx); err != nil {
28+
panic("could not start orchestrator: " + err.Error())
29+
}
30+
31+
// Create instance ID
32+
instanceID := uuid.NewString()
33+
34+
// Create and run workflow using the orchestrator - no explicit registration needed
35+
instance, err := orchestrator.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
36+
InstanceID: instanceID,
37+
}, SimpleWorkflow, "Hello from orchestrator!")
38+
if err != nil {
39+
panic("could not create workflow instance: " + err.Error())
40+
}
41+
42+
// Wait for result using client package directly
43+
result, err := client.GetWorkflowResult[string](ctx, orchestrator.Client, instance, 10*time.Second)
44+
if err != nil {
45+
panic("error getting workflow result: " + err.Error())
46+
}
47+
48+
log.Printf("Workflow result: %s\n", result)
49+
50+
// Clean shutdown
51+
cancel()
52+
53+
if err := orchestrator.WaitForCompletion(); err != nil {
54+
panic("could not stop orchestrator: " + err.Error())
55+
}
56+
}
57+
58+
// SimpleWorkflow is a basic workflow that calls an activity and returns its result
59+
func SimpleWorkflow(ctx workflow.Context, message string) (string, error) {
60+
future := workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, ProcessMessage, message)
61+
62+
result, err := future.Get(ctx)
63+
if err != nil {
64+
return "", err
65+
}
66+
67+
return result, nil
68+
}
69+
70+
// ProcessMessage is a simple activity that processes a message
71+
// When orchestrator mode is enabled, this will be automatically registered
72+
func ProcessMessage(ctx context.Context, message string) (string, error) {
73+
return message + " (processed by activity)", nil
74+
}
75+
76+
// Note: No separate activity function needed when using InlineActivity

tester/options.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ import (
99
)
1010

1111
type options struct {
12-
TestTimeout time.Duration
13-
Logger *slog.Logger
14-
Converter converter.Converter
15-
Propagators []workflow.ContextPropagator
16-
InitialTime time.Time
17-
MaxHistorySize int64
12+
TestTimeout time.Duration
13+
Logger *slog.Logger
14+
Converter converter.Converter
15+
Propagators []workflow.ContextPropagator
16+
InitialTime time.Time
17+
MaxHistorySize int64
18+
SingleWorkerMode bool
1819
}
1920

2021
type WorkflowTesterOption func(*options)
@@ -54,3 +55,9 @@ func WithMaxHistorySize(size int64) WorkflowTesterOption {
5455
o.MaxHistorySize = size
5556
}
5657
}
58+
59+
func WithSingleWorkerMode(enabled bool) WorkflowTesterOption {
60+
return func(o *options) {
61+
o.SingleWorkerMode = enabled
62+
}
63+
}

tester/tester.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
369369
tw.metadata,
370370
wt.clock,
371371
wt.options.MaxHistorySize,
372+
wt.options.SingleWorkerMode,
372373
)
373374
if err != nil {
374375
panic(fmt.Errorf("could not create workflow executor: %v", err))

0 commit comments

Comments
 (0)