Skip to content

Commit 099dd93

Browse files
authored
Merge pull request #239 from lovromazgon/polling-interval
Make polling interval configurable
2 parents 3f453f5 + 22bad5a commit 099dd93

File tree

4 files changed

+53
-36
lines changed

4 files changed

+53
-36
lines changed

backend/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type Backend interface {
4444
// If the given instance does not exist, it will return an error
4545
SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error
4646

47-
// GetWorkflowInstance returns a pending workflow task or nil if there are no pending worflow executions
47+
// GetWorkflowTask returns a pending workflow task or nil if there are no pending workflow executions
4848
GetWorkflowTask(ctx context.Context) (*task.Workflow, error)
4949

5050
// ExtendWorkflowTask extends the lock of a workflow task

internal/worker/activity.go

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package worker
33
import (
44
"context"
55
"errors"
6-
"log"
6+
"log/slog"
77
"sync"
88
"time"
99

@@ -30,7 +30,8 @@ type ActivityWorker struct {
3030
wg sync.WaitGroup
3131
pollersWg sync.WaitGroup
3232

33-
clock clock.Clock
33+
clock clock.Clock
34+
logger *slog.Logger
3435
}
3536

3637
func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clock clock.Clock, options *Options) *ActivityWorker {
@@ -42,7 +43,8 @@ func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clo
4243
activityTaskQueue: make(chan *task.Activity),
4344
activityTaskExecutor: activity.NewExecutor(backend.Logger(), backend.Tracer(), backend.Converter(), backend.ContextPropagators(), registry),
4445

45-
clock: clock,
46+
clock: clock,
47+
logger: backend.Logger(),
4648
}
4749
}
4850

@@ -53,7 +55,7 @@ func (aw *ActivityWorker) Start(ctx context.Context) error {
5355
go aw.runPoll(ctx)
5456
}
5557

56-
go aw.runDispatcher(context.Background())
58+
go aw.runDispatcher()
5759

