Skip to content

Commit c93ab4f

Browse files
authored
Add non-deterministic workflow option (#475)
This commit adds an option to configure how worker execution should handle detection of a non-deterministic workflow. Currently, when non-determinism is detected in execution history, the task handler on the client simply logs the error to console *without* replying back to the cadence server, which leads the server to think the decision task has timed out and thus reschedules a new decision task. The current behavior is less than satisfactory for 2 reasons: 1. From the server side there's no indication of the non-determinism error. It simply looks like a timeout. 2. From the client side, there's no recovery option, in that the client will just pull the newly rescheduled decision task and fail again and again. This commit adds a new option to cadence workers to explicitly fail the workflow when a mismatched history has been detected. This should yield a more robust debugging experience for customers whose workflows fail due to non-determinism.
1 parent 82c6add commit c93ab4f

File tree

5 files changed

+99
-10
lines changed

5 files changed

+99
-10
lines changed

internal/internal_task_handlers.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,16 @@ type (
9898

9999
// workflowTaskHandlerImpl is the implementation of WorkflowTaskHandler
100100
workflowTaskHandlerImpl struct {
101-
domain string
102-
metricsScope tally.Scope
103-
ppMgr pressurePointMgr
104-
logger *zap.Logger
105-
identity string
106-
enableLoggingInReplay bool
107-
disableStickyExecution bool
108-
hostEnv *hostEnvImpl
109-
laTunnel *localActivityTunnel
101+
domain string
102+
metricsScope tally.Scope
103+
ppMgr pressurePointMgr
104+
logger *zap.Logger
105+
identity string
106+
enableLoggingInReplay bool
107+
disableStickyExecution bool
108+
hostEnv *hostEnvImpl
109+
laTunnel *localActivityTunnel
110+
nonDeterministicWorkflowPolicy NonDeterministicWorkflowPolicy
110111
}
111112

112113
activityProvider func(name string) activity
@@ -307,6 +308,7 @@ func newWorkflowTaskHandler(
307308
enableLoggingInReplay: params.EnableLoggingInReplay,
308309
disableStickyExecution: params.DisableStickyExecution,
309310
hostEnv: hostEnv,
311+
nonDeterministicWorkflowPolicy: params.NonDeterministicWorkflowPolicy,
310312
}
311313
}
312314

@@ -653,7 +655,26 @@ ProcessEvents:
653655
// check if decisions from reply matches to the history events
654656
if err := matchReplayWithHistory(replayDecisions, respondEvents); err != nil {
655657
wth.logger.Error("Replay and history mismatch.", zap.Error(err))
656-
return nil, "", err
658+
659+
// Whether or not we store the error in workflowContext.err makes
660+
// a significant difference, to the point that it affects client's observable
661+
// behavior as far as handling non-deterministic workflows.
662+
//
663+
// If we store it in workflowContext.err, the decision task completion code
664+
// will pick up the error and correctly wrap it in the response request we send back
665+
// to the server, which in this case will contain a request to fail the workflow.
666+
//
667+
// If we simply return the error, the decision task completion code path will not
668+
// execute at all, therefore, no response is sent back to the server and we will
669+
// look like a decision task time out.
670+
switch wth.nonDeterministicWorkflowPolicy {
671+
case NonDeterministicWorkflowPolicyFailWorkflow:
672+
eventHandler.Complete(nil, NewCustomError("nondeterministic workflow", err.Error()))
673+
case NonDeterministicWorkflowPolicyBlockWorkflow:
674+
return nil, "", err
675+
default:
676+
panic(fmt.Sprintf("unknown mismatched workflow history policy."))
677+
}
657678
}
658679
}
659680

internal/internal_task_handlers_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
364364
TaskList: taskList,
365365
Identity: "test-id-1",
366366
Logger: zap.NewNop(),
367+
NonDeterministicWorkflowPolicy: NonDeterministicWorkflowPolicyBlockWorkflow,
367368
}
368369
taskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
369370
request, _, err := taskHandler.ProcessWorkflowTask(task, nil, false)
@@ -380,6 +381,26 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
380381
t.Nil(request)
381382
t.Contains(err.Error(), "nondeterministic")
382383

384+
// now, create a new task handler with fail nondeterministic workflow policy
385+
// and verify that it handles the mismatching history correctly.
386+
params.NonDeterministicWorkflowPolicy = NonDeterministicWorkflowPolicyFailWorkflow
387+
failOnNondeterminismTaskHandler := newWorkflowTaskHandler(testDomain, params, nil, getHostEnvironment())
388+
task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")
389+
request, _, err = failOnNondeterminismTaskHandler.ProcessWorkflowTask(task, nil, false)
390+
// When FailWorkflow policy is set, task handler does not return an error,
391+
// because it will indicate non determinism in the request.
392+
t.NoError(err)
393+
// Verify that request is a RespondDecisionTaskCompletedRequest
394+
response, ok := request.(*s.RespondDecisionTaskCompletedRequest)
395+
t.True(ok)
396+
// Verify there's at least 1 decision
397+
// and the last decision is to fail the workflow
398+
// and contains proper justification.(i.e. nondeterminism).
399+
t.True(len(response.Decisions) > 0)
400+
closeDecision := response.Decisions[len(response.Decisions)-1]
401+
t.Equal(*closeDecision.DecisionType, s.DecisionTypeFailWorkflowExecution)
402+
t.Contains(*closeDecision.FailWorkflowExecutionDecisionAttributes.Reason, "nondeterministic")
403+
383404
// now with different package name to activity type
384405
testEvents[4].ActivityTaskScheduledEventAttributes.ActivityType.Name = common.StringPtr("new-package.Greeter_Activity")
385406
task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")

