Skip to content

Commit cd70aaa

Browse files
authored
Merge pull request #397 from cschleiden/remove-single-worker-mode
Remove single worker mode and auto registration
2 parents a9bc485 + 4b459c5 commit cd70aaa

File tree

17 files changed

+46
-212
lines changed

17 files changed

+46
-212
lines changed

.vscode/settings.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
// Some integration tests take quite long to run, increase the overall limit for now
3-
"go.testFlags": ["-timeout", "120s", "-race", "-count", "1"], // , "-short"],
3+
"go.testFlags": ["-timeout", "120s", "-race", "-count", "1" , "-short"],
44
"files.exclude": {
55
"**/.git": true,
66
"**/.svn": true,

docs/source/index.html.md

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,12 @@ With the exception of the in-memory backend, we do not have to start the workflo
141141
For simpler scenarios where you don't need the separation between client and worker, you can use the `WorkflowOrchestrator` which combines both:
142142

143143
```go
144-
// Define the workflow with activities that will be auto-registered
145-
func AutoRegisteredWorkflow(ctx workflow.Context, input string) (string, error) {
146-
// When using WorkflowOrchestrator, activities are automatically registered
147-
// You can directly use any activity function without registering it first
144+
func MyWorkflow(ctx workflow.Context, input string) (string, error) {
148145
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1).Get(ctx)
149146
if err != nil {
150147
return "", err
151148
}
152149

153-
// Another activity that will be auto-registered
154150
r2, err := workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, FormatResult, input, r1).Get(ctx)
155151
if err != nil {
156152
return "", err
@@ -159,7 +155,6 @@ func AutoRegisteredWorkflow(ctx workflow.Context, input string) (string, error)
159155
return r2, nil
160156
}
161157

162-
// Activities are automatically registered when using WorkflowOrchestrator
163158
func Activity1(ctx context.Context) (int, error) {
164159
return 42, nil
165160
}
@@ -176,8 +171,10 @@ func main() {
176171
// Create orchestrator instead of separate client and worker
177172
orchestrator := worker.NewWorkflowOrchestrator(b, nil)
178173

179-
// Workflows are automatically registered
180-
// Activities defined with InlineActivity don't need registration
174+
// Register workflows and activities explicitly
175+
orchestrator.RegisterWorkflow(MyWorkflow)
176+
orchestrator.RegisterActivity(Activity1)
177+
orchestrator.RegisterActivity(FormatResult)
181178

182179
// Start the orchestrator
183180
if err := orchestrator.Start(ctx); err != nil {
@@ -187,7 +184,7 @@ func main() {
187184
// Create workflow instance
188185
wf, err := orchestrator.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
189186
InstanceID: uuid.NewString(),
190-
}, AutoRegisteredWorkflow, "input-for-workflow")
187+
}, MyWorkflow, "input-for-workflow")
191188
if err != nil {
192189
panic("could not start workflow")
193190
}
@@ -202,4 +199,4 @@ func main() {
202199
}
203200
```
204201

205-
The `WorkflowOrchestrator` provides automatic registration of workflows when passing workflow functions directly to `CreateWorkflowInstance`. For activities, the orchestrator automatically registers any activities used in the workflows via `ExecuteActivity` without explicit registration. Similarly, any sub-workflows created with `CreateSubWorkflowInstance` are registered automatically. This approach offers a unified API for workflow creation and execution in a single component, simplifying the development experience for simpler scenarios.
202+
The `WorkflowOrchestrator` provides a unified API for workflow creation and execution in a single component, combining both client and worker functionality. You need to explicitly register workflows and activities before starting the orchestrator, ensuring proper registration before any workflow or activity tasks are processed.

internal/contextvalue/singleworkermode.go

Lines changed: 0 additions & 21 deletions
This file was deleted.

internal/worker/workflow.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ type WorkflowWorkerOptions struct {
2727
WorkflowExecutorCache executor.Cache
2828
WorkflowExecutorCacheSize int
2929
WorkflowExecutorCacheTTL time.Duration
30-
SingleWorkerMode bool
3130
}
3231

3332
func NewWorkflowWorker(
@@ -40,22 +39,20 @@ func NewWorkflowWorker(
4039
}
4140

4241
tw := &WorkflowTaskWorker{
43-
backend: b,
44-
registry: registry,
45-
cache: options.WorkflowExecutorCache,
46-
logger: b.Options().Logger,
47-
singleWorkerMode: options.SingleWorkerMode,
42+
backend: b,
43+
registry: registry,
44+
cache: options.WorkflowExecutorCache,
45+
logger: b.Options().Logger,
4846
}
4947

5048
return NewWorker(b, tw, &options.WorkerOptions)
5149
}
5250

5351
type WorkflowTaskWorker struct {
54-
backend backend.Backend
55-
registry *registry.Registry
56-
cache executor.Cache
57-
logger *slog.Logger
58-
singleWorkerMode bool
52+
backend backend.Backend
53+
registry *registry.Registry
54+
cache executor.Cache
55+
logger *slog.Logger
5956
}
6057

6158
func (wtw *WorkflowTaskWorker) Start(ctx context.Context, queues []workflow.Queue) error {
@@ -184,7 +181,6 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf
184181
t.Metadata,
185182
clock.New(),
186183
wtw.backend.Options().MaxHistorySize,
187-
wtw.singleWorkerMode,
188184
)
189185
if err != nil {
190186
return nil, fmt.Errorf("creating workflow task executor: %w", err)

samples/orchestrator/main.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,18 @@ func main() {
2525
nil,
2626
)
2727

28+
// Register workflows and activities explicitly
29+
orchestrator.RegisterWorkflow(SimpleWorkflow)
30+
orchestrator.RegisterActivity(ProcessMessage)
31+
2832
if err := orchestrator.Start(ctx); err != nil {
2933
panic("could not start orchestrator: " + err.Error())
3034
}
3135

3236
// Create instance ID
3337
instanceID := uuid.NewString()
3438

35-
// Create and run workflow using the orchestrator - no explicit registration needed
39+
// Create and run workflow using the orchestrator
3640
instance, err := orchestrator.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
3741
InstanceID: instanceID,
3842
}, SimpleWorkflow, "Hello from orchestrator!")
@@ -69,7 +73,6 @@ func SimpleWorkflow(ctx workflow.Context, message string) (string, error) {
6973
}
7074

