Skip to content

Commit e5081b0

Browse files
yycpttyux0
authored and committed
Fix potential go-routine leak when retrying local activity (#918)
* Add stopCh to local activity tunnel to fix go-routine leak
1 parent 137ca72 commit e5081b0

File tree

5 files changed

+40
-22
lines changed

5 files changed

+40
-22
lines changed

internal/internal_task_handlers.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -976,7 +976,10 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
976976
}
977977

978978
lar.task.attempt++
979-
w.laTunnel.sendTask(lar.task)
979+
980+
if !w.laTunnel.sendTask(lar.task) {
981+
lar.task.attempt--
982+
}
980983
})
981984
return true
982985
}
@@ -1055,13 +1058,18 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(workflowTask *workfl
10551058
if w.hasPendingLocalActivityWork() && w.laTunnel != nil {
10561059
if len(eventHandler.unstartedLaTasks) > 0 {
10571060
// start new local activity tasks
1061+
unstartedLaTasks := make(map[string]struct{})
10581062
for activityID := range eventHandler.unstartedLaTasks {
10591063
task := eventHandler.pendingLaTasks[activityID]
10601064
task.wc = w
10611065
task.workflowTask = workflowTask
1062-
w.laTunnel.sendTask(task)
1066+
if !w.laTunnel.sendTask(task) {
1067+
unstartedLaTasks[activityID] = struct{}{}
1068+
task.wc = nil
1069+
task.workflowTask = nil
1070+
}
10631071
}
1064-
eventHandler.unstartedLaTasks = make(map[string]struct{})
1072+
eventHandler.unstartedLaTasks = unstartedLaTasks
10651073
}
10661074
// cannot complete decision task as there are pending local activities
10671075
if waitLocalActivities {

internal/internal_task_handlers_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,18 +1106,18 @@ func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() {
11061106
}
11071107

11081108
task := createWorkflowTask(testEvents, 0, "RetryLocalActivityWorkflow")
1109+
stopCh := make(chan struct{})
11091110
params := workerExecutionParameters{
1110-
TaskList: testWorkflowTaskTasklist,
1111-
Identity: "test-id-1",
1112-
Logger: t.logger,
1113-
Tracer: opentracing.NoopTracer{},
1111+
TaskList: testWorkflowTaskTasklist,
1112+
Identity: "test-id-1",
1113+
Logger: t.logger,
1114+
Tracer: opentracing.NoopTracer{},
1115+
WorkerStopChannel: stopCh,
11141116
}
1117+
defer close(stopCh)
11151118

11161119
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
1117-
laTunnel := &localActivityTunnel{
1118-
taskCh: make(chan *localActivityTask, 1000),
1119-
resultCh: make(chan interface{}),
1120-
}
1120+
laTunnel := newLocalActivityTunnel(params.WorkerStopChannel)
11211121
taskHandlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl)
11221122
t.True(ok)
11231123
taskHandlerImpl.laTunnel = laTunnel

internal/internal_task_pollers.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,20 +132,34 @@ type (
132132
localActivityTunnel struct {
133133
taskCh chan *localActivityTask
134134
resultCh chan interface{}
135+
stopCh <-chan struct{}
135136
}
136137
)
137138

138-
func (lat *localActivityTunnel) getTask(stopC <-chan struct{}) *localActivityTask {
139+
func newLocalActivityTunnel(stopCh <-chan struct{}) *localActivityTunnel {
140+
return &localActivityTunnel{
141+
taskCh: make(chan *localActivityTask, 1000),
142+
resultCh: make(chan interface{}),
143+
stopCh: stopCh,
144+
}
145+
}
146+
147+
func (lat *localActivityTunnel) getTask() *localActivityTask {
139148
select {
140149
case task := <-lat.taskCh:
141150
return task
142-
case <-stopC:
151+
case <-lat.stopCh:
143152
return nil
144153
}
145154
}
146155

147-
func (lat *localActivityTunnel) sendTask(task *localActivityTask) {
148-
lat.taskCh <- task
156+
func (lat *localActivityTunnel) sendTask(task *localActivityTask) bool {
157+
select {
158+
case lat.taskCh <- task:
159+
return true
160+
case <-lat.stopCh:
161+
return false
162+
}
149163
}
150164

151165
func isClientSideError(err error) bool {
@@ -414,7 +428,7 @@ func newLocalActivityPoller(params workerExecutionParameters, laTunnel *localAct
414428
}
415429

416430
func (latp *localActivityTaskPoller) PollTask() (interface{}, error) {
417-
return latp.laTunnel.getTask(latp.shutdownC), nil
431+
return latp.laTunnel.getTask(), nil
418432
}
419433

420434
func (latp *localActivityTaskPoller) ProcessTask(task interface{}) error {

internal/internal_worker.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,7 @@ func newWorkflowTaskWorkerInternal(
308308
)
309309

310310
// laTunnel is the glue that hookup 3 parts
311-
laTunnel := &localActivityTunnel{
312-
taskCh: make(chan *localActivityTask, 1000),
313-
resultCh: make(chan interface{}),
314-
}
311+
laTunnel := newLocalActivityTunnel(params.WorkerStopChannel)
315312

316313
// 1) workflow handler will send local activity task to laTunnel
317314
if handlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl); ok {

test/integration_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,8 @@ func (ts *IntegrationTestSuite) TearDownSuite() {
112112
if last != nil {
113113
ts.NoError(last)
114114
return
115-
} else {
116-
ts.FailNow("leaks timed out but no error, should be impossible")
117115
}
116+
ts.FailNow("leaks timed out but no error, should be impossible")
118117
case <-time.After(time.Second):
119118
// https://github.com/uber-go/cadence-client/issues/739
120119
last = goleak.FindLeaks(goleak.IgnoreTopFunction("go.uber.org/cadence/internal.(*coroutineState).initialYield"))

0 commit comments

Comments
 (0)