internal/internal_worker.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ type (
141141
DisableStickyExecution bool
142142

143143
StickyScheduleToStartTimeout time.Duration
144+
145+
// NonDeterministicWorkflowPolicy is used for configuring how client's decision task handler deals with
146+
// mismatched history events (presumably arising from non-deterministic workflow definitions).
147+
NonDeterministicWorkflowPolicy NonDeterministicWorkflowPolicy
144148
}
145149
)
146150

@@ -1017,6 +1021,7 @@ func newAggregatedWorker(
10171021
DisableStickyExecution: wOptions.DisableStickyExecution,
10181022
StickyScheduleToStartTimeout: wOptions.StickyScheduleToStartTimeout,
10191023
TaskListActivitiesPerSecond: wOptions.TaskListActivitiesPerSecond,
1024+
NonDeterministicWorkflowPolicy: wOptions.NonDeterministicWorkflowPolicy,
10201025
}
10211026

10221027
ensureRequiredParams(&workerParams)

internal/worker.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,32 @@ type (
137137
// Optional: sets context for activity. The context can be used to pass any configuration to activity
138138
// like common logger for all activities.
139139
BackgroundActivityContext context.Context
140+
141+
// NonDeterministicWorkflowPolicy is used for configuring how decision worker deals with non-deterministic history events
142+
// (presumably arising from non-deterministic workflow definitions or non-backward compatible workflow definition changes).
143+
NonDeterministicWorkflowPolicy NonDeterministicWorkflowPolicy
140144
}
141145
)
142146

147+
// NonDeterministicWorkflowPolicy is an enum for configuring how client's decision task handler deals with
148+
// mismatched history events (presumably arising from non-deterministic workflow definitions).
149+
type NonDeterministicWorkflowPolicy int
150+
151+
const (
152+
// NonDeterministicWorkflowPolicyBlockWorkflow is the default policy for handling detected non-determinism.
153+
// This option simply logs to console with an error message that non-determinism is detected, but
154+
// does *NOT* reply anything back to the server.
155+
// It is chosen as default for backward compatibility reasons because it preserves the old behavior
156+
// for handling non-determinism that we had before NonDeterministicWorkflowPolicy type was added to
157+
// allow more configurability.
158+
NonDeterministicWorkflowPolicyBlockWorkflow NonDeterministicWorkflowPolicy = iota
159+
// NonDeterministicWorkflowPolicyFailWorkflow behaves exactly the same as NonDeterministicWorkflowPolicyBlockWorkflow, up until the very
160+
// end of processing a decision task.
161+
// Whereas default does *NOT* reply anything back to the server, fail workflow replies back with a request
162+
// to fail the workflow execution.
163+
NonDeterministicWorkflowPolicyFailWorkflow
164+
)
165+
143166
// NewWorker creates an instance of worker for managing workflow and activity executions.
144167
// service - thrift connection to the cadence server.
145168
// domain - the name of the cadence domain.

worker/worker.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,25 @@ type (
3737

3838
// Options is used to configure a worker instance.
3939
Options = internal.WorkerOptions
40+
41+
// NonDeterministicWorkflowPolicy is an enum for configuring how client's decision task handler deals with
42+
// mismatched history events (presumably arising from non-deterministic workflow definitions).
43+
NonDeterministicWorkflowPolicy = internal.NonDeterministicWorkflowPolicy
44+
)
45+
46+
const (
47+
// NonDeterministicWorkflowPolicyBlockWorkflow is the default policy for handling detected non-determinism.
48+
// This option simply logs to console with an error message that non-determinism is detected, but
49+
// does *NOT* reply anything back to the server.
50+
// It is chosen as default for backward compatibility reasons because it preserves the old behavior
51+
// for handling non-determinism that we had before NonDeterministicWorkflowPolicy type was added to
52+
// allow more configurability.
53+
NonDeterministicWorkflowPolicyBlockWorkflow = internal.NonDeterministicWorkflowPolicyBlockWorkflow
54+
// NonDeterministicWorkflowPolicyFailWorkflow behaves exactly the same as NonDeterministicWorkflowPolicyBlockWorkflow, up until the very
55+
// end of processing a decision task.
56+
// Whereas default does *NOT* reply anything back to the server, fail workflow replies back with a request
57+
// to fail the workflow execution.
58+
NonDeterministicWorkflowPolicyFailWorkflow = internal.NonDeterministicWorkflowPolicyFailWorkflow
4059
)
4160

4261
// New creates an instance of worker for managing workflow and activity executions.

0 commit comments

Comments
 (0)