7175
// ProcessMessage is a simple activity that processes a message
72-
// When orchestrator mode is enabled, this will be automatically registered
7376
func ProcessMessage(ctx context.Context, message string) (string, error) {
7477
return message + " (processed by activity)", nil
7578
}

samples/orchestrator/orchestrator

24.4 MB
Binary file not shown.

tester/tester.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,6 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
369369
tw.metadata,
370370
wt.clock,
371371
wt.options.MaxHistorySize,
372-
wt.options.SingleWorkerMode,
373372
)
374373
if err != nil {
375374
panic(fmt.Errorf("could not create workflow executor: %v", err))

worker/options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ type Options struct {
4242
WorkflowWorkerOptions
4343
ActivityWorkerOptions
4444

45-
// SingleWorkerMode enables automatic registration of workflows and activities
46-
// when they are used in workflows. This is useful for simple scenarios where
47-
// you don't want to explicitly register each workflow and activity.
45+
// SingleWorkerMode enables optimizations for scenarios where only a single worker
46+
// is processing tasks. This should only be enabled when you have exactly one worker
47+
// instance running.
4848
SingleWorkerMode bool
4949
}
5050

worker/worker.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func New(backend backend.Backend, options *Options) *Worker {
3535
options = &DefaultOptions
3636
}
3737

38-
workflowWorker := newWorkflowWorker(backend, registry, &options.WorkflowWorkerOptions, options.SingleWorkerMode)
38+
workflowWorker := newWorkflowWorker(backend, registry, &options.WorkflowWorkerOptions)
3939
activityWorker := newActivityWorker(backend, registry, &options.ActivityWorkerOptions)
4040

4141
// Register internal activities
@@ -46,7 +46,7 @@ func New(backend backend.Backend, options *Options) *Worker {
4646
func NewWorkflowWorker(backend backend.Backend, options *WorkflowWorkerOptions) *Worker {
4747
registry := registry.New()
4848

49-
return newWorker(backend, registry, []worker{newWorkflowWorker(backend, registry, options, false)})
49+
return newWorker(backend, registry, []worker{newWorkflowWorker(backend, registry, options)})
5050
}
5151

5252
// NewActivityWorker creates a worker that only processes activities.
@@ -94,7 +94,7 @@ func newActivityWorker(backend backend.Backend, registry *registry.Registry, opt
9494
return activityWorker
9595
}
9696

97-
func newWorkflowWorker(backend backend.Backend, registry *registry.Registry, options *WorkflowWorkerOptions, singleWorkerMode bool) worker {
97+
func newWorkflowWorker(backend backend.Backend, registry *registry.Registry, options *WorkflowWorkerOptions) worker {
9898
if options == nil {
9999
options = &DefaultOptions.WorkflowWorkerOptions
100100
}
@@ -110,7 +110,6 @@ func newWorkflowWorker(backend backend.Backend, registry *registry.Registry, opt
110110
WorkflowExecutorCache: options.WorkflowExecutorCache,
111111
WorkflowExecutorCacheSize: options.WorkflowExecutorCacheSize,
112112
WorkflowExecutorCacheTTL: options.WorkflowExecutorCacheTTL,
113-
SingleWorkerMode: singleWorkerMode,
114113
})
115114

116115
return workflowWorker

worker/workflow_orchestrator.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package worker
22

33
import (
44
"context"
5-
"fmt"
65
"time"
76

87
"github.com/cschleiden/go-workflows/backend"
98
"github.com/cschleiden/go-workflows/client"
10-
"github.com/cschleiden/go-workflows/internal/fn"
119
"github.com/cschleiden/go-workflows/registry"
1210
"github.com/cschleiden/go-workflows/workflow"
1311
)
@@ -26,9 +24,8 @@ func NewWorkflowOrchestrator(backend backend.Backend, options *Options) *Workflo
2624
options = &DefaultOptions
2725
}
2826

29-
// Enable SingleWorkerMode automatically for the orchestrator
27+
// Create copy of options without auto-enabling SingleWorkerMode
3028
orchestratorOptions := *options
31-
orchestratorOptions.SingleWorkerMode = true
3229

3330
// Set default pollers to 1 for orchestrator mode (unless explicitly overridden)
3431
if orchestratorOptions.WorkflowPollers == DefaultOptions.WorkflowPollers {
@@ -41,8 +38,8 @@ func NewWorkflowOrchestrator(backend backend.Backend, options *Options) *Workflo
4138
// Create registry that will be shared between worker and orchestrator
4239
reg := registry.New()
4340

44-
// Create a regular worker with the registry and SingleWorkerMode enabled
45-
workflowWorker := newWorkflowWorker(backend, reg, &orchestratorOptions.WorkflowWorkerOptions, orchestratorOptions.SingleWorkerMode)
41+
// Create a regular worker with the registry
42+
workflowWorker := newWorkflowWorker(backend, reg, &orchestratorOptions.WorkflowWorkerOptions)
4643
activityWorker := newActivityWorker(backend, reg, &orchestratorOptions.ActivityWorkerOptions)
4744
w := newWorker(backend, reg, []worker{workflowWorker, activityWorker})
4845
c := client.New(backend)
@@ -57,7 +54,7 @@ func NewWorkflowOrchestrator(backend backend.Backend, options *Options) *Workflo
5754
return orchestrator
5855
}
5956

60-
// Start starts the worker with single worker mode enabled.
57+
// Start starts the worker.
6158
func (o *WorkflowOrchestrator) Start(ctx context.Context) error {
6259
return o.worker.Start(ctx)
6360
}
@@ -68,21 +65,7 @@ func (o *WorkflowOrchestrator) WaitForCompletion() error {
6865
}
6966

7067
// CreateWorkflowInstance creates a new workflow instance using the client.
71-
// Automatically registers the workflow if it's not already registered.
7268
func (o *WorkflowOrchestrator) CreateWorkflowInstance(ctx context.Context, options client.WorkflowInstanceOptions, wf workflow.Workflow, args ...any) (*workflow.Instance, error) {
73-
// Check if the workflow is a function (not a string name) and register it if needed
74-
if _, ok := wf.(string); !ok {
75-
// It's a function reference, try to register it if not already registered
76-
name := fn.Name(wf)
77-
_, err := o.registry.GetWorkflow(name)
78-
if err != nil {
79-
// Workflow not found in registry, register it directly
80-
if err := o.worker.RegisterWorkflow(wf); err != nil {
81-
return nil, fmt.Errorf("auto-registering workflow %s: %w", name, err)
82-
}
83-
}
84-
}
85-
8669
return o.Client.CreateWorkflowInstance(ctx, options, wf, args...)
8770
}
8871

@@ -99,6 +82,16 @@ func (o *WorkflowOrchestrator) SignalWorkflow(ctx context.Context, instanceID st
9982
return o.Client.SignalWorkflow(ctx, instanceID, name, arg)
10083
}
10184

85+
// RegisterWorkflow registers a workflow with the orchestrator's registry.
86+
func (o *WorkflowOrchestrator) RegisterWorkflow(wf workflow.Workflow, opts ...registry.RegisterOption) error {
87+
return o.worker.RegisterWorkflow(wf, opts...)
88+
}
89+
90+
// RegisterActivity registers an activity with the orchestrator's registry.
91+
func (o *WorkflowOrchestrator) RegisterActivity(a workflow.Activity, opts ...registry.RegisterOption) error {
92+
return o.worker.RegisterActivity(a, opts...)
93+
}
94+
10295
// RemoveWorkflowInstance removes a workflow instance.
10396
func (o *WorkflowOrchestrator) RemoveWorkflowInstance(ctx context.Context, instance *workflow.Instance) error {
10497
return o.Client.RemoveWorkflowInstance(ctx, instance)

0 commit comments

Comments
 (0)