Skip to content

Commit 252a76b

Browse files
yycpttyux0
authored andcommitted
Fix potential go-routine leak when retrying local activity (#918)
* Add stopCh to local activity tunnel to fix go-routine leak
1 parent 19df4b8 commit 252a76b

File tree

4 files changed

+39
-20
lines changed

4 files changed

+39
-20
lines changed

internal/internal_task_handlers.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,10 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
933933
}
934934

935935
lar.task.attempt++
936-
w.laTunnel.sendTask(lar.task)
936+
937+
if !w.laTunnel.sendTask(lar.task) {
938+
lar.task.attempt--
939+
}
937940
})
938941
return true
939942
}
@@ -1012,13 +1015,18 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(workflowTask *workfl
10121015
if w.hasPendingLocalActivityWork() && w.laTunnel != nil {
10131016
if len(eventHandler.unstartedLaTasks) > 0 {
10141017
// start new local activity tasks
1018+
unstartedLaTasks := make(map[string]struct{})
10151019
for activityID := range eventHandler.unstartedLaTasks {
10161020
task := eventHandler.pendingLaTasks[activityID]
10171021
task.wc = w
10181022
task.workflowTask = workflowTask
1019-
w.laTunnel.sendTask(task)
1023+
if !w.laTunnel.sendTask(task) {
1024+
unstartedLaTasks[activityID] = struct{}{}
1025+
task.wc = nil
1026+
task.workflowTask = nil
1027+
}
10201028
}
1021-
eventHandler.unstartedLaTasks = make(map[string]struct{})
1029+
eventHandler.unstartedLaTasks = unstartedLaTasks
10221030
}
10231031
// cannot complete decision task as there are pending local activities
10241032
if waitLocalActivities {

internal/internal_task_handlers_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -884,18 +884,18 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() {
884884
}
885885

886886
task := createWorkflowTask(testEvents, 0, "RetryLocalActivityWorkflow")
887+
stopCh := make(chan struct{})
887888
params := workerExecutionParameters{
888-
TaskList: testWorkflowTaskTasklist,
889-
Identity: "test-id-1",
890-
Logger: t.logger,
891-
Tracer: opentracing.NoopTracer{},
889+
TaskList: testWorkflowTaskTasklist,
890+
Identity: "test-id-1",
891+
Logger: t.logger,
892+
Tracer: opentracing.NoopTracer{},
893+
WorkerStopChannel: stopCh,
892894
}
895+
defer close(stopCh)
893896

894897
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
895-
laTunnel := &localActivityTunnel{
896-
taskCh: make(chan *localActivityTask, 1000),
897-
resultCh: make(chan interface{}),
898-
}
898+
laTunnel := newLocalActivityTunnel(params.WorkerStopChannel)
899899
taskHandlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl)
900900
t.True(ok)
901901
taskHandlerImpl.laTunnel = laTunnel

internal/internal_task_pollers.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,20 +130,34 @@ type (
130130
localActivityTunnel struct {
131131
taskCh chan *localActivityTask
132132
resultCh chan interface{}
133+
stopCh <-chan struct{}
133134
}
134135
)
135136

136-
func (lat *localActivityTunnel) getTask(stopC <-chan struct{}) *localActivityTask {
137+
func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
138+
return &localActivityTunnel{
139+
taskCh: make(chan *localActivityTask, 1000),
140+
resultCh: make(chan interface{}),
141+
stopCh: stopCh,
142+
}
143+
}
144+
145+
func (lat *localActivityTunnel) getTask() *localActivityTask {
137146
select {
138147
case task := <-lat.taskCh:
139148
return task
140-
case <-stopC:
149+
case <-lat.stopCh:
141150
return nil
142151
}
143152
}
144153

145-
func (lat *localActivityTunnel) sendTask(task *localActivityTask) {
146-
lat.taskCh <- task
154+
func (lat *localActivityTunnel) sendTask(task *localActivityTask) bool {
155+
select {
156+
case lat.taskCh <- task:
157+
return true
158+
case <-lat.stopCh:
159+
return false
160+
}
147161
}
148162

149163
func isClientSideError(err error) bool {
@@ -408,7 +422,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
408422
}
409423

410424
func (latp *localActivityTaskPoller) PollTask() (interface{}, error) {
411-
return latp.laTunnel.getTask(latp.shutdownC), nil
425+
return latp.laTunnel.getTask(), nil
412426
}
413427

414428
func (latp *localActivityTaskPoller) ProcessTask(task interface{}) error {

internal/internal_worker.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,7 @@ func newWorkflowTaskWorkerInternal(
308308
)
309309

310310
// laTunnel is the glue that hookup 3 parts
311-
laTunnel := &localActivityTunnel{
312-
taskCh: make(chan *localActivityTask, 1000),
313-
resultCh: make(chan interface{}),
314-
}
311+
laTunnel := newLocalActivityTunnel(params.WorkerStopChannel)
315312

316313
// 1) workflow handler will send local activity task to laTunnel
317314
if handlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl); ok {

0 commit comments

Comments
 (0)