Skip to content

Commit 38274ae

Browse files
committed
optimize poller behavior, remove BlockingBackend
1 parent 5c9f3da commit 38274ae

File tree

4 files changed

+10
-59
lines changed

4 files changed

+10
-59
lines changed

backend/redis/redis.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/cschleiden/go-workflows/internal/core"
1313
"github.com/cschleiden/go-workflows/internal/history"
1414
"github.com/cschleiden/go-workflows/internal/metrickeys"
15-
"github.com/cschleiden/go-workflows/internal/worker"
1615
"github.com/cschleiden/go-workflows/metrics"
1716
"github.com/redis/go-redis/v9"
1817
"go.opentelemetry.io/otel/trace"
@@ -110,7 +109,3 @@ func (rb *redisBackend) ContextPropagators() []contextpropagation.ContextPropaga
110109
func (rb *redisBackend) Close() error {
111110
return rb.rdb.Close()
112111
}
113-
114-
var _ worker.BlockingBackend = (*redisBackend)(nil)
115-
116-
func (rb *redisBackend) BlockOnGetTask() { /* satisfy interface */ }

internal/worker/activity.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -74,38 +74,22 @@ func (aw *ActivityWorker) WaitForCompletion() error {
7474
func (aw *ActivityWorker) runPoll(ctx context.Context) {
7575
defer aw.pollersWg.Done()
7676

77-
poll := func() {
77+
ticker := time.NewTicker(aw.options.ActivityPollingInterval)
78+
defer ticker.Stop()
79+
for {
7880
task, err := aw.poll(ctx, 30*time.Second)
7981
if err != nil {
8082
aw.logger.ErrorContext(ctx, "error while polling for activity task", "error", err)
81-
return
8283
}
83-
8484
if task != nil {
8585
aw.activityTaskQueue <- task
86+
continue // check for new tasks right away
8687
}
87-
}
8888

89-
if _, ok := aw.backend.(BlockingBackend); ok {
90-
// Backend blocks on calls to GetActivityTask, poll continuously
91-
for {
92-
if ctx.Err() != nil {
93-
return
94-
}
95-
poll()
96-
}
97-
}
98-
99-
// Backend is polling tasks and returns on calls to GetActivityTask, we need
100-
// to throttle the calls.
101-
ticker := time.NewTicker(aw.options.ActivityPollingInterval)
102-
defer ticker.Stop()
103-
for {
10489
select {
10590
case <-ctx.Done():
10691
return
10792
case <-ticker.C:
108-
poll()
10993
}
11094
}
11195
}
@@ -214,7 +198,7 @@ func (aw *ActivityWorker) poll(ctx context.Context, timeout time.Duration) (*tas
214198

215199
task, err := aw.backend.GetActivityTask(ctx)
216200
if err != nil {
217-
if errors.Is(err, context.Canceled) {
201+
if errors.Is(err, context.DeadlineExceeded) {
218202
return nil, nil
219203
}
220204
}

internal/worker/backend.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

internal/worker/workflow.go

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,39 +84,23 @@ func (ww *WorkflowWorker) WaitForCompletion() error {
8484
func (ww *WorkflowWorker) runPoll(ctx context.Context) {
8585
defer ww.pollersWg.Done()
8686

87-
poll := func() {
87+
ticker := time.NewTicker(ww.options.WorkflowPollingInterval)
88+
defer ticker.Stop()
89+
for {
8890
task, err := ww.poll(ctx, 30*time.Second)
8991
if err != nil {
9092
ww.logger.ErrorContext(ctx, "error while polling for workflow task", "error", err)
91-
return
9293
}
93-
9494
if task != nil {
9595
ww.wg.Add(1)
9696
ww.workflowTaskQueue <- task
97+
continue // check for new tasks right away
9798
}
98-
}
9999

100-
if _, ok := ww.backend.(BlockingBackend); ok {
101-
// Backend blocks on calls to GetActivityTask, poll continuously
102-
for {
103-
if ctx.Err() != nil {
104-
return
105-
}
106-
poll()
107-
}
108-
}
109-
110-
// Backend is polling tasks and returns on calls to GetActivityTask, we need
111-
// to throttle the calls.
112-
ticker := time.NewTicker(ww.options.WorkflowPollingInterval)
113-
defer ticker.Stop()
114-
for {
115100
select {
116101
case <-ctx.Done():
117102
return
118103
case <-ticker.C:
119-
poll()
120104
}
121105
}
122106
}
@@ -274,7 +258,7 @@ func (ww *WorkflowWorker) poll(ctx context.Context, timeout time.Duration) (*tas
274258

275259
task, err := ww.backend.GetWorkflowTask(ctx)
276260
if err != nil {
277-
if errors.Is(err, context.Canceled) {
261+
if errors.Is(err, context.DeadlineExceeded) {
278262
return nil, nil
279263
}
280264

0 commit comments

Comments
 (0)