Skip to content

Commit 8b5a4e5

Browse files
yycpttyux0
authored and committed
Fix nil pointer exception when retrying local activity (#913)
1 parent 9bbb0d3 commit 8b5a4e5

File tree

2 files changed

+158
-23
lines changed

2 files changed

+158
-23
lines changed

internal/internal_task_handlers.go

Lines changed: 67 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@ import (
3131
"reflect"
3232
"strings"
3333
"sync"
34+
"sync/atomic"
3435
"time"
3536

3637
"github.com/opentracing/opentracing-go"
@@ -91,7 +92,10 @@ type (
9192
workflowInfo *WorkflowInfo
9293
wth *workflowTaskHandlerImpl
9394

94-
eventHandler *workflowExecutionEventHandlerImpl
95+
// eventHandler is changed to an atomic.Value as a temporary bug fix for local activity
96+
// retry issue (github issue #915). Therefore, when accessing/modifying this field, the
97+
// mutex should still be held.
98+
eventHandler atomic.Value
9599

96100
isWorkflowCompleted bool
97101
result []byte
@@ -123,6 +127,7 @@ type (
123127
}
124128

125129
activityProvider func(name string) activity
130+
126131
// activityTaskHandlerImpl is the implementation of ActivityTaskHandler
127132
activityTaskHandlerImpl struct {
128133
taskListName string
@@ -401,6 +406,20 @@ func removeWorkflowContext(runID string) {
401406
getWorkflowCache().Delete(runID)
402407
}
403408

409+
func newWorkflowExecutionContext(
410+
startTime time.Time,
411+
workflowInfo *WorkflowInfo,
412+
taskHandler *workflowTaskHandlerImpl,
413+
) *workflowExecutionContextImpl {
414+
workflowContext := &workflowExecutionContextImpl{
415+
workflowStartTime: startTime,
416+
workflowInfo: workflowInfo,
417+
wth: taskHandler,
418+
}
419+
workflowContext.createEventHandler()
420+
return workflowContext
421+
}
422+
404423
func (w *workflowExecutionContextImpl) Lock() {
405424
w.mutex.Lock()
406425
}
@@ -421,6 +440,18 @@ func (w *workflowExecutionContextImpl) Unlock(err error) {
421440
w.mutex.Unlock()
422441
}
423442

443+
func (w *workflowExecutionContextImpl) getEventHandler() *workflowExecutionEventHandlerImpl {
444+
eventHandler := w.eventHandler.Load()
445+
if eventHandler == nil {
446+
return nil
447+
}
448+
eventHandlerImpl, ok := eventHandler.(*workflowExecutionEventHandlerImpl)
449+
if !ok {
450+
panic("unknown type for workflow execution event handler")
451+
}
452+
return eventHandlerImpl
453+
}
454+
424455
func (w *workflowExecutionContextImpl) completeWorkflow(result []byte, err error) {
425456
w.isWorkflowCompleted = true
426457
w.result = result
@@ -453,7 +484,7 @@ func (w *workflowExecutionContextImpl) onEviction() {
453484
}
454485

455486
func (w *workflowExecutionContextImpl) IsDestroyed() bool {
456-
return w.eventHandler == nil
487+
return w.getEventHandler() == nil
457488
}
458489

459490
func (w *workflowExecutionContextImpl) queueResetStickinessTask() {
@@ -479,17 +510,19 @@ func (w *workflowExecutionContextImpl) clearState() {
479510
w.err = nil
480511
w.previousStartedEventID = 0
481512
w.newDecisions = nil
482-
if w.eventHandler != nil {
513+
514+
eventHandler := w.getEventHandler()
515+
if eventHandler != nil {
483516
// Set isReplay to true to prevent user code in defer guarded by !isReplaying() from running
484-
w.eventHandler.isReplay = true
485-
w.eventHandler.Close()
486-
w.eventHandler = nil
517+
eventHandler.isReplay = true
518+
eventHandler.Close()
519+
w.eventHandler.Store((*workflowExecutionEventHandlerImpl)(nil))
487520
}
488521
}
489522

490523
func (w *workflowExecutionContextImpl) createEventHandler() {
491524
w.clearState()
492-
w.eventHandler = newWorkflowExecutionEventHandler(
525+
eventHandler := newWorkflowExecutionEventHandler(
493526
w.workflowInfo,
494527
w.completeWorkflow,
495528
w.wth.logger,
@@ -499,7 +532,8 @@ func (w *workflowExecutionContextImpl) createEventHandler() {
499532
w.wth.dataConverter,
500533
w.wth.contextPropagators,
501534
w.wth.tracer,
502-
).(*workflowExecutionEventHandlerImpl)
535+
)
536+
w.eventHandler.Store(eventHandler)
503537
}
504538

505539
func resetHistory(task *s.PollForDecisionTaskResponse, historyIterator HistoryIterator) (*s.History, error) {
@@ -555,10 +589,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *s.PollForDecisio
555589
}
556590

557591
wfStartTime := time.Unix(0, h.Events[0].GetTimestamp())
558-
workflowContext := &workflowExecutionContextImpl{workflowStartTime: wfStartTime, workflowInfo: workflowInfo, wth: wth}
559-
workflowContext.createEventHandler()
560-
561-
return workflowContext, nil
592+
return newWorkflowExecutionContext(wfStartTime, workflowInfo, wth), nil
562593
}
563594

564595
func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
@@ -733,7 +764,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
733764
}
734765
w.SetCurrentTask(task)
735766

736-
eventHandler := w.eventHandler
767+
eventHandler := w.getEventHandler()
737768
reorderedHistory := newHistory(workflowTask, eventHandler)
738769
var replayDecisions []*s.Decision
739770
var respondEvents []*s.HistoryEvent
@@ -870,7 +901,7 @@ func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *
870901
return nil, nil // nothing to do here as we are retrying...
871902
}
872903

873-
err := w.eventHandler.ProcessLocalActivityResult(lar)
904+
err := w.getEventHandler().ProcessLocalActivityResult(lar)
874905
if err != nil {
875906
return nil, err
876907
}
@@ -887,7 +918,17 @@ func (w *workflowExecutionContextImpl) retryLocalActivity(lar *localActivityResu
887918
if backoff > 0 && backoff <= w.GetDecisionTimeout() {
888919
// we need a local retry
889920
time.AfterFunc(backoff, func() {
890-
if _, ok := w.eventHandler.pendingLaTasks[lar.task.activityID]; !ok {
921+
// TODO: this should not be a separate goroutine as it introduces a race condition when accessing eventHandler.
922+
// currently this is solved by changing eventHandler to an atomic.Value. Ideally, this retry timer should be
923+
// part of the event loop for processing the workflow task.
924+
eventHandler := w.getEventHandler()
925+
926+
// if decision heartbeat failed, the workflow execution context will be cleared and eventHandler will be nil
927+
if eventHandler == nil {
928+
return
929+
}
930+
931+
if _, ok := eventHandler.pendingLaTasks[lar.task.activityID]; !ok {
891932
return
892933
}
893934

@@ -964,42 +1005,45 @@ func (w *workflowExecutionContextImpl) CompleteDecisionTask(workflowTask *workfl
9641005
if w.currentDecisionTask == nil {
9651006
return nil
9661007
}
1008+
eventHandler := w.getEventHandler()
1009+
9671010
// w.laTunnel could be nil for worker.ReplayHistory() because there is no worker started, in that case we don't
9681011
// care about the pending local activities, and just return because the result is ignored anyway by the caller.
9691012
if w.hasPendingLocalActivityWork() && w.laTunnel != nil {
970-
if len(w.eventHandler.unstartedLaTasks) > 0 {
1013+
if len(eventHandler.unstartedLaTasks) > 0 {
9711014
// start new local activity tasks
972-
for activityID := range w.eventHandler.unstartedLaTasks {
973-
task := w.eventHandler.pendingLaTasks[activityID]
1015+
for activityID := range eventHandler.unstartedLaTasks {
1016+
task := eventHandler.pendingLaTasks[activityID]
9741017
task.wc = w
9751018
task.workflowTask = workflowTask
9761019
w.laTunnel.sendTask(task)
9771020
}
978-
w.eventHandler.unstartedLaTasks = make(map[string]struct{})
1021+
eventHandler.unstartedLaTasks = make(map[string]struct{})
9791022
}
9801023
// cannot complete decision task as there are pending local activities
9811024
if waitLocalActivities {
9821025
return nil
9831026
}
9841027
}
9851028

986-
eventDecisions := w.eventHandler.decisionsHelper.getDecisions(true)
1029+
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
9871030
if len(eventDecisions) > 0 {
9881031
w.newDecisions = append(w.newDecisions, eventDecisions...)
9891032
}
9901033

991-
completeRequest := w.wth.completeWorkflow(w.eventHandler, w.currentDecisionTask, w, w.newDecisions, !waitLocalActivities)
1034+
completeRequest := w.wth.completeWorkflow(eventHandler, w.currentDecisionTask, w, w.newDecisions, !waitLocalActivities)
9921035
w.clearCurrentTask()
9931036

9941037
return completeRequest
9951038
}
9961039

9971040
func (w *workflowExecutionContextImpl) hasPendingLocalActivityWork() bool {
1041+
eventHandler := w.getEventHandler()
9981042
return !w.isWorkflowCompleted &&
9991043
w.currentDecisionTask != nil &&
10001044
w.currentDecisionTask.Query == nil && // don't run local activity for query task
1001-
w.eventHandler != nil &&
1002-
len(w.eventHandler.pendingLaTasks) > 0
1045+
eventHandler != nil &&
1046+
len(eventHandler.pendingLaTasks) > 0
10031047
}
10041048

10051049
func (w *workflowExecutionContextImpl) clearCurrentTask() {

internal/internal_task_handlers_test.go

Lines changed: 91 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -843,6 +843,97 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() {
843843
t.NotNil(response)
844844
}
845845

846+
func (t *TaskHandlersTestSuite) TestLocalActivityRetry_DecisionHeartbeatFail() {
847+
backoffIntervalInSeconds := int32(1)
848+
backoffDuration := time.Second * time.Duration(backoffIntervalInSeconds)
849+
workflowComplete := false
850+
851+
retryLocalActivityWorkflowFunc := func(ctx Context, intput []byte) error {
852+
ao := LocalActivityOptions{
853+
ScheduleToCloseTimeout: time.Minute,
854+
RetryPolicy: &RetryPolicy{
855+
InitialInterval: backoffDuration,
856+
BackoffCoefficient: 1.1,
857+
MaximumInterval: time.Minute,
858+
ExpirationInterval: time.Minute,
859+
},
860+
}
861+
ctx = WithLocalActivityOptions(ctx, ao)
862+
863+
err := ExecuteLocalActivity(ctx, func() error {
864+
return errors.New("some random error")
865+
}).Get(ctx, nil)
866+
workflowComplete = true
867+
return err
868+
}
869+
RegisterWorkflowWithOptions(
870+
retryLocalActivityWorkflowFunc,
871+
RegisterWorkflowOptions{Name: "RetryLocalActivityWorkflow"},
872+
)
873+
874+
decisionTaskStartedEvent := createTestEventDecisionTaskStarted(3)
875+
decisionTaskStartedEvent.Timestamp = common.Int64Ptr(time.Now().UnixNano())
876+
testEvents := []*s.HistoryEvent{
877+
createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{
878+
// make sure the timeout is the same as the backoff interval
879+
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(backoffIntervalInSeconds),
880+
TaskList: &s.TaskList{Name: &testWorkflowTaskTasklist}},
881+
),
882+
createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{}),
883+
decisionTaskStartedEvent,
884+
}
885+
886+
task := createWorkflowTask(testEvents, 0, "RetryLocalActivityWorkflow")
887+
params := workerExecutionParameters{
888+
TaskList: testWorkflowTaskTasklist,
889+
Identity: "test-id-1",
890+
Logger: t.logger,
891+
Tracer: opentracing.NoopTracer{},
892+
}
893+
894+
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
895+
laTunnel := &localActivityTunnel{
896+
taskCh: make(chan *localActivityTask, 1000),
897+
resultCh: make(chan interface{}),
898+
}
899+
taskHandlerImpl, ok := taskHandler.(*workflowTaskHandlerImpl)
900+
t.True(ok)
901+
taskHandlerImpl.laTunnel = laTunnel
902+
903+
laTaskPoller := newLocalActivityPoller(params, laTunnel)
904+
doneCh := make(chan struct{})
905+
go func() {
906+
// laTaskPoller needs to poll the local activity and process it
907+
task, err := laTaskPoller.PollTask()
908+
t.NoError(err)
909+
err = laTaskPoller.ProcessTask(task)
910+
t.NoError(err)
911+
912+
// before clearing workflow state, a reset sticky task will be sent to this chan,
913+
// drain the chan so that workflow state can be cleared
914+
<-laTunnel.resultCh
915+
916+
close(doneCh)
917+
}()
918+
919+
laResultCh := make(chan *localActivityResult)
920+
response, err := taskHandler.ProcessWorkflowTask(
921+
&workflowTask{
922+
task: task,
923+
laResultCh: laResultCh,
924+
},
925+
func(response interface{}, startTime time.Time) (*workflowTask, error) {
926+
return nil, &s.EntityNotExistsError{Message: "Decision task not found."}
927+
})
928+
t.Nil(response)
929+
t.Error(err)
930+
931+
// wait for the retry timer to fire
932+
time.Sleep(backoffDuration)
933+
t.False(workflowComplete)
934+
<-doneCh
935+
}
936+
846937
func (t *TaskHandlersTestSuite) TestHeartBeat_NoError() {
847938
mockCtrl := gomock.NewController(t.T())
848939
mockService := workflowservicetest.NewMockClient(mockCtrl)

0 commit comments

Comments
 (0)