Skip to content

Commit 19583ac

Browse files
Wait for local activity while holding workflow context lock (#760)
Ensure query decisions are serialized with other decision processing including local activities. * move wait for local activity into process workflow task - holding the context lock * implement decision heartbeat logic while holding lock
1 parent d7f3686 commit 19583ac

10 files changed

+253
-173
lines changed

internal/internal_public.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
)
3535

3636
type (
37+
decisionHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)
38+
3739
// HistoryIterator iterator through history events
3840
HistoryIterator interface {
3941
// GetNextPage returns next page of history events
@@ -74,7 +76,10 @@ type (
7476
// - RespondDecisionTaskCompletedRequest
7577
// - RespondDecisionTaskFailedRequest
7678
// - RespondQueryTaskCompletedRequest
77-
ProcessWorkflowTask(task *workflowTask) (response interface{}, w WorkflowExecutionContext, err error)
79+
ProcessWorkflowTask(
80+
task *workflowTask,
81+
f decisionHeartbeatFunc,
82+
) (response interface{}, err error)
7883
}
7984

8085
// ActivityTaskHandler represents activity task handlers.

internal/internal_task_handlers.go

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ type (
148148
currentIndex int
149149
next []*s.HistoryEvent
150150
}
151+
152+
decisionHeartbeatError struct {
153+
Message string
154+
}
151155
)
152156

153157
func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandlerImpl) *history {
@@ -161,6 +165,10 @@ func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandler
161165
return result
162166
}
163167

168+
func (e decisionHeartbeatError) Error() string {
169+
return e.Message
170+
}
171+
164172
// Get workflow start event.
165173
func (eh *history) GetWorkflowStartedEvent() (*s.HistoryEvent, error) {
166174
events := eh.workflowTask.task.History.Events
@@ -545,8 +553,10 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *s.PollForDecisio
545553
return workflowContext, nil
546554
}
547555

