Skip to content

Commit 2875316

Browse files
authored
Handle illegal StateMachine transition as Non-Deterministic error (#597)
1 parent 167c85a commit 2875316

File tree

4 files changed

+52
-28
lines changed

4 files changed

+52
-28
lines changed

internal/error.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ type (
113113

114114
// PanicError contains information about panicked workflow/activity.
115115
PanicError struct {
116-
value string
116+
value interface{}
117117
stackTrace string
118118
}
119119

@@ -296,12 +296,12 @@ func (e *CanceledError) Details(d ...interface{}) error {
296296
}
297297

298298
func newPanicError(value interface{}, stackTrace string) *PanicError {
299-
return &PanicError{value: fmt.Sprintf("%v", value), stackTrace: stackTrace}
299+
return &PanicError{value: value, stackTrace: stackTrace}
300300
}
301301

302302
// Error from error interface
303303
func (e *PanicError) Error() string {
304-
return e.value
304+
return fmt.Sprintf("%v", e.value)
305305
}
306306

307307
// StackTrace return stack trace of the panic

internal/internal_decision_state_machine.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ type (
111111
scheduledEventIDToCancellationID map[int64]string
112112
scheduledEventIDToSignalID map[int64]string
113113
}
114+
115+
// panic when decision state machine is in illegal state
116+
stateMachineIllegalStatePanic struct {
117+
message string
118+
}
114119
)
115120

116121
const (
@@ -306,9 +311,17 @@ func (d *decisionStateMachineBase) moveState(newState decisionState, event strin
306311
}
307312
}
308313

314+
func (d stateMachineIllegalStatePanic) String() string {
315+
return d.message
316+
}
317+
318+
func panicIllegalState(message string) {
319+
panic(stateMachineIllegalStatePanic{message: message})
320+
}
321+
309322
func (d *decisionStateMachineBase) failStateTransition(event string) {
310323
// this is when we detect illegal state transition, likely due to ill history sequence or nondeterministic decider code
311-
panic(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d))
324+
panicIllegalState(fmt.Sprintf("invalid state transition: attempt to %v, %v", event, d))
312325
}
313326

314327
func (d *decisionStateMachineBase) handleDecisionSent() {
@@ -660,7 +673,7 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
660673
if !ok {
661674
panicMsg := fmt.Sprintf("unknown decision %v, possible causes are nondeterministic workflow definition code"+
662675
" or incompatible change in the workflow definition", id)
663-
panic(panicMsg)
676+
panicIllegalState(panicMsg)
664677
}
665678
// Move the last update decision state machine to the back of the list.
666679
// Otherwise decisions (like timer cancellations) can end up out of order.
@@ -671,7 +684,7 @@ func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
671684
func (h *decisionsHelper) addDecision(decision decisionStateMachine) {
672685
if _, ok := h.decisions[decision.getID()]; ok {
673686
panicMsg := fmt.Sprintf("adding duplicate decision %v", decision)
674-
panic(panicMsg)
687+
panicIllegalState(panicMsg)
675688
}
676689
element := h.orderedDecisions.PushBack(decision)
677690
h.decisions[decision.getID()] = element
@@ -730,12 +743,12 @@ func (h *decisionsHelper) getActivityID(event *s.HistoryEvent) string {
730743
case s.EventTypeActivityTaskTimedOut:
731744
scheduledEventID = event.ActivityTaskTimedOutEventAttributes.GetScheduledEventId()
732745
default:
733-
panic(fmt.Sprintf("unexpected event type %v", event.GetEventType()))
746+
panicIllegalState(fmt.Sprintf("unexpected event type %v", event.GetEventType()))
734747
}
735748

736749
activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
737750
if !ok {
738-
panic(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event)))
751+
panicIllegalState(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event)))
739752
}
740753
return activityID
741754
}

internal/internal_task_handlers.go

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,14 @@ ProcessEvents:
719719
}
720720
}
721721

722+
// Non-deterministic error could happen in 2 different places:
723+
// 1) the replay decisions does not match to history events. This is usually due to non backwards compatible code
724+
// change to decider logic. For example, change calling one activity to a different activity.
725+
// 2) the decision state machine is trying to make illegal state transition while replay a history event (like
726+
// activity task completed), but the corresponding decider code that start the event has been removed. In that case
727+
// the replay of that event will panic on the decision state machine and the workflow will be marked as completed
728+
// with the panic error.
729+
var nonDeterministicErr error
722730
if !skipReplayCheck && !w.isWorkflowCompleted {
723731
// check if decisions from reply matches to the history events
724732
if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil {
@@ -728,29 +736,32 @@ ProcessEvents:
728736
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
729737
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
730738
zap.Error(err))
731-
732-
// Whether or not we store the error in workflowContext.err makes
733-
// a significant difference, to the point that it affects client's observable
734-
// behavior as far as handling non-deterministic workflows.
735-
//
736-
// If we store it in workflowContext.err, the decision task completion code
737-
// will pick up the error and correctly wrap it in the response request we sent back
738-
// to the server, which in this case will contain a request to fail the workflow.
739-
//
740-
// If we simply return the error, the decision task completion code path will not
741-
// execute at all, therefore, no response is sent back to the server and we will
742-
// look like a decision task time out.
743-
switch w.wth.nonDeterministicWorkflowPolicy {
744-
case NonDeterministicWorkflowPolicyFailWorkflow:
745-
eventHandler.Complete(nil, NewCustomError("nondeterministic workflow", err.Error()))
746-
case NonDeterministicWorkflowPolicyBlockWorkflow:
747-
return nil, err
748-
default:
749-
panic(fmt.Sprintf("unknown mismatched workflow history policy."))
739+
nonDeterministicErr = err
740+
}
741+
}
742+
if nonDeterministicErr == nil && w.err != nil {
743+
if panicErr, ok := w.err.(*PanicError); ok && panicErr.value != nil {
744+
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
745+
nonDeterministicErr = panicErr
750746
}
751747
}
752748
}
753749

750+
if nonDeterministicErr != nil {
751+
switch w.wth.nonDeterministicWorkflowPolicy {
752+
case NonDeterministicWorkflowPolicyFailWorkflow:
753+
// complete workflow with custom error will fail the workflow
754+
eventHandler.Complete(nil, NewCustomError("NonDeterministicWorkflowPolicyFailWorkflow", nonDeterministicErr.Error()))
755+
case NonDeterministicWorkflowPolicyBlockWorkflow:
756+
// return error here will be convert to DecisionTaskFailed for the first time, and ignored for subsequent
757+
// attempts which will cause DecisionTaskTimeout and server will retry forever until issue got fixed or
758+
// workflow timeout.
759+
return nil, nonDeterministicErr
760+
default:
761+
panic(fmt.Sprintf("unknown mismatched workflow history policy."))
762+
}
763+
}
764+
754765
return w.CompleteDecisionTask(workflowTask, true), nil
755766
}
756767

internal/internal_task_handlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
463463
t.True(len(response.Decisions) > 0)
464464
closeDecision := response.Decisions[len(response.Decisions)-1]
465465
t.Equal(*closeDecision.DecisionType, s.DecisionTypeFailWorkflowExecution)
466-
t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "nondeterministic")
466+
t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "NonDeterministicWorkflowPolicyFailWorkflow")
467467

468468
// now with different package name to activity type
469469
testEvents[4].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("new-package.Greeter_Activity")

0 commit comments

Comments
 (0)