Skip to content

Commit 16d20b1

Browse files
authored
Truncate history for query based on nextEventID (#1096)
1 parent 97f887f commit 16d20b1

File tree

8 files changed

+289
-28
lines changed

8 files changed

+289
-28
lines changed

.gen/go/shared/shared.go

Lines changed: 48 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idls

Submodule idls updated from 9ef8076 to 68a720e

internal/common/metrics/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const (
4242
DecisionPollNoTaskCounter = CadenceMetricsPrefix + "decision-poll-no-task"
4343
DecisionPollSucceedCounter = CadenceMetricsPrefix + "decision-poll-succeed"
4444
DecisionPollLatency = CadenceMetricsPrefix + "decision-poll-latency" // measure succeed poll request latency
45+
DecisionPollInvalidCounter = CadenceMetricsPrefix + "decision-poll-invalid"
4546
DecisionScheduledToStartLatency = CadenceMetricsPrefix + "decision-scheduled-to-start-latency"
4647
DecisionExecutionFailedCounter = CadenceMetricsPrefix + "decision-execution-failed"
4748
DecisionExecutionLatency = CadenceMetricsPrefix + "decision-execution-latency"

internal/internal_task_handlers.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,11 @@ func newHistory(task *workflowTask, eventsHandler *workflowExecutionEventHandler
169169
eventsHandler: eventsHandler,
170170
loadedEvents: task.task.History.Events,
171171
currentIndex: 0,
172-
lastEventID: task.task.GetStartedEventId(),
172+
// don't set lastEventID to task.GetNextEventId()
173+
// as for sticky query, the history in workflow task will be empty
174+
// and query will be run based on existing workflow state.
175+
// so the sanity check in verifyAllEventsProcessed will fail
176+
lastEventID: task.task.GetStartedEventId(),
173177
}
174178
if len(result.loadedEvents) > 0 {
175179
result.nextEventID = result.loadedEvents[0].GetEventId()

internal/internal_task_handlers_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,9 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_QueryWorkflow_NonSticky() {
487487
Logger: t.logger,
488488
}
489489

490+
// TODO: the following comment is not true, previousStartEventID is either 0 or points to a valid decisionTaskStartedID
491+
// we need to fix the test
492+
//
490493
// query after first decision task (notice the previousStartEventID is always the last eventID for query task)
491494
task := createQueryTask(testEvents[0:3], 3, "HelloWorld_Workflow", queryType)
492495
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, t.registry)

internal/internal_task_pollers.go

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,14 @@ type (
105105
}
106106

107107
historyIteratorImpl struct {
108-
iteratorFunc func(nextPageToken []byte) (*s.History, []byte, error)
109-
execution *s.WorkflowExecution
110-
nextPageToken []byte
111-
domain string
112-
service workflowserviceclient.Interface
113-
metricsScope tally.Scope
114-
maxEventID int64
108+
iteratorFunc func(nextPageToken []byte) (*s.History, []byte, error)
109+
execution *s.WorkflowExecution
110+
nextPageToken []byte
111+
domain string
112+
service workflowserviceclient.Interface
113+
metricsScope tally.Scope
114+
startedEventID int64
115+
maxEventID int64
115116
}
116117

117118
localActivityTaskPoller struct {
@@ -798,13 +799,25 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) {
798799
}
799800

800801
func (wtp *workflowTaskPoller) toWorkflowTask(response *s.PollForDecisionTaskResponse) *workflowTask {
802+
startEventID := response.GetStartedEventId()
803+
nextEventID := response.GetNextEventId()
804+
if nextEventID != 0 &&
805+
startEventID != 0 &&
806+
nextEventID-1 != startEventID {
807+
wtp.logger.Warn("Invalid PollForDecisionTaskResponse, nextEventID doesn't match startedEventID",
808+
zap.Int64("StartedEventID", startEventID),
809+
zap.Int64("NextEventID", nextEventID),
810+
)
811+
wtp.metricsScope.Counter(metrics.DecisionPollInvalidCounter).Inc(1)
812+
}
801813
historyIterator := &historyIteratorImpl{
802-
nextPageToken: response.NextPageToken,
803-
execution: response.WorkflowExecution,
804-
domain: wtp.domain,
805-
service: wtp.service,
806-
metricsScope: wtp.metricsScope,
807-
maxEventID: response.GetStartedEventId(),
814+
nextPageToken: response.NextPageToken,
815+
execution: response.WorkflowExecution,
816+
domain: wtp.domain,
817+
service: wtp.service,
818+
metricsScope: wtp.metricsScope,
819+
startedEventID: startEventID,
820+
maxEventID: nextEventID - 1,
808821
}
809822
task := &workflowTask{
810823
task: response,
@@ -820,6 +833,7 @@ func (h *historyIteratorImpl) GetNextPage() (*s.History, error) {
820833
h.service,
821834
h.domain,
822835
h.execution,
836+
h.startedEventID,
823837
h.maxEventID,
824838
h.metricsScope)
825839
}
@@ -846,6 +860,7 @@ func newGetHistoryPageFunc(
846860
domain string,
847861
execution *s.WorkflowExecution,
848862
atDecisionTaskCompletedEventID int64,
863+
maxEventID int64,
849864
metricsScope tally.Scope,
850865
) func(nextPageToken []byte) (*s.History, []byte, error) {
851866
return func(nextPageToken []byte) (*s.History, []byte, error) {
@@ -885,9 +900,12 @@ func newGetHistoryPageFunc(
885900
h = resp.History
886901
}
887902

888-
size := len(h.Events)
889-
if size > 0 && atDecisionTaskCompletedEventID > 0 &&
890-
h.Events[size-1].GetEventId() > atDecisionTaskCompletedEventID {
903+
// TODO: is this check valid/useful? atDecisionTaskCompletedEventID is startedEventID in pollForDecisionTaskResponse and
904+
// - For decision tasks, since there's only one inflight decision task, there won't be any event after startEventID.
905+
// Those events will be buffered. If there're too many buffer events, the current decision will be failed and events passed
906+
// startEventID may be returned. In that case, the last event after truncation is still decision task started event not completed.
907+
// - For query tasks startEventID is not assigned so this check is never executed.
908+
if shouldTruncateHistory(h, atDecisionTaskCompletedEventID) {
891909
first := h.Events[0].GetEventId() // eventIds start from 1
892910
h.Events = h.Events[:atDecisionTaskCompletedEventID-first+1]
893911
if h.Events[len(h.Events)-1].GetEventType() != s.EventTypeDecisionTaskCompleted {
@@ -896,10 +914,29 @@ func newGetHistoryPageFunc(
896914
}
897915
return h, nil, nil
898916
}
917+
918+
// TODO: Apply the check to decision tasks (remove the last condition)
919+
// after validating maxEventID always equal to atDecisionTaskCompletedEventID (startedEventID).
920+
// For now only apply to query task to be safe.
921+
if shouldTruncateHistory(h, maxEventID) && isQueryTask(atDecisionTaskCompletedEventID) {
922+
first := h.Events[0].GetEventId()
923+
h.Events = h.Events[:maxEventID-first+1]
924+
return h, nil, nil
925+
}
926+
899927
return h, resp.NextPageToken, nil
900928
}
901929
}
902930

931+
func shouldTruncateHistory(h *s.History, maxEventID int64) bool {
932+
size := len(h.Events)
933+
return size > 0 && maxEventID > 0 && h.Events[size-1].GetEventId() > maxEventID
934+
}
935+
936+
func isQueryTask(atDecisionTaskCompletedEventID int64) bool {
937+
return atDecisionTaskCompletedEventID == 0
938+
}
939+
903940
func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserviceclient.Interface,
904941
domain string, params workerExecutionParameters) *activityTaskPoller {
905942

internal/internal_workers_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
304304
StartedEventId: common.Int64Ptr(3),
305305
History: &m.History{Events: testEvents[0:3]},
306306
NextPageToken: nil,
307+
NextEventId: common.Int64Ptr(4),
307308
}
308309
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(task, nil).Times(1)
309310
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()
@@ -359,6 +360,174 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
359360
s.Equal(2, localActivityCalledCount)
360361
}
361362

363+
func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() {
364+
domain := "testDomain"
365+
taskList := "query-task-cache-evicted-tl"
366+
workflowType := "query-task-cache-evicted-workflow"
367+
workflowID := "query-task-cache-evicted-workflow-id"
368+
runID := "query-task-cache-evicted-workflow-run-id"
369+
activityType := "query-task-cache-evicted-activity"
370+
queryType := "state"
371+
doneCh := make(chan struct{})
372+
373+
activityFn := func(ctx context.Context) error {
374+
return nil
375+
}
376+
377+
queryWorkflowFn := func(ctx Context) error {
378+
queryResult := "started"
379+
// setup query handler for query type "state"
380+
if err := SetQueryHandler(ctx, queryType, func(input []byte) (string, error) {
381+
return queryResult, nil
382+
}); err != nil {
383+
return err
384+
}
385+
386+
queryResult = "waiting on timer"
387+
NewTimer(ctx, time.Minute*2).Get(ctx, nil)
388+
389+
queryResult = "waiting on activity"
390+
ctx = WithActivityOptions(ctx, ActivityOptions{
391+
ScheduleToStartTimeout: 10 * time.Second,
392+
StartToCloseTimeout: 10 * time.Second,
393+
})
394+
if err := ExecuteActivity(ctx, activityFn).Get(ctx, nil); err != nil {
395+
return err
396+
}
397+
queryResult = "done"
398+
return nil
399+
}
400+
401+
testEvents := []*m.HistoryEvent{
402+
{
403+
EventId: common.Int64Ptr(1),
404+
EventType: common.EventTypePtr(m.EventTypeWorkflowExecutionStarted),
405+
WorkflowExecutionStartedEventAttributes: &m.WorkflowExecutionStartedEventAttributes{
406+
TaskList: &m.TaskList{Name: &taskList},
407+
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(180),
408+
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
409+
WorkflowType: &m.WorkflowType{Name: common.StringPtr(workflowType)},
410+
},
411+
},
412+
createTestEventDecisionTaskScheduled(2, &m.DecisionTaskScheduledEventAttributes{TaskList: &m.TaskList{Name: &taskList}}),
413+
createTestEventDecisionTaskStarted(3),
414+
createTestEventDecisionTaskCompleted(4, &m.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}),
415+
{
416+
EventId: common.Int64Ptr(5),
417+
EventType: common.EventTypePtr(m.EventTypeTimerStarted),
418+
TimerStartedEventAttributes: &m.TimerStartedEventAttributes{
419+
TimerId: common.StringPtr("0"),
420+
StartToFireTimeoutSeconds: common.Int64Ptr(120),
421+
DecisionTaskCompletedEventId: common.Int64Ptr(4),
422+
},
423+
},
424+
{
425+
EventId: common.Int64Ptr(6),
426+
EventType: common.EventTypePtr(m.EventTypeTimerFired),
427+
TimerFiredEventAttributes: &m.TimerFiredEventAttributes{
428+
TimerId: common.StringPtr("0"),
429+
StartedEventId: common.Int64Ptr(5),
430+
},
431+
},
432+
createTestEventDecisionTaskScheduled(7, &m.DecisionTaskScheduledEventAttributes{TaskList: &m.TaskList{Name: &taskList}}),
433+
createTestEventDecisionTaskStarted(8),
434+
createTestEventDecisionTaskCompleted(9, &m.DecisionTaskCompletedEventAttributes{ScheduledEventId: common.Int64Ptr(2)}),
435+
createTestEventActivityTaskScheduled(10, &m.ActivityTaskScheduledEventAttributes{
436+
ActivityId: common.StringPtr("1"),
437+
ActivityType: &m.ActivityType{
438+
Name: common.StringPtr(activityType),
439+
},
440+
Domain: common.StringPtr(domain),
441+
TaskList: &m.TaskList{
442+
Name: common.StringPtr(taskList),
443+
},
444+
ScheduleToStartTimeoutSeconds: common.Int32Ptr(10),
445+
StartToCloseTimeoutSeconds: common.Int32Ptr(10),
446+
DecisionTaskCompletedEventId: common.Int64Ptr(9),
447+
}),
448+
}
449+
450+
s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions...).Return(nil, nil).AnyTimes()
451+
task := &m.PollForDecisionTaskResponse{
452+
TaskToken: []byte("test-token"),
453+
WorkflowExecution: &m.WorkflowExecution{
454+
WorkflowId: common.StringPtr(workflowID),
455+
RunId: common.StringPtr(runID),
456+
},
457+
WorkflowType: &m.WorkflowType{
458+
Name: common.StringPtr(workflowType),
459+
},
460+
PreviousStartedEventId: common.Int64Ptr(0),
461+
StartedEventId: common.Int64Ptr(3),
462+
History: &m.History{Events: testEvents[0:3]},
463+
NextPageToken: nil,
464+
NextEventId: common.Int64Ptr(4),
465+
}
466+
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(task, nil).Times(1)
467+
s.service.EXPECT().RespondDecisionTaskCompleted(gomock.Any(), gomock.Any(), callOptions...).DoAndReturn(func(ctx context.Context, request *m.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption,
468+
) (success *m.RespondDecisionTaskCompletedResponse, err error) {
469+
s.Equal(1, len(request.Decisions))
470+
s.Equal(m.DecisionTypeStartTimer, request.Decisions[0].GetDecisionType())
471+
return &m.RespondDecisionTaskCompletedResponse{}, nil
472+
}).Times(1)
473+
queryTask := &m.PollForDecisionTaskResponse{
474+
TaskToken: []byte("test-token"),
475+
WorkflowExecution: &m.WorkflowExecution{
476+
WorkflowId: common.StringPtr(workflowID),
477+
RunId: common.StringPtr(runID),
478+
},
479+
WorkflowType: &m.WorkflowType{
480+
Name: common.StringPtr(workflowType),
481+
},
482+
PreviousStartedEventId: common.Int64Ptr(3),
483+
History: &m.History{}, // sticky query, so there's no history
484+
NextPageToken: nil,
485+
NextEventId: common.Int64Ptr(5),
486+
Query: &m.WorkflowQuery{
487+
QueryType: common.StringPtr(queryType),
488+
},
489+
}
490+
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).DoAndReturn(func(ctx context.Context, request *m.PollForDecisionTaskRequest, opts ...yarpc.CallOption,
491+
) (success *m.PollForDecisionTaskResponse, err error) {
492+
getWorkflowCache().Delete(runID) // force remove the workflow state
493+
return queryTask, nil
494+
}).Times(1)
495+
s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions...).Return(&m.ResetStickyTaskListResponse{}, nil).AnyTimes()
496+
s.service.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), callOptions...).Return(&m.GetWorkflowExecutionHistoryResponse{
497+
History: &m.History{Events: testEvents}, // workflow has made progress, return all available events
498+
}, nil).Times(1)
499+
dc := getDefaultDataConverter()
500+
expectedResult, err := dc.ToData("waiting on timer")
501+
s.NoError(err)
502+
s.service.EXPECT().RespondQueryTaskCompleted(gomock.Any(), gomock.Any(), callOptions...).DoAndReturn(func(ctx context.Context, request *m.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error {
503+
s.Equal(m.QueryTaskCompletedTypeCompleted, request.GetCompletedType())
504+
s.Equal(expectedResult, request.GetQueryResult())
505+
close(doneCh)
506+
return nil
507+
}).Times(1)
508+
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()
509+
510+
options := WorkerOptions{
511+
Logger: zap.NewNop(),
512+
DisableActivityWorker: true,
513+
Identity: "test-worker-identity",
514+
DataConverter: dc,
515+
}
516+
worker := newAggregatedWorker(s.service, domain, taskList, options)
517+
worker.RegisterWorkflowWithOptions(
518+
queryWorkflowFn,
519+
RegisterWorkflowOptions{Name: workflowType},
520+
)
521+
worker.RegisterActivityWithOptions(
522+
activityFn,
523+
RegisterActivityOptions{Name: activityType},
524+
)
525+
526+
worker.Start()
527+
<-doneCh
528+
worker.Stop()
529+
}
530+
362531
func (s *WorkersTestSuite) TestMultipleLocalActivities() {
363532
localActivityCalledCount := 0
364533
localActivitySleep := func(duration time.Duration) error {
@@ -441,6 +610,7 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
441610
StartedEventId: common.Int64Ptr(3),
442611
History: &m.History{Events: testEvents[0:3]},
443612
NextPageToken: nil,
613+
NextEventId: common.Int64Ptr(4),
444614
}
445615
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(task, nil).Times(1)
446616
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()
@@ -554,6 +724,7 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
554724
StartedEventId: common.Int64Ptr(3),
555725
History: &m.History{Events: testEvents[0:3]},
556726
NextPageToken: nil,
727+
NextEventId: common.Int64Ptr(4),
557728
}
558729
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(task, nil).Times(1)
559730
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()
@@ -666,6 +837,7 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
666837
StartedEventId: common.Int64Ptr(3),
667838
History: &m.History{Events: testEvents[0:3]},
668839
NextPageToken: nil,
840+
NextEventId: common.Int64Ptr(4),
669841
}
670842
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(task, nil).Times(1)
671843
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()

internal/workflow_replayer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,12 +276,12 @@ func (r *WorkflowReplayer) replayWorkflowHistory(
276276

277277
metricScope := tally.NoopScope
278278
iterator := &historyIteratorImpl{
279-
nextPageToken: task.NextPageToken,
280-
execution: task.WorkflowExecution,
281-
domain: domain,
282-
service: service,
283-
metricsScope: metricScope,
284-
maxEventID: task.GetStartedEventId(),
279+
nextPageToken: task.NextPageToken,
280+
execution: task.WorkflowExecution,
281+
domain: domain,
282+
service: service,
283+
metricsScope: metricScope,
284+
startedEventID: task.GetStartedEventId(),
285285
}
286286
taskHandler := newWorkflowTaskHandler(domain, workerParams, nil, r.registry)
287287
resp, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task, historyIterator: iterator}, nil)

0 commit comments

Comments
 (0)