Skip to content

Commit 35ed2b6

Browse files
committed
Add single worker mode
1 parent ae35998 commit 35ed2b6

File tree

20 files changed

+455
-40
lines changed

20 files changed

+455
-40
lines changed

docs/source/index.html.md

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func Workflow1(ctx workflow.Context, input string) error {
5858
}
5959
```
6060

61-
Let's first write a simple workflows. Our workflow executes two _activities_ in sequence waiting for each result. Both workflows and activities are written in plain Go. Workflows can be long-running and have to be deterministic so that they can be interrupted and resumed. Activities are functions that can have side-effects and don't have to be deterministic.
61+
Let's first write a simple workflow. Our workflow executes two _activities_ in sequence waiting for each result. Both workflows and activities are written in plain Go. Workflows can be long-running and have to be deterministic so that they can be interrupted and resumed. Activities are functions that can have side-effects and don't have to be deterministic.
6262

6363
Both workflows and activities support arbitrary inputs and outputs as long as those are serializable.
6464

@@ -135,3 +135,71 @@ func main() {
135135
To finish the example, we create the backend, start a worker in a separate go-routine. We then create a `Client` instance which we then use to create a new _workflow instance_. A workflow instance is just one running instance of a previously registered workflow.
136136

137137
With the exception of the in-memory backend, we do not have to start the workflow from the same process the worker runs in, we could create the client from another process and create/wait for/cancel/... workflow instances from there.
138+
139+
## Using the WorkflowOrchestrator
140+
141+
For simpler scenarios where you don't need the separation between client and worker, you can use the `WorkflowOrchestrator` which combines both:
142+
143+
```go
144+
// Define the workflow with activities that will be auto-registered
145+
func AutoRegisteredWorkflow(ctx workflow.Context, input string) (string, error) {
146+
// When using WorkflowOrchestrator, activities are automatically registered
147+
// You can directly use any activity function without registering it first
148+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1).Get(ctx)
149+
if err != nil {
150+
return "", err
151+
}
152+
153+
// Another activity that will be auto-registered
154+
r2, err := workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, FormatResult, input, r1).Get(ctx)
155+
if err != nil {
156+
return "", err
157+
}
158+
159+
return r2, nil
160+
}
161+
162+
// Activities are automatically registered when using WorkflowOrchestrator
163+
func Activity1(ctx context.Context) (int, error) {
164+
return 42, nil
165+
}
166+
167+
func FormatResult(ctx context.Context, input string, value int) (string, error) {
168+
return fmt.Sprintf("Processed %s with result %d", input, value), nil
169+
}
170+
171+
func main() {
172+
ctx := context.Background()
173+
174+
b := sqlite.NewSqliteBackend("simple.sqlite")
175+
176+
// Create orchestrator instead of separate client and worker
177+
orchestrator := worker.NewWorkflowOrchestrator(b, nil)
178+
179+
// Workflows are automatically registered
180+
// Activities defined with InlineActivity don't need registration
181+
182+
// Start the orchestrator
183+
if err := orchestrator.Start(ctx); err != nil {
184+
panic("could not start orchestrator")
185+
}
186+
187+
// Create workflow instance
188+
wf, err := orchestrator.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
189+
InstanceID: uuid.NewString(),
190+
}, AutoRegisteredWorkflow, "input-for-workflow")
191+
if err != nil {
192+
panic("could not start workflow")
193+
}
194+
195+
// Get result directly
196+
result, err := client.GetWorkflowResult[string](ctx, orchestrator.Client, wf, 5*time.Second)
197+
if err != nil {
198+
panic("error getting workflow result: " + err.Error())
199+
}
200+
201+
fmt.Println("Workflow completed with result:", result)
202+
}
203+
```
204+
205+
The `WorkflowOrchestrator` provides automatic registration of workflows when passing workflow functions directly to `CreateWorkflowInstance`. For activities, the orchestrator automatically registers any activities used in the workflows via `ExecuteActivity` without explicit registration. Similarly, any sub-workflows created with `CreateSubWorkflowInstance` are registered automatically. This approach offers a unified API for workflow creation and execution in a single component, simplifying the development experience for simpler scenarios.

internal/contextvalue/registry.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package contextvalue
2+
3+
import (
4+
"github.com/cschleiden/go-workflows/internal/sync"
5+
"github.com/cschleiden/go-workflows/registry"
6+
)
7+
8+
type registryKey struct{}
9+
10+
func WithRegistry(ctx sync.Context, r *registry.Registry) sync.Context {
11+
return sync.WithValue(ctx, registryKey{}, r)
12+
}
13+
14+
func GetRegistry(ctx sync.Context) *registry.Registry {
15+
if v := ctx.Value(registryKey{}); v != nil {
16+
return v.(*registry.Registry)
17+
}
18+
19+
return nil
20+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package contextvalue
2+
3+
import (
4+
"github.com/cschleiden/go-workflows/internal/sync"
5+
)
6+
7+
type singleWorkerModeKey struct{}
8+
9+
// WithSingleWorkerMode sets the single worker mode flag in the context.
10+
// When this flag is set, activities and workflows will be automatically registered when needed.
11+
func WithSingleWorkerMode(ctx sync.Context) sync.Context {
12+
return sync.WithValue(ctx, singleWorkerModeKey{}, true)
13+
}
14+
15+
// IsSingleWorkerMode checks if the context is in single worker mode.
16+
func IsSingleWorkerMode(ctx sync.Context) bool {
17+
if v := ctx.Value(singleWorkerModeKey{}); v != nil {
18+
return v.(bool)
19+
}
20+
return false
21+
}

internal/worker/workflow.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type WorkflowWorkerOptions struct {
2727
WorkflowExecutorCache executor.Cache
2828
WorkflowExecutorCacheSize int
2929
WorkflowExecutorCacheTTL time.Duration
30+
SingleWorkerMode bool
3031
}
3132

3233
func NewWorkflowWorker(
@@ -39,20 +40,22 @@ func NewWorkflowWorker(
3940
}
4041

4142
tw := &WorkflowTaskWorker{
42-
backend: b,
43-
registry: registry,
44-
cache: options.WorkflowExecutorCache,
45-
logger: b.Options().Logger,
43+
backend: b,
44+
registry: registry,
45+
cache: options.WorkflowExecutorCache,
46+
logger: b.Options().Logger,
47+
singleWorkerMode: options.SingleWorkerMode,
4648
}
4749

4850
return NewWorker(b, tw, &options.WorkerOptions)
4951
}
5052

5153
type WorkflowTaskWorker struct {
52-
backend backend.Backend
53-
registry *registry.Registry
54-
cache executor.Cache
55-
logger *slog.Logger
54+
backend backend.Backend
55+
registry *registry.Registry
56+
cache executor.Cache
57+
logger *slog.Logger
58+
singleWorkerMode bool
5659
}
5760

5861
func (wtw *WorkflowTaskWorker) Start(ctx context.Context, queues []workflow.Queue) error {
@@ -181,6 +184,7 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf
181184
t.Metadata,
182185
clock.New(),
183186
wtw.backend.Options().MaxHistorySize,
187+
wtw.singleWorkerMode,
184188
)
185189
if err != nil {
186190
return nil, fmt.Errorf("creating workflow task executor: %w", err)

registry/auto_registry.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package registry
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
// AutoRegisteringRegistry extends the regular Registry with auto-registration capabilities
8+
// It can be used as an alternative to context-based SingleWorkerMode
9+
type AutoRegisteringRegistry struct {
10+
*Registry
11+
}
12+
13+
// NewAutoRegistering creates a new auto-registering registry
14+
func NewAutoRegistering() *AutoRegisteringRegistry {
15+
return &AutoRegisteringRegistry{
16+
Registry: New(),
17+
}
18+
}
19+
20+
// GetWorkflowWithAutoRegister returns the workflow function for the given name.
21+
// If the workflow is not found and wf is provided, it will attempt to register it.
22+
func (r *AutoRegisteringRegistry) GetWorkflowWithAutoRegister(name string, wf interface{}) (interface{}, error) {
23+
v, err := r.Registry.GetWorkflow(name)
24+
25+
// If not found, try to register the workflow
26+
if err != nil && wf != nil {
27+
if err := r.RegisterWorkflow(wf); err != nil {
28+
return nil, fmt.Errorf("auto-registering workflow %s: %w", name, err)
29+
}
30+
return r.Registry.GetWorkflow(name)
31+
}
32+
33+
return v, err
34+
}
35+
36+
// GetActivityWithAutoRegister returns the activity function for the given name.
37+
// If the activity is not found and activity is provided, it will attempt to register it.
38+
func (r *AutoRegisteringRegistry) GetActivityWithAutoRegister(name string, activity interface{}) (interface{}, error) {
39+
v, err := r.Registry.GetActivity(name)
40+
41+
// If not found, try to register the activity
42+
if err != nil && activity != nil {
43+
if err := r.RegisterActivity(activity); err != nil {
44+
return nil, fmt.Errorf("auto-registering activity %s: %w", name, err)
45+
}
46+
return r.Registry.GetActivity(name)
47+
}
48+
49+
return v, err
50+
}

registry/registry.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,38 @@ import (
88

99
"github.com/cschleiden/go-workflows/internal/args"
1010
"github.com/cschleiden/go-workflows/internal/fn"
11-
wf "github.com/cschleiden/go-workflows/workflow"
1211
)
1312

1413
type Registry struct {
14+
sealed bool
15+
1516
sync.Mutex
1617

17-
workflowMap map[string]wf.Workflow
18-
activityMap map[string]interface{}
18+
workflowMap map[string]any
19+
activityMap map[string]any
1920
}
2021

2122
// New creates a new registry instance.
2223
func New() *Registry {
2324
return &Registry{
24-
workflowMap: make(map[string]wf.Workflow),
25-
activityMap: make(map[string]interface{}),
25+
workflowMap: make(map[string]any),
26+
activityMap: make(map[string]any),
2627
}
2728
}
2829

2930
type registerConfig struct {
3031
Name string
3132
}
3233

33-
func (r *Registry) RegisterWorkflow(workflow wf.Workflow, opts ...RegisterOption) error {
34+
func (r *Registry) Seal() {
35+
r.sealed = true
36+
}
37+
38+
func (r *Registry) Sealed() bool {
39+
return r.sealed
40+
}
41+
42+
func (r *Registry) RegisterWorkflow(workflow any, opts ...RegisterOption) error {
3443
cfg := registerOptions(opts).applyRegisterOptions(registerConfig{})
3544
name := cfg.Name
3645
if name == "" {
@@ -75,7 +84,7 @@ func (r *Registry) RegisterWorkflow(workflow wf.Workflow, opts ...RegisterOption
7584
return nil
7685
}
7786

78-
func (r *Registry) RegisterActivity(activity wf.Activity, opts ...RegisterOption) error {
87+
func (r *Registry) RegisterActivity(activity any, opts ...RegisterOption) error {
7988
cfg := registerOptions(opts).applyRegisterOptions(registerConfig{})
8089

8190
t := reflect.TypeOf(activity)
@@ -151,7 +160,7 @@ func checkActivity(actType reflect.Type) error {
151160
return nil
152161
}
153162

154-
func (r *Registry) GetWorkflow(name string) (wf.Workflow, error) {
163+
func (r *Registry) GetWorkflow(name string) (any, error) {
155164
r.Lock()
156165
defer r.Unlock()
157166

registry/registry_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
"github.com/cschleiden/go-workflows/internal/fn"
88
"github.com/cschleiden/go-workflows/internal/sync"
9-
wf "github.com/cschleiden/go-workflows/workflow"
109
"github.com/stretchr/testify/require"
1110
)
1211

@@ -17,7 +16,7 @@ func reg_workflow1(ctx sync.Context) error {
1716
func TestRegistry_RegisterWorkflow(t *testing.T) {
1817
type args struct {
1918
name string
20-
workflow wf.Workflow
19+
workflow any
2120
}
2221
tests := []struct {
2322
name string

samples/orchestrator/main.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
"github.com/cschleiden/go-workflows/client"
9+
"github.com/cschleiden/go-workflows/samples"
10+
"github.com/cschleiden/go-workflows/worker"
11+
"github.com/cschleiden/go-workflows/workflow"
12+
13+
"github.com/google/uuid"
14+
)
15+
16+
func main() {
17+
ctx, cancel := context.WithCancel(context.Background())
18+
defer cancel()
19+
20+
backend := samples.GetBackend("orchestrator")
21+
22+
orchestrator := worker.NewWorkflowOrchestrator(
23+
backend,
24+
nil,
25+
)
26+
27+
if err := orchestrator.Start(ctx); err != nil {
28+
panic("could not start orchestrator: " + err.Error())
29+
}
30+
31+
// Create instance ID
32+
instanceID := uuid.NewString()
33+
34+
// Create and run workflow using the orchestrator - no explicit registration needed
35+
instance, err := orchestrator.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
36+
InstanceID: instanceID,
37+
}, SimpleWorkflow, "Hello from orchestrator!")
38+
if err != nil {
39+
panic("could not create workflow instance: " + err.Error())
40+
}
41+
42+
// Wait for result using client package directly
43+
result, err := client.GetWorkflowResult[string](ctx, orchestrator.Client, instance, 10*time.Second)
44+
if err != nil {
45+
panic("error getting workflow result: " + err.Error())
46+
}
47+
48+
log.Printf("Workflow result: %s\n", result)
49+
50+
// Clean shutdown
51+
cancel()
52+
53+
if err := orchestrator.WaitForCompletion(); err != nil {
54+
panic("could not stop orchestrator: " + err.Error())
55+
}
56+
}
57+
58+
// SimpleWorkflow is a basic workflow that calls an activity and returns its result
59+
func SimpleWorkflow(ctx workflow.Context, message string) (string, error) {
60+
future := workflow.ExecuteActivity[string](ctx, workflow.DefaultActivityOptions, ProcessMessage, message)
61+
62+
result, err := future.Get(ctx)
63+
if err != nil {
64+
return "", err
65+
}
66+
67+
return result, nil
68+
}
69+
70+
// ProcessMessage is a simple activity that processes a message
71+
// When orchestrator mode is enabled, this will be automatically registered
72+
func ProcessMessage(ctx context.Context, message string) (string, error) {
73+
return message + " (processed by activity)", nil
74+
}
75+
76+
// Note: No separate activity function needed when using InlineActivity

0 commit comments

Comments
 (0)