Skip to content

Commit 4cf8503

Browse files
Honor non-determinism fail workflow policy (#1287)
What changed? Users can specify NonDeterministicWorkflowPolicy in worker options. If the FailWorkflow policy is chosen the workflow is expected to terminate as soon as it ends up with a nondeterministic state (e.g. activity order changed). However this wasn't honored for a category of nondeterminism cases. This PR addresses it and workflows fail once any nondeterminism scenario is encountered. There are two categories of nondeterminism cases in terms of how they get detected by client library: 1. Issue bubbles up as illegal state panic to the task handler. Most actual prod cases. 2. Issue is caught when comparing replay decisions with history. Replay test scenarios and a subset of prod cases. FailWorkflow policy was honored for 2 but not for 1. Why? To make NonDeterministicWorkflowPolicy feature correct/complete. How did you test it? Added an integration test to simulate this scenario.
1 parent ae76f5f commit 4cf8503

File tree

8 files changed

+171
-84
lines changed

8 files changed

+171
-84
lines changed

internal/common/metrics/scope.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func (s *replayAwareScope) Capabilities() tally.Capabilities {
195195
}
196196

197197
// GetTaggedScope return a scope with one or multiple tags,
198-
// input should be key value pairs like: GetTaggedScope(scope, tag1, val1, tag2, val2).
198+
// input should be key value pairs like: GetTaggedScope(tag1, val1, tag2, val2).
199199
func (ts *TaggedScope) GetTaggedScope(keyValueinPairs ...string) tally.Scope {
200200
if len(keyValueinPairs)%2 != 0 {
201201
panic("GetTaggedScope key value are not in pairs")

internal/common/metrics/service_wrapper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (w *workflowServiceMetricsWrapper) getOperationScope(scopeName string) *ope
116116
}
117117

118118
func (s *operationScope) handleError(err error) {
119-
s.scope.Timer(CadenceLatency).Record(time.Now().Sub(s.startTime))
119+
s.scope.Timer(CadenceLatency).Record(time.Since(s.startTime))
120120
if err != nil {
121121
switch err.(type) {
122122
case *shared.EntityNotExistsError,

internal/internal_logging_tags.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,34 @@
2121
package internal
2222

2323
const (
24-
tagActivityID = "ActivityID"
25-
tagActivityType = "ActivityType"
26-
tagDomain = "Domain"
27-
tagEventID = "EventID"
28-
tagEventType = "EventType"
29-
tagRunID = "RunID"
30-
tagTaskList = "TaskList"
31-
tagTimerID = "TimerID"
32-
tagWorkflowID = "WorkflowID"
33-
tagWorkflowType = "WorkflowType"
34-
tagWorkerID = "WorkerID"
35-
tagWorkerType = "WorkerType"
36-
tagSideEffectID = "SideEffectID"
37-
tagChildWorkflowID = "ChildWorkflowID"
38-
tagLocalActivityType = "LocalActivityType"
39-
tagLocalActivityID = "LocalActivityID"
40-
tagQueryType = "QueryType"
41-
tagVisibilityQuery = "VisibilityQuery"
42-
tagPanicError = "PanicError"
43-
tagPanicStack = "PanicStack"
44-
causeTag = "pollerrorcause"
45-
tagworkflowruntimelength = "workflowruntimelength"
24+
tagActivityID = "ActivityID"
25+
tagActivityType = "ActivityType"
26+
tagDomain = "Domain"
27+
tagEventID = "EventID"
28+
tagEventType = "EventType"
29+
tagRunID = "RunID"
30+
tagTaskList = "TaskList"
31+
tagTimerID = "TimerID"
32+
tagWorkflowID = "WorkflowID"
33+
tagWorkflowType = "WorkflowType"
34+
tagWorkerID = "WorkerID"
35+
tagWorkerType = "WorkerType"
36+
tagSideEffectID = "SideEffectID"
37+
tagChildWorkflowID = "ChildWorkflowID"
38+
tagLocalActivityType = "LocalActivityType"
39+
tagLocalActivityID = "LocalActivityID"
40+
tagQueryType = "QueryType"
41+
tagVisibilityQuery = "VisibilityQuery"
42+
tagPanicError = "PanicError"
43+
tagPanicStack = "PanicStack"
44+
causeTag = "pollerrorcause"
45+
tagWorkflowRuntimeLength = "workflowruntimelength"
46+
tagNonDeterminismDetectionType = "NonDeterminismDetectionType"
47+
)
48+
49+
type nonDeterminismDetectionType string
50+
51+
const (
52+
nonDeterminismDetectionTypeIllegalStatePanic nonDeterminismDetectionType = "illegalstatepanic"
53+
nonDeterminismDetectionTypeReplayComparison nonDeterminismDetectionType = "replaycomparison"
4654
)

internal/internal_task_handlers.go

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"time"
3636

3737
"github.com/opentracing/opentracing-go"
38+
"github.com/uber-go/tally"
3839
"go.uber.org/zap"
3940

4041
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
@@ -674,12 +675,12 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
674675
historyIterator HistoryIterator,
675676
) (workflowContext *workflowExecutionContextImpl, err error) {
676677
metricsScope := wth.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName())
677-
defer func() {
678+
defer func(metricsScope tally.Scope) {
678679
if err == nil && workflowContext != nil && workflowContext.laTunnel == nil {
679680
workflowContext.laTunnel = wth.laTunnel
680681
}
681682
metricsScope.Gauge(metrics.StickyCacheSize).Update(float64(getWorkflowCache().Size()))
682-
}()
683+
}(metricsScope)
683684

684685
runID := task.WorkflowExecution.GetRunId()
685686

@@ -694,14 +695,13 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
694695
if workflowContext != nil {
695696
workflowContext.Lock()
696697
// add new tag on metrics scope with workflow runtime length category
697-
executionRuntimeType := workflowCategorizedByTimeout(workflowContext.workflowInfo.ExecutionStartToCloseTimeoutSeconds)
698-
metricsScope = metricsScope.Tagged(map[string]string{tagworkflowruntimelength: executionRuntimeType})
698+
scope := metricsScope.Tagged(map[string]string{tagWorkflowRuntimeLength: workflowCategorizedByTimeout(workflowContext)})
699699
if task.Query != nil && !isFullHistory {
700700
// query task and we have a valid cached state
701-
metricsScope.Counter(metrics.StickyCacheHit).Inc(1)
701+
scope.Counter(metrics.StickyCacheHit).Inc(1)
702702
} else if history.Events[0].GetEventId() == workflowContext.previousStartedEventID+1 {
703703
// non query task and we have a valid cached state
704-
metricsScope.Counter(metrics.StickyCacheHit).Inc(1)
704+
scope.Counter(metrics.StickyCacheHit).Inc(1)
705705
} else {
706706
// non query task and cached state is missing events, we need to discard the cached state and rebuild one.
707707
workflowContext.ResetIfStale(task, historyIterator)
@@ -949,39 +949,31 @@ ProcessEvents:
949949
// the replay of that event will panic on the decision state machine and the workflow will be marked as completed
950950
// with the panic error.
951951
var nonDeterministicErr error
952+
var nonDeterminismType nonDeterminismDetectionType
952953
if !skipReplayCheck && !w.isWorkflowCompleted || isReplayTest {
953954
// check if decisions from reply matches to the history events
954955
if err := matchReplayWithHistory(w.workflowInfo, replayDecisions, respondEvents); err != nil {
955956
nonDeterministicErr = err
957+
nonDeterminismType = nonDeterminismDetectionTypeReplayComparison
956958
}
957-
}
958-
if nonDeterministicErr == nil && w.err != nil {
959-
if panicErr, ok := w.err.(*workflowPanicError); ok && panicErr.value != nil {
960-
if _, isStateMachinePanic := panicErr.value.(stateMachineIllegalStatePanic); isStateMachinePanic {
961-
if isReplayTest {
962-
// NOTE: we should do this regardless if it's in replay test or not
963-
// but since previously we checked the wrong error type, it may break existing customers workflow
964-
// the issue is that we change the error type and that we change the error message, the customers
965-
// are checking the error string - we plan to wrap all errors to avoid this issue in client v2
966-
nonDeterministicErr = panicErr
967-
} else {
968-
// Since we know there is an error, we do the replay check to give more context in the log
969-
replayErr := matchReplayWithHistory(w.workflowInfo, replayDecisions, respondEvents)
970-
w.wth.logger.Error("Ignored workflow panic error",
971-
zap.String(tagWorkflowType, task.WorkflowType.GetName()),
972-
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
973-
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
974-
zap.Error(nonDeterministicErr),
975-
zap.NamedError("ReplayError", replayErr),
976-
)
977-
}
978-
}
979-
}
959+
} else if panicErr, ok := w.getWorkflowPanicIfIllegaleStatePanic(); ok {
960+
// This is a nondeterministic execution which ended up panicking
961+
nonDeterministicErr = panicErr
962+
nonDeterminismType = nonDeterminismDetectionTypeIllegalStatePanic
963+
// Since we know there is an error, we do the replay check to give more context in the log
964+
replayErr := matchReplayWithHistory(w.workflowInfo, replayDecisions, respondEvents)
965+
w.wth.logger.Error("Illegal state caused panic",
966+
zap.String(tagWorkflowType, task.WorkflowType.GetName()),
967+
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
968+
zap.String(tagRunID, task.WorkflowExecution.GetRunId()),
969+
zap.Error(nonDeterministicErr),
970+
zap.NamedError("ReplayError", replayErr),
971+
)
980972
}
981973

982974
if nonDeterministicErr != nil {
983-
984-
w.wth.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName()).Counter(metrics.NonDeterministicError).Inc(1)
975+
scope := w.wth.metricsScope.GetTaggedScope(tagWorkflowType, task.WorkflowType.GetName(), tagNonDeterminismDetectionType, string(nonDeterminismType))
976+
scope.Counter(metrics.NonDeterministicError).Inc(1)
985977
w.wth.logger.Error("non-deterministic-error",
986978
zap.String(tagWorkflowType, task.WorkflowType.GetName()),
987979
zap.String(tagWorkflowID, task.WorkflowExecution.GetWorkflowId()),
@@ -998,7 +990,7 @@ ProcessEvents:
998990
// workflow timeout.
999991
return nil, nonDeterministicErr
1000992
default:
1001-
panic(fmt.Sprintf("unknown mismatched workflow history policy."))
993+
panic("unknown mismatched workflow history policy.")
1002994
}
1003995
}
1004996

@@ -1205,6 +1197,24 @@ func (w *workflowExecutionContextImpl) GetDecisionTimeout() time.Duration {
12051197
return time.Second * time.Duration(w.workflowInfo.TaskStartToCloseTimeoutSeconds)
12061198
}
12071199

1200+
func (w *workflowExecutionContextImpl) getWorkflowPanicIfIllegaleStatePanic() (*workflowPanicError, bool) {
1201+
if !w.isWorkflowCompleted || w.err == nil {
1202+
return nil, false
1203+
}
1204+
1205+
panicErr, ok := w.err.(*workflowPanicError)
1206+
if !ok || panicErr.value == nil {
1207+
return nil, false
1208+
}
1209+
1210+
_, ok = panicErr.value.(stateMachineIllegalStatePanic)
1211+
if !ok {
1212+
return nil, false
1213+
}
1214+
1215+
return panicErr, true
1216+
}
1217+
12081218
func (wth *workflowTaskHandlerImpl) completeWorkflow(
12091219
eventHandler *workflowExecutionEventHandlerImpl,
12101220
task *s.PollForDecisionTaskResponse,
@@ -1312,7 +1322,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
13121322

13131323
if closeDecision != nil {
13141324
decisions = append(decisions, closeDecision)
1315-
elapsed := time.Now().Sub(workflowContext.workflowStartTime)
1325+
elapsed := time.Since(workflowContext.workflowStartTime)
13161326
metricsScope.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)
13171327
forceNewDecision = false
13181328
}
@@ -1845,7 +1855,8 @@ func traceLog(fn func()) {
18451855
}
18461856
}
18471857

1848-
func workflowCategorizedByTimeout(executionTimeout int32) string {
1858+
func workflowCategorizedByTimeout(wfContext *workflowExecutionContextImpl) string {
1859+
executionTimeout := wfContext.workflowInfo.ExecutionStartToCloseTimeoutSeconds
18491860
if executionTimeout <= defaultInstantLivedWorkflowTimeoutUpperLimitInSec {
18501861
return "instant"
18511862
} else if executionTimeout <= defaultShortLivedWorkflowTimeoutUpperLimitInSec {

internal/internal_task_handlers_test.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -869,28 +869,20 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicLogNonexistingI
869869
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, t.registry)
870870
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
871871

872-
t.NotNil(request)
873-
response := request.(*s.RespondDecisionTaskFailedRequest)
874-
875-
// NOTE: we might acctually want to return an error
876-
// but since previously we checked the wrong error type, it may break existing customers workflow
877-
// The issue is that we change the error type and that we change the error message, the customers
878-
// are checking the error string - we plan to wrap all errors to avoid this issue in client v2
879-
t.NoError(err)
880-
t.NotNil(response)
872+
t.Nil(request)
873+
t.ErrorContains(err, "nondeterministic workflow")
881874

882875
// Check that the error was logged
883-
ignoredWorkflowLogs := logs.FilterMessage("Ignored workflow panic error")
884-
require.Len(t.T(), ignoredWorkflowLogs.All(), 1)
876+
illegalPanicLogs := logs.FilterMessage("Illegal state caused panic")
877+
require.Len(t.T(), illegalPanicLogs.All(), 1)
885878

886-
replayErrorField := findLogField(ignoredWorkflowLogs.All()[0], "ReplayError")
879+
replayErrorField := findLogField(illegalPanicLogs.All()[0], "ReplayError")
887880
require.NotNil(t.T(), replayErrorField)
888881
require.Equal(t.T(), zapcore.ErrorType, replayErrorField.Type)
889882
require.ErrorContains(t.T(), replayErrorField.Interface.(error),
890883
"nondeterministic workflow: "+
891884
"history event is ActivityTaskScheduled: (ActivityId:NotAnActivityID, ActivityType:(Name:pkg.Greeter_Activity), TaskList:(Name:taskList), Input:[]), "+
892885
"replay decision is ScheduleActivityTask: (ActivityId:0, ActivityType:(Name:Greeter_Activity), TaskList:(Name:taskList)")
893-
894886
}
895887

896888
func (t *TaskHandlersTestSuite) TestWorkflowTask_WorkflowReturnsPanicError() {

test/activity_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,7 @@ func (a *Activities) invoked() []string {
105105
a.mu.Lock()
106106
defer a.mu.Unlock()
107107
result := make([]string, len(a.invocations))
108-
for i := range a.invocations {
109-
result[i] = a.invocations[i]
110-
}
108+
copy(result, a.invocations)
111109
return result
112110
}
113111

test/integration_test.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package test
2323

2424
import (
2525
"context"
26+
"errors"
2627
"fmt"
2728
"net"
2829
"strings"
@@ -148,18 +149,24 @@ func (ts *IntegrationTestSuite) SetupTest() {
148149
ts.seq++
149150
ts.activities.clearInvoked()
150151
ts.taskListName = fmt.Sprintf("tl-%v", ts.seq)
151-
ts.worker = worker.New(ts.rpcClient.Interface, domainName, ts.taskListName, worker.Options{
152-
DisableStickyExecution: ts.config.IsStickyOff,
153-
Logger: zaptest.NewLogger(ts.T()),
154-
ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})},
155-
})
156152
ts.tracer = newtracingInterceptorFactory()
153+
}
154+
155+
func (ts *IntegrationTestSuite) BeforeTest(suiteName, testName string) {
157156
options := worker.Options{
158157
DisableStickyExecution: ts.config.IsStickyOff,
159158
Logger: zaptest.NewLogger(ts.T()),
160159
WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptorFactory{ts.tracer},
161160
ContextPropagators: []workflow.ContextPropagator{NewStringMapPropagator([]string{testContextKey})},
162161
}
162+
163+
if testName == "TestNonDeterministicWorkflowQuery" || testName == "TestNonDeterministicWorkflowFailPolicy" {
164+
options.NonDeterministicWorkflowPolicy = worker.NonDeterministicWorkflowPolicyFailWorkflow
165+
166+
// disable sticky executon so each workflow yield will require rerunning it from beginning
167+
options.DisableStickyExecution = true
168+
}
169+
163170
ts.worker = worker.New(ts.rpcClient.Interface, domainName, ts.taskListName, options)
164171
ts.registerWorkflowsAndActivities(ts.worker)
165172
ts.Nil(ts.worker.Start())
@@ -270,7 +277,7 @@ func (ts *IntegrationTestSuite) TestStackTraceQuery() {
270277
ts.NoError(err)
271278
ts.NotNil(value)
272279
var trace string
273-
ts.Nil(value.Get(&trace))
280+
ts.NoError(value.Get(&trace))
274281
ts.True(strings.Contains(trace, "go.uber.org/cadence/test.(*Workflows).Basic"))
275282
}
276283

@@ -303,7 +310,7 @@ func (ts *IntegrationTestSuite) TestConsistentQuery() {
303310
ts.NotNil(value.QueryResult)
304311
ts.Nil(value.QueryRejected)
305312
var queryResult string
306-
ts.Nil(value.QueryResult.Get(&queryResult))
313+
ts.NoError(value.QueryResult.Get(&queryResult))
307314
ts.Equal("signal-input", queryResult)
308315
}
309316

@@ -428,7 +435,6 @@ func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyTerminate() {
428435
resp, err := ts.libClient.DescribeWorkflowExecution(context.Background(), childWorkflowID, "")
429436
ts.NoError(err)
430437
ts.True(resp.WorkflowExecutionInfo.GetCloseTime() > 0)
431-
432438
}
433439

434440
func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyAbandon() {
@@ -437,7 +443,7 @@ func (ts *IntegrationTestSuite) TestChildWFWithParentClosePolicyAbandon() {
437443
ts.NoError(err)
438444
resp, err := ts.libClient.DescribeWorkflowExecution(context.Background(), childWorkflowID, "")
439445
ts.NoError(err)
440-
ts.True(resp.WorkflowExecutionInfo.GetCloseTime() == 0)
446+
ts.Zerof(resp.WorkflowExecutionInfo.GetCloseTime(), "Expected close time to be zero, got %d. Describe response: %#v", resp.WorkflowExecutionInfo.GetCloseTime(), resp)
441447
}
442448

443449
func (ts *IntegrationTestSuite) TestChildWFCancel() {
@@ -512,6 +518,33 @@ func (ts *IntegrationTestSuite) TestDomainUpdate() {
512518
ts.Equal(description, *domain.DomainInfo.Description)
513519
}
514520

521+
func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowFailPolicy() {
522+
err := ts.executeWorkflow("test-nondeterminism-failpolicy", ts.workflows.NonDeterminismSimulatorWorkflow, nil)
523+
var customErr *internal.CustomError
524+
ok := errors.As(err, &customErr)
525+
ts.Truef(ok, "expected CustomError but got %T", err)
526+
ts.Equal("NonDeterministicWorkflowPolicyFailWorkflow", customErr.Reason())
527+
}
528+
529+
func (ts *IntegrationTestSuite) TestNonDeterministicWorkflowQuery() {
530+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
531+
defer cancel()
532+
run, err := ts.libClient.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-nondeterministic-query"), ts.workflows.NonDeterminismSimulatorWorkflow)
533+
ts.Nil(err)
534+
err = run.Get(ctx, nil)
535+
var customErr *internal.CustomError
536+
ok := errors.As(err, &customErr)
537+
ts.Truef(ok, "expected CustomError but got %T", err)
538+
ts.Equal("NonDeterministicWorkflowPolicyFailWorkflow", customErr.Reason())
539+
540+
// query failed workflow should still work
541+
value, err := ts.libClient.QueryWorkflow(ctx, "test-nondeterministic-query", run.GetRunID(), "__stack_trace")
542+
ts.NoError(err)
543+
ts.NotNil(value)
544+
var trace string
545+
ts.NoError(value.Get(&trace))
546+
}
547+
515548
func (ts *IntegrationTestSuite) registerDomain() {
516549
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
517550
defer cancel()

0 commit comments

Comments
 (0)