5860
return nil
5961
}
@@ -72,25 +74,28 @@ func (aw *ActivityWorker) WaitForCompletion() error {
7274
func (aw *ActivityWorker) runPoll(ctx context.Context) {
7375
defer aw.pollersWg.Done()
7476

77+
ticker := time.NewTicker(aw.options.ActivityPollingInterval)
78+
defer ticker.Stop()
7579
for {
80+
task, err := aw.poll(ctx, 30*time.Second)
81+
if err != nil {
82+
aw.logger.ErrorContext(ctx, "error while polling for activity task", "error", err)
83+
}
84+
if task != nil {
85+
aw.wg.Add(1)
86+
aw.activityTaskQueue <- task
87+
continue // check for new tasks right away
88+
}
89+
7690
select {
7791
case <-ctx.Done():
7892
return
79-
default:
80-
task, err := aw.poll(ctx, 30*time.Second)
81-
if err != nil {
82-
log.Println("error while polling for activity task:", err)
83-
continue
84-
}
85-
86-
if task != nil {
87-
aw.activityTaskQueue <- task
88-
}
93+
case <-ticker.C:
8994
}
9095
}
9196
}
9297

93-
func (aw *ActivityWorker) runDispatcher(ctx context.Context) {
98+
func (aw *ActivityWorker) runDispatcher() {
9499
var sem chan struct{}
95100
if aw.options.MaxParallelActivityTasks > 0 {
96101
sem = make(chan struct{}, aw.options.MaxParallelActivityTasks)
@@ -103,7 +108,6 @@ func (aw *ActivityWorker) runDispatcher(ctx context.Context) {
103108

104109
task := task
105110

106-
aw.wg.Add(1)
107111
go func() {
108112
defer aw.wg.Done()
109113

@@ -194,7 +198,7 @@ func (aw *ActivityWorker) poll(ctx context.Context, timeout time.Duration) (*tas
194198

195199
task, err := aw.backend.GetActivityTask(ctx)
196200
if err != nil {
197-
if errors.Is(err, context.Canceled) {
201+
if errors.Is(err, context.DeadlineExceeded) {
198202
return nil, nil
199203
}
200204
}

internal/worker/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ type Options struct {
3333
// WorkflowHeartbeatInterval is the interval between heartbeat attempts on workflow tasks, when enabled.
3434
WorkflowHeartbeatInterval time.Duration
3535

36+
// WorkflowPollingInterval is the interval between polling for new workflow tasks.
37+
// Note that if you use a backend that can wait for tasks to be available (e.g. redis) this field has no effect.
38+
// Defaults to 200ms.
39+
WorkflowPollingInterval time.Duration
40+
41+
// ActivityPollingInterval is the interval between polling for new activity tasks.
42+
// Note that if you use a backend that can wait for tasks to be available (e.g. redis) this field has no effect.
43+
// Defaults to 200ms.
44+
ActivityPollingInterval time.Duration
45+
3646
// WorkflowExecutorCache is the max size of the workflow executor cache. Defaults to 128
3747
WorkflowExecutorCacheSize int
3848

@@ -51,6 +61,8 @@ var DefaultOptions = Options{
5161
MaxParallelActivityTasks: 0,
5262
ActivityHeartbeatInterval: 25 * time.Second,
5363
WorkflowHeartbeatInterval: 25 * time.Second,
64+
WorkflowPollingInterval: 200 * time.Millisecond,
65+
ActivityPollingInterval: 200 * time.Millisecond,
5466

5567
WorkflowExecutorCacheSize: 128,
5668
WorkflowExecutorCacheTTL: time.Second * 10,

internal/worker/workflow.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -84,28 +84,29 @@ func (ww *WorkflowWorker) WaitForCompletion() error {
8484
func (ww *WorkflowWorker) runPoll(ctx context.Context) {
8585
defer ww.pollersWg.Done()
8686

87+
ticker := time.NewTicker(ww.options.WorkflowPollingInterval)
88+
defer ticker.Stop()
8789
for {
90+
task, err := ww.poll(ctx, 30*time.Second)
91+
if err != nil {
92+
ww.logger.ErrorContext(ctx, "error while polling for workflow task", "error", err)
93+
}
94+
if task != nil {
95+
ww.wg.Add(1)
96+
ww.workflowTaskQueue <- task
97+
continue // check for new tasks right away
98+
}
99+
88100
select {
89101
case <-ctx.Done():
90102
return
91-
92-
default:
93-
task, err := ww.poll(ctx, 30*time.Second)
94-
if err != nil {
95-
ww.logger.Error("error while polling for workflow task", "error", err)
96-
continue
97-
}
98-
99-
if task != nil {
100-
ww.wg.Add(1)
101-
ww.workflowTaskQueue <- task
102-
}
103+
case <-ticker.C:
103104
}
104105
}
105106
}
106107

107108
func (ww *WorkflowWorker) runDispatcher() {
108-
var sem chan (struct{})
109+
var sem chan struct{}
109110

110111
if ww.options.MaxParallelWorkflowTasks > 0 {
111112
sem = make(chan struct{}, ww.options.MaxParallelWorkflowTasks)
@@ -177,7 +178,7 @@ func (ww *WorkflowWorker) handle(ctx context.Context, t *task.Workflow) {
177178

178179
if err := ww.backend.CompleteWorkflowTask(
179180
ctx, t, t.WorkflowInstance, state, result.Executed, result.ActivityEvents, result.TimerEvents, result.WorkflowEvents); err != nil {
180-
ww.logger.Error("could not complete workflow task", "error", err)
181+
ww.logger.ErrorContext(ctx, "could not complete workflow task", "error", err)
181182
panic("could not complete workflow task")
182183
}
183184
}
@@ -210,7 +211,7 @@ func (ww *WorkflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (wo
210211
// Try to get a cached executor
211212
executor, ok, err := ww.cache.Get(ctx, t.WorkflowInstance)
212213
if err != nil {
213-
ww.logger.Error("could not get cached workflow task executor", "error", err)
214+
ww.logger.ErrorContext(ctx, "could not get cached workflow task executor", "error", err)
214215
}
215216

216217
if !ok {
@@ -224,7 +225,7 @@ func (ww *WorkflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (wo
224225

225226
// Cache executor instance for future continuation tasks, or refresh last access time
226227
if err := ww.cache.Store(ctx, t.WorkflowInstance, executor); err != nil {
227-
ww.logger.Error("error while caching workflow task executor:", "error", err)
228+
ww.logger.ErrorContext(ctx, "error while caching workflow task executor:", "error", err)
228229
}
229230

230231
return executor, nil
@@ -240,7 +241,7 @@ func (ww *WorkflowWorker) heartbeatTask(ctx context.Context, task *task.Workflow
240241
return
241242
case <-t.C:
242243
if err := ww.backend.ExtendWorkflowTask(ctx, task.ID, task.WorkflowInstance); err != nil {
243-
ww.logger.Error("could not heartbeat workflow task", "error", err)
244+
ww.logger.ErrorContext(ctx, "could not heartbeat workflow task", "error", err)
244245
panic("could not heartbeat workflow task")
245246
}
246247
}
@@ -257,7 +258,7 @@ func (ww *WorkflowWorker) poll(ctx context.Context, timeout time.Duration) (*tas
257258

258259
task, err := ww.backend.GetWorkflowTask(ctx)
259260
if err != nil {
260-
if errors.Is(err, context.Canceled) {
261+
if errors.Is(err, context.DeadlineExceeded) {
261262
return nil, nil
262263
}
263264

0 commit comments

Comments
 (0)