Skip to content

Commit fae4358

Browse files
committed
Support separate activity/workflow workers
1 parent f659c75 commit fae4358

File tree

4 files changed

+127
-88
lines changed

4 files changed

+127
-88
lines changed

internal/worker/activity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewActivityWorker(
3434
logger: b.Options().Logger,
3535
}
3636

37-
return NewWorker[backend.ActivityTask, history.Event](b, tw, &options)
37+
return NewWorker(b, tw, &options)
3838
}
3939

4040
type ActivityTaskWorker struct {

samples/scale/worker/main.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@ func main() {
3333

3434
func RunWorker(ctx context.Context, mb backend.Backend) {
3535
w := worker.New(mb, &worker.Options{
36-
WorkflowPollers: 1,
37-
MaxParallelWorkflowTasks: 100,
38-
ActivityPollers: 1,
39-
MaxParallelActivityTasks: 100,
36+
WorkflowWorkerOptions: worker.WorkflowWorkerOptions{
37+
WorkflowPollers: 1,
38+
MaxParallelWorkflowTasks: 100,
39+
},
40+
ActivityWorkerOptions: worker.ActivityWorkerOptions{
41+
ActivityPollers: 1,
42+
MaxParallelActivityTasks: 100,
43+
},
4044
})
4145

4246
w.RegisterWorkflow(scale.Workflow1)

worker/options.go

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,14 @@ import (
77
"github.com/cschleiden/go-workflows/workflow/executor"
88
)
99

10-
type Options struct {
10+
type WorkflowWorkerOptions struct {
1111
// WorkflowsPollers is the number of pollers to start. Defaults to 2.
1212
WorkflowPollers int
1313

1414
// MaxParallelWorkflowTasks determines the maximum number of concurrent workflow tasks processed
1515
// by the worker. The default is 0 which is no limit.
1616
MaxParallelWorkflowTasks int
1717

18-
// ActivityPollers is the number of pollers to start. Defaults to 2.
19-
ActivityPollers int
20-
21-
// MaxParallelActivityTasks determines the maximum number of concurrent activity tasks processed
22-
// by the worker. The default is 0 which is no limit.
23-
MaxParallelActivityTasks int
24-
25-
// ActivityHeartbeatInterval is the interval between heartbeat attempts for activity tasks. Defaults
26-
// to 25 seconds
27-
ActivityHeartbeatInterval time.Duration
28-
2918
// WorkflowHeartbeatInterval is the interval between heartbeat attempts on workflow tasks. Defaults
3019
// to 25 seconds
3120
WorkflowHeartbeatInterval time.Duration
@@ -35,11 +24,6 @@ type Options struct {
3524
// Defaults to 200ms.
3625
WorkflowPollingInterval time.Duration
3726

38-
// ActivityPollingInterval is the interval between polling for new activity tasks.
39-
// Note that if you use a backend that can wait for tasks to be available (e.g. redis) this field has no effect.
40-
// Defaults to 200ms.
41-
ActivityPollingInterval time.Duration
42-
4327
// WorkflowExecutorCache is the max size of the workflow executor cache. Defaults to 128
4428
WorkflowExecutorCacheSize int
4529

@@ -50,23 +34,52 @@ type Options struct {
5034
// will be used.
5135
WorkflowExecutorCache executor.Cache
5236

37+
// WorkflowQueues are the queue the worker listens to
5338
WorkflowQueues []workflow.Queue
39+
}
40+
41+
type Options struct {
42+
WorkflowWorkerOptions
43+
ActivityWorkerOptions
44+
}
45+
46+
type ActivityWorkerOptions struct {
47+
// ActivityPollers is the number of pollers to start. Defaults to 2.
48+
ActivityPollers int
49+
50+
// MaxParallelActivityTasks determines the maximum number of concurrent activity tasks processed
51+
// by the worker. The default is 0 which is no limit.
52+
MaxParallelActivityTasks int
53+
54+
// ActivityHeartbeatInterval is the interval between heartbeat attempts for activity tasks. Defaults
55+
// to 25 seconds
56+
ActivityHeartbeatInterval time.Duration
57+
58+
// ActivityPollingInterval is the interval between polling for new activity tasks.
59+
// Note that if you use a backend that can wait for tasks to be available (e.g. redis) this field has no effect.
60+
// Defaults to 200ms.
61+
ActivityPollingInterval time.Duration
5462

63+
// ActivityQueues are the queues the worker listens to
5564
ActivityQueues []workflow.Queue
5665
}
5766

5867
var DefaultOptions = Options{
59-
WorkflowPollers: 2,
60-
WorkflowPollingInterval: 200 * time.Millisecond,
61-
MaxParallelWorkflowTasks: 0,
62-
WorkflowHeartbeatInterval: 25 * time.Second,
63-
64-
WorkflowExecutorCacheSize: 128,
65-
WorkflowExecutorCacheTTL: time.Second * 10,
66-
WorkflowExecutorCache: nil,
67-
68-
ActivityPollers: 2,
69-
ActivityPollingInterval: 200 * time.Millisecond,
70-
MaxParallelActivityTasks: 0,
71-
ActivityHeartbeatInterval: 25 * time.Second,
68+
WorkflowWorkerOptions: WorkflowWorkerOptions{
69+
WorkflowPollers: 2,
70+
WorkflowPollingInterval: 200 * time.Millisecond,
71+
MaxParallelWorkflowTasks: 0,
72+
WorkflowHeartbeatInterval: 25 * time.Second,
73+
74+
WorkflowExecutorCacheSize: 128,
75+
WorkflowExecutorCacheTTL: time.Second * 10,
76+
WorkflowExecutorCache: nil,
77+
},
78+
79+
ActivityWorkerOptions: ActivityWorkerOptions{
80+
ActivityPollers: 2,
81+
ActivityPollingInterval: 200 * time.Millisecond,
82+
MaxParallelActivityTasks: 0,
83+
ActivityHeartbeatInterval: 25 * time.Second,
84+
},
7285
}

worker/worker.go

Lines changed: 75 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,106 +3,128 @@ package worker
33
import (
44
"context"
55
"fmt"
6-
"sync"
76

87
"github.com/benbjohnson/clock"
98
"github.com/cschleiden/go-workflows/backend"
10-
"github.com/cschleiden/go-workflows/backend/history"
119
"github.com/cschleiden/go-workflows/client"
1210
"github.com/cschleiden/go-workflows/internal/signals"
1311
internal "github.com/cschleiden/go-workflows/internal/worker"
1412
"github.com/cschleiden/go-workflows/registry"
1513
"github.com/cschleiden/go-workflows/workflow"
16-
"github.com/cschleiden/go-workflows/workflow/executor"
1714
)
1815

1916
type Worker struct {
2017
backend backend.Backend
2118

22-
done chan struct{}
23-
wg *sync.WaitGroup
24-
2519
registry *registry.Registry
2620

27-
workflowWorker *internal.Worker[backend.WorkflowTask, executor.ExecutionResult]
28-
activityWorker *internal.Worker[backend.ActivityTask, history.Event]
21+
workers []worker
22+
}
23+
24+
type worker interface {
25+
Start(context.Context) error
26+
WaitForCompletion() error
2927
}
3028

29+
// New creates a worker that processes workflows and activities.
3130
func New(backend backend.Backend, options *Options) *Worker {
31+
registry := registry.New()
32+
3233
if options == nil {
3334
options = &DefaultOptions
34-
} else {
35-
if options.WorkflowExecutorCacheSize == 0 {
36-
options.WorkflowExecutorCacheSize = DefaultOptions.WorkflowExecutorCacheSize
37-
}
38-
39-
if options.WorkflowExecutorCacheTTL == 0 {
40-
options.WorkflowExecutorCacheTTL = DefaultOptions.WorkflowExecutorCacheTTL
41-
}
4235
}
4336

44-
registry := registry.New()
37+
workflowWorker := newWorkflowWorker(backend, registry, &options.WorkflowWorkerOptions)
38+
activityWorker := newActivityWorker(backend, registry, &options.ActivityWorkerOptions)
4539

4640
// Register internal activities
41+
return newWorker(backend, registry, []worker{workflowWorker, activityWorker})
42+
}
43+
44+
// NewWorkflowWorker creates a worker that only processes workflows.
45+
func NewWorkflowWorker(backend backend.Backend, options *WorkflowWorkerOptions) *Worker {
46+
registry := registry.New()
47+
48+
return newWorker(backend, registry, []worker{newWorkflowWorker(backend, registry, options)})
49+
}
50+
51+
// NewActivityWorker creates a worker that only processes activities.
52+
func NewActivityWorker(backend backend.Backend, options *ActivityWorkerOptions) *Worker {
53+
registry := registry.New()
54+
55+
return newWorker(backend, registry, []worker{newActivityWorker(backend, registry, options)})
56+
}
57+
58+
func newWorker(backend backend.Backend, registry *registry.Registry, workers []worker) *Worker {
4759
if err := registry.RegisterActivity(&signals.Activities{Signaler: client.New(backend)}); err != nil {
4860
panic(fmt.Errorf("registering internal activities: %w", err))
4961
}
5062

5163
return &Worker{
5264
backend: backend,
5365

54-
done: make(chan struct{}),
55-
wg: &sync.WaitGroup{},
56-
57-
workflowWorker: internal.NewWorkflowWorker(backend, registry, internal.WorkflowWorkerOptions{
58-
WorkerOptions: internal.WorkerOptions{
59-
Pollers: options.WorkflowPollers,
60-
PollingInterval: options.WorkflowPollingInterval,
61-
MaxParallelTasks: options.MaxParallelWorkflowTasks,
62-
HeartbeatInterval: options.WorkflowHeartbeatInterval,
63-
Queues: options.WorkflowQueues,
64-
},
65-
WorkflowExecutorCache: options.WorkflowExecutorCache,
66-
WorkflowExecutorCacheSize: options.WorkflowExecutorCacheSize,
67-
WorkflowExecutorCacheTTL: options.WorkflowExecutorCacheTTL,
68-
}),
69-
70-
activityWorker: internal.NewActivityWorker(backend, registry, clock.New(), internal.WorkerOptions{
71-
Pollers: options.ActivityPollers,
72-
PollingInterval: options.ActivityPollingInterval,
73-
MaxParallelTasks: options.MaxParallelActivityTasks,
74-
HeartbeatInterval: options.ActivityHeartbeatInterval,
75-
Queues: options.ActivityQueues,
76-
}),
77-
66+
workers: workers,
7867
registry: registry,
7968
}
8069
}
8170

71+
func newActivityWorker(backend backend.Backend, registry *registry.Registry, options *ActivityWorkerOptions) worker {
72+
if options == nil {
73+
options = &DefaultOptions.ActivityWorkerOptions
74+
}
75+
76+
activityWorker := internal.NewActivityWorker(backend, registry, clock.New(), internal.WorkerOptions{
77+
Pollers: options.ActivityPollers,
78+
PollingInterval: options.ActivityPollingInterval,
79+
MaxParallelTasks: options.MaxParallelActivityTasks,
80+
HeartbeatInterval: options.ActivityHeartbeatInterval,
81+
Queues: options.ActivityQueues,
82+
})
83+
84+
return activityWorker
85+
}
86+
87+
func newWorkflowWorker(backend backend.Backend, registry *registry.Registry, options *WorkflowWorkerOptions) worker {
88+
if options == nil {
89+
options = &DefaultOptions.WorkflowWorkerOptions
90+
}
91+
92+
workflowWorker := internal.NewWorkflowWorker(backend, registry, internal.WorkflowWorkerOptions{
93+
WorkerOptions: internal.WorkerOptions{
94+
Pollers: options.WorkflowPollers,
95+
PollingInterval: options.WorkflowPollingInterval,
96+
MaxParallelTasks: options.MaxParallelWorkflowTasks,
97+
HeartbeatInterval: options.WorkflowHeartbeatInterval,
98+
Queues: options.WorkflowQueues,
99+
},
100+
WorkflowExecutorCache: options.WorkflowExecutorCache,
101+
WorkflowExecutorCacheSize: options.WorkflowExecutorCacheSize,
102+
WorkflowExecutorCacheTTL: options.WorkflowExecutorCacheTTL,
103+
})
104+
105+
return workflowWorker
106+
}
107+
82108
// Start starts the worker.
83109
//
84110
// To stop the worker, cancel the context passed to Start. To wait for completion of the active
85111
// tasks, call `WaitForCompletion`.
86112
func (w *Worker) Start(ctx context.Context) error {
87-
if err := w.workflowWorker.Start(ctx); err != nil {
88-
return fmt.Errorf("starting workflow worker: %w", err)
89-
}
90-
91-
if err := w.activityWorker.Start(ctx); err != nil {
92-
return fmt.Errorf("starting activity worker: %w", err)
113+
for _, worker := range w.workers {
114+
if err := worker.Start(ctx); err != nil {
115+
return fmt.Errorf("starting worker: %w", err)
116+
}
93117
}
94118

95119
return nil
96120
}
97121

98122
// WaitForCompletion waits for all active tasks to complete.
99123
func (w *Worker) WaitForCompletion() error {
100-
if err := w.workflowWorker.WaitForCompletion(); err != nil {
101-
return err
102-
}
103-
104-
if err := w.activityWorker.WaitForCompletion(); err != nil {
105-
return err
124+
for _, worker := range w.workers {
125+
if err := worker.WaitForCompletion(); err != nil {
126+
return fmt.Errorf("waiting for worker completion: %w", err)
127+
}
106128
}
107129

108130
return nil

0 commit comments

Comments
 (0)