@@ -84,28 +84,29 @@ func (ww *WorkflowWorker) WaitForCompletion() error {
84
84
func (ww * WorkflowWorker ) runPoll (ctx context.Context ) {
85
85
defer ww .pollersWg .Done ()
86
86
87
+ ticker := time .NewTicker (ww .options .WorkflowPollingInterval )
88
+ defer ticker .Stop ()
87
89
for {
90
+ task , err := ww .poll (ctx , 30 * time .Second )
91
+ if err != nil {
92
+ ww .logger .ErrorContext (ctx , "error while polling for workflow task" , "error" , err )
93
+ }
94
+ if task != nil {
95
+ ww .wg .Add (1 )
96
+ ww .workflowTaskQueue <- task
97
+ continue // check for new tasks right away
98
+ }
99
+
88
100
select {
89
101
case <- ctx .Done ():
90
102
return
91
-
92
- default :
93
- task , err := ww .poll (ctx , 30 * time .Second )
94
- if err != nil {
95
- ww .logger .Error ("error while polling for workflow task" , "error" , err )
96
- continue
97
- }
98
-
99
- if task != nil {
100
- ww .wg .Add (1 )
101
- ww .workflowTaskQueue <- task
102
- }
103
+ case <- ticker .C :
103
104
}
104
105
}
105
106
}
106
107
107
108
func (ww * WorkflowWorker ) runDispatcher () {
108
- var sem chan ( struct {})
109
+ var sem chan struct {}
109
110
110
111
if ww .options .MaxParallelWorkflowTasks > 0 {
111
112
sem = make (chan struct {}, ww .options .MaxParallelWorkflowTasks )
@@ -177,7 +178,7 @@ func (ww *WorkflowWorker) handle(ctx context.Context, t *task.Workflow) {
177
178
178
179
if err := ww .backend .CompleteWorkflowTask (
179
180
ctx , t , t .WorkflowInstance , state , result .Executed , result .ActivityEvents , result .TimerEvents , result .WorkflowEvents ); err != nil {
180
- ww .logger .Error ( "could not complete workflow task" , "error" , err )
181
+ ww .logger .ErrorContext ( ctx , "could not complete workflow task" , "error" , err )
181
182
panic ("could not complete workflow task" )
182
183
}
183
184
}
@@ -210,7 +211,7 @@ func (ww *WorkflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (wo
210
211
// Try to get a cached executor
211
212
executor , ok , err := ww .cache .Get (ctx , t .WorkflowInstance )
212
213
if err != nil {
213
- ww .logger .Error ( "could not get cached workflow task executor" , "error" , err )
214
+ ww .logger .ErrorContext ( ctx , "could not get cached workflow task executor" , "error" , err )
214
215
}
215
216
216
217
if ! ok {
@@ -224,7 +225,7 @@ func (ww *WorkflowWorker) getExecutor(ctx context.Context, t *task.Workflow) (wo
224
225
225
226
// Cache executor instance for future continuation tasks, or refresh last access time
226
227
if err := ww .cache .Store (ctx , t .WorkflowInstance , executor ); err != nil {
227
- ww .logger .Error ( "error while caching workflow task executor:" , "error" , err )
228
+ ww .logger .ErrorContext ( ctx , "error while caching workflow task executor:" , "error" , err )
228
229
}
229
230
230
231
return executor , nil
@@ -240,7 +241,7 @@ func (ww *WorkflowWorker) heartbeatTask(ctx context.Context, task *task.Workflow
240
241
return
241
242
case <- t .C :
242
243
if err := ww .backend .ExtendWorkflowTask (ctx , task .ID , task .WorkflowInstance ); err != nil {
243
- ww .logger .Error ( "could not heartbeat workflow task" , "error" , err )
244
+ ww .logger .ErrorContext ( ctx , "could not heartbeat workflow task" , "error" , err )
244
245
panic ("could not heartbeat workflow task" )
245
246
}
246
247
}
@@ -257,7 +258,7 @@ func (ww *WorkflowWorker) poll(ctx context.Context, timeout time.Duration) (*tas
257
258
258
259
task , err := ww .backend .GetWorkflowTask (ctx )
259
260
if err != nil {
260
- if errors .Is (err , context .Canceled ) {
261
+ if errors .Is (err , context .DeadlineExceeded ) {
261
262
return nil , nil
262
263
}
263
264
0 commit comments