Skip to content

Commit 4b44787

Browse files
committed
Change how workers wait for completion
1 parent d2b4631 commit 4b44787

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

internal/worker/activity.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ type ActivityWorker struct {
2525
activityTaskQueue chan *task.Activity
2626
activityTaskExecutor activity.Executor
2727

28-
wg *sync.WaitGroup
28+
wg sync.WaitGroup
29+
pollersWg sync.WaitGroup
2930

3031
clock clock.Clock
3132
}
@@ -39,14 +40,14 @@ func NewActivityWorker(backend backend.Backend, registry *workflow.Registry, clo
3940
activityTaskQueue: make(chan *task.Activity),
4041
activityTaskExecutor: activity.NewExecutor(backend.Logger(), backend.Tracer(), backend.Converter(), registry),
4142

42-
wg: &sync.WaitGroup{},
43-
4443
clock: clock,
4544
}
4645
}
4746

4847
func (aw *ActivityWorker) Start(ctx context.Context) error {
49-
for i := 0; i <= aw.options.ActivityPollers; i++ {
48+
aw.pollersWg.Add(aw.options.ActivityPollers)
49+
50+
for i := 0; i < aw.options.ActivityPollers; i++ {
5051
go aw.runPoll(ctx)
5152
}
5253

@@ -56,9 +57,12 @@ func (aw *ActivityWorker) Start(ctx context.Context) error {
5657
}
5758

5859
func (aw *ActivityWorker) WaitForCompletion() error {
59-
close(aw.activityTaskQueue)
60+
// Wait for task pollers to finish
61+
aw.pollersWg.Wait()
6062

63+
// Wait for tasks to finish
6164
aw.wg.Wait()
65+
close(aw.activityTaskQueue)
6266

6367
return nil
6468
}

internal/worker/workflow.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ type WorkflowWorker struct {
3131

3232
logger log.Logger
3333

34-
wg *sync.WaitGroup
34+
pollersWg sync.WaitGroup
35+
wg sync.WaitGroup
3536
}
3637

3738
func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, options *Options) *WorkflowWorker {
@@ -53,13 +54,13 @@ func NewWorkflowWorker(backend backend.Backend, registry *workflow.Registry, opt
5354
cache: c,
5455

5556
logger: backend.Logger(),
56-
57-
wg: &sync.WaitGroup{},
5857
}
5958
}
6059

6160
func (ww *WorkflowWorker) Start(ctx context.Context) error {
62-
for i := 0; i <= ww.options.WorkflowPollers; i++ {
61+
ww.pollersWg.Add(ww.options.WorkflowPollers)
62+
63+
for i := 0; i < ww.options.WorkflowPollers; i++ {
6364
go ww.runPoll(ctx)
6465
}
6566

@@ -69,14 +70,19 @@ func (ww *WorkflowWorker) Start(ctx context.Context) error {
6970
}
7071

7172
func (ww *WorkflowWorker) WaitForCompletion() error {
72-
close(ww.workflowTaskQueue)
73+
// Wait for task pollers to finish
74+
ww.pollersWg.Wait()
7375

76+
// Wait for tasks to finish
7477
ww.wg.Wait()
78+
close(ww.workflowTaskQueue)
7579

7680
return nil
7781
}
7882

7983
func (ww *WorkflowWorker) runPoll(ctx context.Context) {
84+
defer ww.pollersWg.Done()
85+
8086
for {
8187
select {
8288
case <-ctx.Done():

0 commit comments

Comments
 (0)