548-
func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(task *s.PollForDecisionTaskResponse,
549-
historyIterator HistoryIterator) (workflowContext *workflowExecutionContextImpl, err error) {
556+
func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
557+
task *s.PollForDecisionTaskResponse,
558+
historyIterator HistoryIterator,
559+
) (workflowContext *workflowExecutionContextImpl, err error) {
550560
metricsScope := wth.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
551561
defer func() {
552562
if err == nil && workflowContext != nil && workflowContext.laTunnel == nil {
@@ -631,9 +641,10 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *s.PollForDeci
631641
// ProcessWorkflowTask processes all the events of the workflow task.
632642
func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
633643
workflowTask *workflowTask,
634-
) (completeRequest interface{}, context WorkflowExecutionContext, errRet error) {
644+
heartbeatFunc decisionHeartbeatFunc,
645+
) (completeRequest interface{}, errRet error) {
635646
if workflowTask == nil || workflowTask.task == nil {
636-
return nil, nil, errors.New("nil workflow task provided")
647+
return nil, errors.New("nil workflow task provided")
637648
}
638649
task := workflowTask.task
639650
if task.History == nil || len(task.History.Events) == 0 {
@@ -642,7 +653,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
642653
}
643654
}
644655
if task.Query == nil && len(task.History.Events) == 0 {
645-
return nil, nil, errors.New("nil or empty history")
656+
return nil, errors.New("nil or empty history")
646657
}
647658

648659
runID := task.WorkflowExecution.GetRunId()
@@ -657,15 +668,53 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
657668

658669
workflowContext, err := wth.getOrCreateWorkflowContext(task, workflowTask.historyIterator)
659670
if err != nil {
660-
return nil, nil, err
671+
return nil, err
661672
}
662673

663674
defer func() {
664675
workflowContext.Unlock(errRet)
665676
}()
666677

667-
response, err := workflowContext.ProcessWorkflowTask(workflowTask)
668-
return response, workflowContext, err
678+
var response interface{}
679+
process_Workflow_Loop:
680+
for {
681+
startTime := time.Now()
682+
response, err = workflowContext.ProcessWorkflowTask(workflowTask)
683+
if err == nil && response == nil {
684+
wait_LocalActivity_Loop:
685+
for {
686+
deadlineToTrigger := time.Duration(float32(ratioToForceCompleteDecisionTaskComplete) * float32(workflowContext.GetDecisionTimeout()))
687+
delayDuration := startTime.Add(deadlineToTrigger).Sub(time.Now())
688+
select {
689+
case <-time.After(delayDuration):
690+
// force complete, call the decision heartbeat function
691+
workflowTask, err = heartbeatFunc(
692+
workflowContext.CompleteDecisionTask(workflowTask, false),
693+
startTime,
694+
)
695+
if err != nil {
696+
return nil, &decisionHeartbeatError{Message: fmt.Sprintf("error sending decision heartbeat %v", err)}
697+
}
698+
if workflowTask == nil {
699+
return nil, nil
700+
}
701+
continue process_Workflow_Loop
702+
703+
case lar := <-workflowTask.laResultCh:
704+
// local activity result ready
705+
response, err = workflowContext.ProcessLocalActivityResult(workflowTask, lar)
706+
if err == nil && response == nil {
707+
// decision task is not done yet, still waiting for more local activities
708+
continue wait_LocalActivity_Loop
709+
}
710+
break process_Workflow_Loop
711+
}
712+
}
713+
} else {
714+
break process_Workflow_Loop
715+
}
716+
}
717+
return response, err
669718
}
670719

671720
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
@@ -956,10 +1005,6 @@ func (w *workflowExecutionContextImpl) skipReplayCheck() bool {
9561005
return w.currentDecisionTask.Query != nil || !isFullHistory(w.currentDecisionTask.History)
9571006
}
9581007

959-
func (w *workflowExecutionContextImpl) GetCurrentDecisionTask() *s.PollForDecisionTaskResponse {
960-
return w.currentDecisionTask
961-
}
962-
9631008
func (w *workflowExecutionContextImpl) SetCurrentTask(task *s.PollForDecisionTaskResponse) {
9641009
w.currentDecisionTask = task
9651010
// do not update the previousStartedEventID for query task
@@ -993,13 +1038,6 @@ func (w *workflowExecutionContextImpl) GetDecisionTimeout() time.Duration {
9931038
return time.Second * time.Duration(w.workflowInfo.TaskStartToCloseTimeoutSeconds)
9941039
}
9951040

996-
func (w *workflowExecutionContextImpl) StackTrace() string {
997-
if w.eventHandler == nil {
998-
return "eventHandler is closed"
999-
}
1000-
return w.eventHandler.StackTrace()
1001-
}
1002-
10031041
func skipDeterministicCheckForDecision(d *s.Decision) bool {
10041042
if d.GetDecisionType() == s.DecisionTypeRecordMarker {
10051043
markerName := d.RecordMarkerDecisionAttributes.GetMarkerName()

internal/internal_task_handlers_interfaces_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
package internal
2222

2323
import (
24+
"testing"
25+
2426
"github.com/golang/mock/gomock"
2527
"github.com/stretchr/testify/suite"
2628
"github.com/uber/tchannel-go/thrift"
2729
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
2830
m "go.uber.org/cadence/.gen/go/shared"
2931
"go.uber.org/cadence/internal/common"
3032
"golang.org/x/net/context"
31-
"testing"
3233
)
3334

3435
type (
@@ -44,10 +45,12 @@ type sampleWorkflowTaskHandler struct {
4445
}
4546

4647
func (wth sampleWorkflowTaskHandler) ProcessWorkflowTask(
47-
workflowTask *workflowTask) (interface{}, WorkflowExecutionContext, error) {
48+
workflowTask *workflowTask,
49+
d decisionHeartbeatFunc,
50+
) (interface{}, error) {
4851
return &m.RespondDecisionTaskCompletedRequest{
4952
TaskToken: workflowTask.task.TaskToken,
50-
}, nil, nil
53+
}, nil
5154
}
5255

5356
func newSampleWorkflowTaskHandler() *sampleWorkflowTaskHandler {
@@ -104,7 +107,7 @@ func (s *PollLayerInterfacesTestSuite) TestProcessWorkflowTaskInterface() {
104107

105108
// Process task and respond to the service.
106109
taskHandler := newSampleWorkflowTaskHandler()
107-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response})
110+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil)
108111
completionRequest := request.(*m.RespondDecisionTaskCompletedRequest)
109112
s.NoError(err)
110113

internal/internal_task_handlers_test.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func (t *TaskHandlersTestSuite) testWorkflowTaskWorkflowExecutionStartedHelper(p
234234
}
235235
task := createWorkflowTask(testEvents, 0, "HelloWorld_Workflow")
236236
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
237-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
237+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
238238
response := request.(*s.RespondDecisionTaskCompletedRequest)
239239
t.NoError(err)
240240
t.NotNil(response)
@@ -286,7 +286,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
286286
Logger: t.logger,
287287
}
288288
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
289-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
289+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
290290
response := request.(*s.RespondDecisionTaskCompletedRequest)
291291

292292
t.NoError(err)
@@ -297,7 +297,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
297297

298298
// Schedule an activity and see if we complete workflow, Having only one last decision.
299299
task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")
300-
request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
300+
request, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
301301
response = request.(*s.RespondDecisionTaskCompletedRequest)
302302
t.NoError(err)
303303
t.NotNil(response)
@@ -337,7 +337,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() {
337337
task := createWorkflowTask(testEvents[0:1], 0, "HelloWorld_Workflow")
338338
task.StartedEventId = common.Int64Ptr(1)
339339
task.WorkflowExecution = execution
340-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
340+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
341341
response := request.(*s.RespondDecisionTaskCompletedRequest)
342342
t.NoError(err)
343343
t.NotNil(response)
@@ -348,7 +348,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_Sticky() {
348348
// then check the current state using query task
349349
task = createQueryTask([]*s.HistoryEvent{}, 6, "HelloWorld_Workflow", "test-query")
350350
task.WorkflowExecution = execution
351-
queryResp, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
351+
queryResp, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
352352
t.NoError(err)
353353
t.verifyQueryResult(queryResp, "waiting-activity-result")
354354
}
@@ -380,30 +380,30 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() {
380380
// query after first decision task (notice the previousStartEventID is always the last eventID for query task)
381381
task := createQueryTask(testEvents[0:3], 3, "HelloWorld_Workflow", "test-query")
382382
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
383-
response, _, _ := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
383+
response, _ := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
384384
t.verifyQueryResult(response, "waiting-activity-result")
385385

386386
// query after activity task complete but before second decision task started
387387
task = createQueryTask(testEvents[0:7], 7, "HelloWorld_Workflow", "test-query")
388388
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
389-
response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
389+
response, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
390390
t.verifyQueryResult(response, "waiting-activity-result")
391391

392392
// query after second decision task
393393
task = createQueryTask(testEvents[0:8], 8, "HelloWorld_Workflow", "test-query")
394394
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
395-
response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
395+
response, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
396396
t.verifyQueryResult(response, "done")
397397

398398
// query after second decision task with extra events
399399
task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", "test-query")
400400
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
401-
response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
401+
response, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
402402
t.verifyQueryResult(response, "done")
403403

404404
task = createQueryTask(testEvents[0:9], 9, "HelloWorld_Workflow", "invalid-query-type")
405405
taskHandler = newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
406-
response, _, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
406+
response, _ = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
407407
t.NotNil(response)
408408
queryResp, ok := response.(*s.RespondQueryTaskCompletedRequest)
409409
t.True(ok)
@@ -451,7 +451,7 @@ func (t *TaskHandlersTestSuite) TestCacheEvictionWhenErrorOccurs() {
451451
// newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask()
452452
// will fail as it can't find laTunnel in getWorkflowCache().
453453
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}))
454-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
454+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
455455

456456
t.Error(err)
457457
t.Nil(request)
@@ -507,7 +507,7 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) {
507507

508508
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
509509
task := createWorkflowTask(testEvents, 0, workflowName)
510-
_, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
510+
_, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
511511
t.Nil(err)
512512

513513
if !params.DisableStickyExecution {
@@ -549,7 +549,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
549549
}
550550

551551
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
552-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
552+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
553553
response := request.(*s.RespondDecisionTaskCompletedRequest)
554554
// there should be no error as the history events matched the decisions.
555555
t.NoError(err)
@@ -561,7 +561,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
561561
// newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask()
562562
// will fail as it can't find laTunnel in getWorkflowCache().
563563
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, stopC)
564-
request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
564+
request, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
565565
t.Error(err)
566566
t.Nil(request)
567567
t.Contains(err.Error(), "nondeterministic")
@@ -571,7 +571,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
571571
params.NonDeterministicWorkflowPolicy = NonDeterministicWorkflowPolicyFailWorkflow
572572
failOnNondeterminismTaskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
573573
task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")
574-
request, _, err = failOnNondeterminismTaskHandler.ProcessWorkflowTask(&workflowTask{task: task})
574+
request, err = failOnNondeterminismTaskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
575575
// When FailWorkflow policy is set, task handler does not return an error,
576576
// because it will indicate non determinism in the request.
577577
t.NoError(err)
@@ -589,7 +589,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
589589
// now with different package name to activity type
590590
testEvents[4].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("new-package.Greeter_Activity")
591591
task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")
592-
request, _, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
592+
request, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
593593
t.NoError(err)
594594
t.NotNil(request)
595595
}
@@ -610,7 +610,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() {
610610
}
611611

