Skip to content

Commit d63fec7

Browse files
committed
Better shutdown pattern
1 parent 9e9d1d7 commit d63fec7

File tree

2 files changed

+35
-39
lines changed

2 files changed

+35
-39
lines changed

internal/worker/activity.go

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (aw *activityWorker) Start(ctx context.Context) error {
5252
go aw.runPoll(ctx)
5353
}
5454

55-
go aw.runDispatcher(ctx)
55+
go aw.runDispatcher(context.Background())
5656

5757
return nil
5858
}
@@ -90,28 +90,25 @@ func (aw *activityWorker) runDispatcher(ctx context.Context) {
9090
sem = make(chan struct{}, aw.options.MaxParallelActivityTasks)
9191
}
9292

93-
for {
94-
select {
95-
case <-ctx.Done():
96-
return
97-
case task := <-aw.activityTaskQueue:
98-
if sem != nil {
99-
sem <- struct{}{}
100-
}
93+
for task := range aw.activityTaskQueue {
94+
if sem != nil {
95+
sem <- struct{}{}
96+
}
10197

102-
aw.wg.Add(1)
103-
go func() {
104-
defer aw.wg.Done()
98+
task := task
10599

106-
// Create new context to allow activities to complete when root context is canceled
107-
taskCtx := context.Background()
108-
aw.handleTask(taskCtx, task)
100+
aw.wg.Add(1)
101+
go func() {
102+
defer aw.wg.Done()
109103

110-
if sem != nil {
111-
<-sem
112-
}
113-
}()
114-
}
104+
// Create new context to allow activities to complete when root context is canceled
105+
taskCtx := context.Background()
106+
aw.handleTask(taskCtx, task)
107+
108+
if sem != nil {
109+
<-sem
110+
}
111+
}()
115112
}
116113
}
117114

internal/worker/workflow.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (ww *workflowWorker) Start(ctx context.Context) error {
6666
go ww.runPoll(ctx)
6767
}
6868

69-
go ww.runDispatcher(ctx)
69+
go ww.runDispatcher()
7070

7171
return nil
7272
}
@@ -99,33 +99,32 @@ func (ww *workflowWorker) runPoll(ctx context.Context) {
9999
}
100100
}
101101

102-
func (ww *workflowWorker) runDispatcher(ctx context.Context) {
102+
func (ww *workflowWorker) runDispatcher() {
103103
var sem chan (struct{})
104104

105105
if ww.options.MaxParallelWorkflowTasks > 0 {
106106
sem = make(chan struct{}, ww.options.MaxParallelWorkflowTasks)
107107
}
108108

109-
for {
110-
select {
111-
case <-ctx.Done():
112-
return
113-
case t := <-ww.workflowTaskQueue:
114-
if sem != nil {
115-
sem <- struct{}{}
116-
}
109+
for t := range ww.workflowTaskQueue {
110+
if sem != nil {
111+
sem <- struct{}{}
112+
}
117113

118-
ww.wg.Add(1)
119-
go func() {
120-
defer ww.wg.Done()
114+
t := t
121115

122-
ww.handle(ctx, t)
116+
ww.wg.Add(1)
117+
go func() {
118+
defer ww.wg.Done()
123119

124-
if sem != nil {
125-
<-sem
126-
}
127-
}()
128-
}
120+
// Create new context to allow workflows to complete when root context is canceled
121+
taskCtx := context.Background()
122+
ww.handle(taskCtx, t)
123+
124+
if sem != nil {
125+
<-sem
126+
}
127+
}()
129128
}
130129
}
131130

0 commit comments

Comments
 (0)