Skip to content

Commit b0dfcd0

Browse files
Introduce EnableStrictNonDeterminismCheck worker option
1 parent e18e46c commit b0dfcd0

File tree

4 files changed

+48
-2
lines changed

4 files changed

+48
-2
lines changed

internal/internal_task_handlers.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type (
131131
contextPropagators []ContextPropagator
132132
tracer opentracing.Tracer
133133
workflowInterceptorFactories []WorkflowInterceptorFactory
134+
enableStrictNonDeterminism bool
134135
}
135136

136137
activityProvider func(name string) activity
@@ -388,7 +389,7 @@ func newWorkflowTaskHandler(
388389
registry *registry,
389390
) WorkflowTaskHandler {
390391
ensureRequiredParams(&params)
391-
return &workflowTaskHandlerImpl{
392+
wth := &workflowTaskHandlerImpl{
392393
domain: domain,
393394
logger: params.Logger,
394395
ppMgr: ppMgr,
@@ -402,7 +403,16 @@ func newWorkflowTaskHandler(
402403
contextPropagators: params.ContextPropagators,
403404
tracer: params.Tracer,
404405
workflowInterceptorFactories: params.WorkflowInterceptorChainFactories,
406+
enableStrictNonDeterminism: params.WorkerBugPorts.EnableStrictNonDeterminismCheck,
405407
}
408+
409+
traceLog(func() {
410+
wth.logger.Debug("Workflow task handler is created.",
411+
zap.String(tagDomain, wth.domain),
412+
zap.Bool("EnableStrictNonDeterminism", wth.enableStrictNonDeterminism))
413+
})
414+
415+
return wth
406416
}
407417

408418
// TODO: need a better eviction policy based on memory usage

internal/internal_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,7 @@ func newAggregatedWorker(
10411041
var workflowWorker *workflowWorker
10421042
if !wOptions.DisableWorkflowWorker {
10431043
testTags := getTestTags(wOptions.BackgroundActivityContext)
1044-
if testTags != nil && len(testTags) > 0 {
1044+
if len(testTags) > 0 {
10451045
workflowWorker = newWorkflowWorkerWithPressurePoints(
10461046
service,
10471047
domain,

internal/internal_worker_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,14 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() {
227227
worker.Stop()
228228
}
229229

230+
func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() {
231+
worker := createWorkerWithStrictNonDeterminismOption(s.T(), s.service)
232+
err := worker.Start()
233+
require.NoError(s.T(), err)
234+
time.Sleep(time.Millisecond * 200)
235+
worker.Stop()
236+
}
237+
230238
func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() {
231239
worker := createWorkerWithHost(s.T(), s.service)
232240
err := worker.Start()
@@ -425,6 +433,13 @@ func createWorkerWithAutoscaler(
425433
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
426434
}
427435

436+
func createWorkerWithStrictNonDeterminismOption(
437+
t *testing.T,
438+
service *workflowservicetest.MockClient,
439+
) *aggregatedWorker {
440+
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{WorkerBugPorts: WorkerBugPorts{EnableStrictNonDeterminismCheck: true}})
441+
}
442+
428443
func createWorkerWithHost(
429444
t *testing.T,
430445
service *workflowservicetest.MockClient,

internal/worker.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,27 @@ type (
266266
// Optional: Host is just string on the machine running the client
267267
// default: empty string
268268
Host string
269+
270+
// Optional: See WorkerBugPorts for more details
271+
WorkerBugPorts WorkerBugPorts
272+
}
273+
274+
// WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
275+
// emulating old behavior until a fix is deployed.
276+
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
277+
// Back-ported buggy behavior *may* be available via these flags.
278+
//
279+
// Bugports are always deprecated and may be removed in future versions.
280+
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
281+
// allow cleaning up the additional code complexity that they cause.
282+
WorkerBugPorts struct {
283+
// Optional: Enable strict non-determinism checks for workflow.
284+
// There are some non-determinism cases which are missed by original implementation and a fix is on the way.
285+
// The fix will be activated by this option which basicakky accuracy of the non-determinism checks.
286+
// Exposing this as bugport for now to avoid breaking existing workflows which are actually non-deterministic but users depend on this.
287+
// Once we identify such cases and notify users, we can enable this by default.
288+
// default: false
289+
EnableStrictNonDeterminismCheck bool
269290
}
270291
)
271292

0 commit comments

Comments
 (0)