612612
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
613-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
613+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
614614
t.NoError(err)
615615
t.NotNil(request)
616616
r, ok := request.(*s.RespondDecisionTaskCompletedRequest)
@@ -638,7 +638,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowPanics() {
638638
}
639639

640640
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
641-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
641+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
642642
t.NoError(err)
643643
t.NotNil(request)
644644
r, ok := request.(*s.RespondDecisionTaskFailedRequest)
@@ -690,7 +690,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() {
690690
}
691691

692692
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
693-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
693+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
694694
t.NoError(err)
695695
t.NotNil(request)
696696
r, ok := request.(*s.RespondDecisionTaskCompletedRequest)
@@ -728,11 +728,10 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_CancelActivityBeforeSent() {
728728
Logger: t.logger,
729729
}
730730
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
731-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task})
731+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
732732
response := request.(*s.RespondDecisionTaskCompletedRequest)
733733
t.NoError(err)
734734
t.NotNil(response)
735-
//t.printAllDecisions(response.Decisions)
736735
t.Equal(1, len(response.Decisions))
737736
t.Equal(s.DecisionTypeCompleteWorkflowExecution, response.Decisions[0].GetDecisionType())
738737
t.NotNil(response.Decisions[0].CompleteWorkflowExecutionDecisionAttributes)
@@ -764,7 +763,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_PageToken() {
764763
},
765764
}
766765
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
767-
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: historyIterator})
766+
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: historyIterator}, nil)
768767
response := request.(*s.RespondDecisionTaskCompletedRequest)
769768
t.NoError(err)
770769
t.NotNil(response)

0 commit comments

Comments
 (0)