Skip to content

Commit 2b0ac7c

Browse files
committed
increase wait group when adding task to queue, cleanup
1 parent 38274ae commit 2b0ac7c

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

internal/worker/activity.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (aw *ActivityWorker) Start(ctx context.Context) error {
5555
go aw.runPoll(ctx)
5656
}
5757

58-
go aw.runDispatcher(context.Background())
58+
go aw.runDispatcher()
5959

6060
return nil
6161
}
@@ -82,6 +82,7 @@ func (aw *ActivityWorker) runPoll(ctx context.Context) {
8282
aw.logger.ErrorContext(ctx, "error while polling for activity task", "error", err)
8383
}
8484
if task != nil {
85+
aw.wg.Add(1)
8586
aw.activityTaskQueue <- task
8687
continue // check for new tasks right away
8788
}
@@ -94,7 +95,7 @@ func (aw *ActivityWorker) runPoll(ctx context.Context) {
9495
}
9596
}
9697

97-
func (aw *ActivityWorker) runDispatcher(ctx context.Context) {
98+
func (aw *ActivityWorker) runDispatcher() {
9899
var sem chan struct{}
99100
if aw.options.MaxParallelActivityTasks > 0 {
100101
sem = make(chan struct{}, aw.options.MaxParallelActivityTasks)
@@ -107,7 +108,6 @@ func (aw *ActivityWorker) runDispatcher(ctx context.Context) {
107108

108109
task := task
109110

110-
aw.wg.Add(1)
111111
go func() {
112112
defer aw.wg.Done()
113113

internal/worker/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (ww *WorkflowWorker) runPoll(ctx context.Context) {
106106
}
107107

108108
func (ww *WorkflowWorker) runDispatcher() {
109-
var sem chan (struct{})
109+
var sem chan struct{}
110110

111111
if ww.options.MaxParallelWorkflowTasks > 0 {
112112
sem = make(chan struct{}, ww.options.MaxParallelWorkflowTasks)

0 commit comments

Comments
 (0)