Skip to content

Commit dd3ec45

Browse files
authored
Improve workflow replayer (#1082)
- Catch WorkflowPanicError instead of PanicError when processing workflow tasks
- Check if the last event is ContinueAsNew when the replay result is workflow completed
- Correctly handle the decision & event match check when replaying partial workflow history
- Add support for data converter, context propagator, tracer, and interceptor in the workflow replayer and shadower
- Fetch the next history page if possible
1 parent 65903f7 commit dd3ec45

11 files changed

+285
-98
lines changed

internal/internal_decision_state_machine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,13 +624,13 @@ func Test_CancelExternalWorkflowStateMachine_Failed(t *testing.T) {
624624
require.NotNil(t, err)
625625
}
626626

627-
func runAndCatchPanic(f func()) (err *PanicError) {
627+
func runAndCatchPanic(f func()) (err *workflowPanicError) {
628628
// panic handler
629629
defer func() {
630630
if p := recover(); p != nil {
631631
topLine := "runAndCatchPanic [panic]:"
632632
st := getStackTraceRaw(topLine, 7, 0)
633-
err = newPanicError(p, st) // Fail decision on panic
633+
err = newWorkflowPanicError(p, st) // Fail decision on panic
634634
}
635635
}()
636636

internal/internal_task_handlers.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,10 @@ func (eh *history) NextDecisionEvents() (result []*s.HistoryEvent, markers []*s.
268268
return result, markers, checksum, err
269269
}
270270

271+
func (eh *history) HasNextDecisionEvents() bool {
272+
return len(eh.next) != 0 || eh.currentIndex != len(eh.loadedEvents) || eh.hasMoreEvents()
273+
}
274+
271275
func (eh *history) hasMoreEvents() bool {
272276
historyIterator := eh.workflowTask.historyIterator
273277
return historyIterator != nil && historyIterator.HasNextPage()
@@ -468,7 +472,7 @@ func (w *workflowExecutionContextImpl) Lock() {
468472

469473
func (w *workflowExecutionContextImpl) Unlock(err error) {
470474
if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.disableStickyExecution && !w.hasPendingLocalActivityWork()) {
471-
// TODO: in case of closed, it asumes the close decision always succeed. need server side change to return
475+
// TODO: in case of closed, it assumes the close decision always succeed. need server side change to return
472476
// error to indicate the close failure case. This should be a rare case. For now, always remove the cache, and
473477
// if the close decision failed, the next decision will have to rebuild the state.
474478
if getWorkflowCache().Exist(w.workflowInfo.WorkflowExecution.RunID) {
@@ -818,6 +822,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
818822
var respondEvents []*s.HistoryEvent
819823

820824
skipReplayCheck := w.skipReplayCheck()
825+
isReplayTest := task.GetPreviousStartedEventId() == replayPreviousStartedEventID
821826
// Process events
822827
ProcessEvents:
823828
for {
@@ -888,7 +893,8 @@ ProcessEvents:
888893
}
889894
}
890895
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
891-
if isReplay {
896+
lastDecisionEventsForReplayTest := isReplayTest && !reorderedHistory.HasNextDecisionEvents()
897+
if isReplay && !lastDecisionEventsForReplayTest {
892898
eventDecisions := eventHandler.decisionsHelper.getDecisions(true)
893899
if len(eventDecisions) > 0 && !skipReplayCheck {
894900
replayDecisions = append(replayDecisions, eventDecisions...)
@@ -904,14 +910,14 @@ ProcessEvents:
904910
// the replay of that event will panic on the decision state machine and the workflow will be marked as completed
905911
// with the panic error.
906912
var nonDeterministicErr error
907-
if !skipReplayCheck && !w.isWorkflowCompleted {
913+
if !skipReplayCheck && !w.isWorkflowCompleted || isReplayTest {
908914
// check if decisions from replay match the history events
909915
if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil {
910916
nonDeterministicErr = err
911917
}
912918
}
913919
if nonDeterministicErr == nil && w.err != nil {
914-
if panicErr, ok := w.err.(*PanicError); ok && panicErr.value != nil {
920+
if panicErr, ok := w.err.(*workflowPanicError); ok && panicErr.value != nil {
915921
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
916922
nonDeterministicErr = panicErr
917923
}
@@ -1445,7 +1451,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
14451451
// for query task
14461452
if task.Query != nil {
14471453
queryCompletedRequest := &s.RespondQueryTaskCompletedRequest{TaskToken: task.TaskToken}
1448-
if panicErr, ok := workflowContext.err.(*PanicError); ok {
1454+
if panicErr, ok := workflowContext.err.(*workflowPanicError); ok {
14491455
queryCompletedRequest.CompletedType = common.QueryTaskCompletedTypePtr(s.QueryTaskCompletedTypeFailed)
14501456
queryCompletedRequest.ErrorMessage = common.StringPtr("Workflow panic: " + panicErr.Error())
14511457
return queryCompletedRequest

internal/internal_task_handlers_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ func createTestEventWorkflowExecutionCompleted(eventID int64, attr *s.WorkflowEx
132132
return &s.HistoryEvent{EventId: common.Int64Ptr(eventID), EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionCompleted), WorkflowExecutionCompletedEventAttributes: attr}
133133
}
134134

135+
func createTestEventWorkflowExecutionContinuedAsNew(eventID int64, attr *s.WorkflowExecutionContinuedAsNewEventAttributes) *s.HistoryEvent {
136+
return &s.HistoryEvent{EventId: common.Int64Ptr(eventID), EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionContinuedAsNew), WorkflowExecutionContinuedAsNewEventAttributes: attr}
137+
}
138+
135139
func createTestEventWorkflowExecutionStarted(eventID int64, attr *s.WorkflowExecutionStartedEventAttributes) *s.HistoryEvent {
136140
return &s.HistoryEvent{EventId: common.Int64Ptr(eventID), EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionStarted), WorkflowExecutionStartedEventAttributes: attr}
137141
}
@@ -629,8 +633,7 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() {
629633
previousStartedEventID int64
630634
isResultErr bool
631635
}{
632-
{0, 0, false},
633-
{0, 3, false},
636+
{0, 6, false},
634637
{10, 0, true},
635638
{10, 6, true},
636639
}
@@ -648,7 +651,7 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() {
648651
t.Error(err, "testcase %v failed", i)
649652
t.Nil(request)
650653
t.Contains(err.Error(), "premature end of stream")
651-
t.EqualValues(getWorkflowCache().Size(), 0)
654+
t.EqualValues(getWorkflowCache().Size(), 1)
652655
continue
653656
}
654657

internal/worker.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ type (
184184
// default: false
185185
EnableSessionWorker bool
186186

187-
// Uncomment this option when we support automatic restablish failed sessions.
187+
// Uncomment this option when we support automatically reestablishing failed sessions.
188188
// Optional: The identifier of the resource consumed by sessions.
189189
// It's the user's responsibility to ensure there's only one worker using this resourceID.
190190
// For now, if user doesn't specify one, a new uuid will be used as the resourceID.
@@ -207,8 +207,14 @@ type (
207207
Tracer opentracing.Tracer
208208

209209
// Optional: Enable worker for running shadowing workflows to replay existing workflows
210-
// If set to true, worker will run in shadow mode and all other workers (decision, activity, session)
210+
// If set to true:
211+
// 1. Worker will run in shadow mode and all other workers (decision, activity, session)
211212
// will be disabled to prevent them from updating existing workflow states.
213+
// 2. DataConverter, WorkflowInterceptorChainFactories, ContextPropagators, Tracer will be
214+
// used as ReplayOptions and forwarded to the underlying WorkflowReplayer.
215+
// The actual shadower activity worker will not use them.
216+
// 3. TaskList will become Domain-TaskList, to prevent conflict across domains as there's
217+
// only one shadowing domain which is responsible for shadowing workflows for all domains.
212218
// default: false
213219
EnableShadowWorker bool
214220

@@ -236,16 +242,8 @@ const (
236242
// Whereas default does *NOT* reply anything back to the server, fail workflow replies back with a request
237243
// to fail the workflow execution.
238244
NonDeterministicWorkflowPolicyFailWorkflow
239-
240-
// ReplayDomainName is domainName for replay because startEvent doesn't contain it
241-
ReplayDomainName = "ReplayDomain"
242245
)
243246

244-
// IsReplayDomain checks if the domainName is from replay
245-
func IsReplayDomain(dn string) bool {
246-
return ReplayDomainName == dn
247-
}
248-
249247
// NewWorker creates an instance of worker for managing workflow and activity executions.
250248
// service - thrift connection to the cadence server.
251249
// domain - the name of the cadence domain.

0 commit comments

Comments
 (0)