Skip to content

Commit e537cbf

Browse files
Introduce DisableStrictNonDeterminismCheck worker option (#1288)
1 parent e18e46c commit e537cbf

File tree

5 files changed

+60
-11
lines changed

5 files changed

+60
-11
lines changed

internal/internal_task_handlers.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type (
131131
contextPropagators []ContextPropagator
132132
tracer opentracing.Tracer
133133
workflowInterceptorFactories []WorkflowInterceptorFactory
134+
disableStrictNonDeterminism bool
134135
}
135136

136137
activityProvider func(name string) activity
@@ -388,7 +389,7 @@ func newWorkflowTaskHandler(
388389
registry *registry,
389390
) WorkflowTaskHandler {
390391
ensureRequiredParams(&params)
391-
return &workflowTaskHandlerImpl{
392+
wth := &workflowTaskHandlerImpl{
392393
domain: domain,
393394
logger: params.Logger,
394395
ppMgr: ppMgr,
@@ -402,7 +403,16 @@ func newWorkflowTaskHandler(
402403
contextPropagators: params.ContextPropagators,
403404
tracer: params.Tracer,
404405
workflowInterceptorFactories: params.WorkflowInterceptorChainFactories,
406+
disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck,
405407
}
408+
409+
traceLog(func() {
410+
wth.logger.Debug("Workflow task handler is created.",
411+
zap.String(tagDomain, wth.domain),
412+
zap.Bool("disableStrictNonDeterminism", wth.disableStrictNonDeterminism))
413+
})
414+
415+
return wth
406416
}
407417

408418
// TODO: need a better eviction policy based on memory usage

internal/internal_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,7 @@ func newAggregatedWorker(
10411041
var workflowWorker *workflowWorker
10421042
if !wOptions.DisableWorkflowWorker {
10431043
testTags := getTestTags(wOptions.BackgroundActivityContext)
1044-
if testTags != nil && len(testTags) > 0 {
1044+
if len(testTags) > 0 {
10451045
workflowWorker = newWorkflowWorkerWithPressurePoints(
10461046
service,
10471047
domain,

internal/internal_worker_test.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func testActivityMultipleArgsWithStruct(ctx context.Context, i int, s testActivi
196196
}
197197

198198
func (s *internalWorkerTestSuite) TestCreateWorker() {
199-
worker := createWorkerWithThrottle(s.T(), s.service, float64(500.0), WorkerOptions{})
199+
worker := createWorkerWithThrottle(s.T(), s.service, 500, WorkerOptions{})
200200
err := worker.Start()
201201
require.NoError(s.T(), err)
202202
time.Sleep(time.Millisecond * 200)
@@ -227,6 +227,14 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() {
227227
worker.Stop()
228228
}
229229

230+
func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() {
231+
worker := createWorkerWithStrictNonDeterminismDisabled(s.T(), s.service)
232+
err := worker.Start()
233+
require.NoError(s.T(), err)
234+
time.Sleep(time.Millisecond * 200)
235+
worker.Stop()
236+
}
237+
230238
func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() {
231239
worker := createWorkerWithHost(s.T(), s.service)
232240
err := worker.Start()
@@ -348,15 +356,15 @@ func createWorker(
348356
t *testing.T,
349357
service *workflowservicetest.MockClient,
350358
) *aggregatedWorker {
351-
return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{})
359+
return createWorkerWithThrottle(t, service, 0, WorkerOptions{})
352360
}
353361

354362
func createShadowWorker(
355363
t *testing.T,
356364
service *workflowservicetest.MockClient,
357365
shadowOptions *ShadowOptions,
358366
) *aggregatedWorker {
359-
return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{
367+
return createWorkerWithThrottle(t, service, 0, WorkerOptions{
360368
EnableShadowWorker: true,
361369
ShadowOptions: *shadowOptions,
362370
})
@@ -415,21 +423,28 @@ func createWorkerWithDataConverter(
415423
t *testing.T,
416424
service *workflowservicetest.MockClient,
417425
) *aggregatedWorker {
418-
return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{DataConverter: newTestDataConverter()})
426+
return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()})
419427
}
420428

421429
func createWorkerWithAutoscaler(
422430
t *testing.T,
423431
service *workflowservicetest.MockClient,
424432
) *aggregatedWorker {
425-
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
433+
return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
434+
}
435+
436+
func createWorkerWithStrictNonDeterminismDisabled(
437+
t *testing.T,
438+
service *workflowservicetest.MockClient,
439+
) *aggregatedWorker {
440+
return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}})
426441
}
427442

428443
func createWorkerWithHost(
429444
t *testing.T,
430445
service *workflowservicetest.MockClient,
431446
) *aggregatedWorker {
432-
return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{Host: "test_host"})
447+
return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
433448
}
434449

435450
func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {

internal/worker.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,30 @@ type (
266266
// Optional: Host is just string on the machine running the client
267267
// default: empty string
268268
Host string
269+
270+
// Optional: See WorkerBugPorts for more details
271+
//
272+
// Deprecated: All bugports are always deprecated and may be removed at any time.
273+
WorkerBugPorts WorkerBugPorts
274+
}
275+
276+
// WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily
277+
// emulating old behavior until a fix is deployed.
278+
// By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior.
279+
// Back-ported buggy behavior *may* be available via these flags.
280+
//
281+
// Bugports are always deprecated and may be removed in future versions.
282+
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
283+
// allow cleaning up the additional code complexity that they cause.
284+
// Deprecated: All bugports are always deprecated and may be removed at any time
285+
WorkerBugPorts struct {
286+
// Optional: Disable strict non-determinism checks for workflow.
287+
// There are some non-determinism cases which are missed by original implementation and a fix is on the way.
288+
// The fix will be toggleable by this parameter.
289+
// Default: false, which means strict non-determinism checks are enabled.
290+
//
291+
// Deprecated: All bugports are always deprecated and may be removed at any time
292+
DisableStrictNonDeterminismCheck bool
269293
}
270294
)
271295

internal/workflow.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ type (
481481
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
482482
// allow cleaning up the additional code complexity that they cause.
483483
//
484-
// deprecated
484+
// Deprecated: All bugports are always deprecated and may be removed at any time.
485485
Bugports Bugports
486486
}
487487

@@ -500,7 +500,7 @@ type (
500500
// Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to
501501
// allow cleaning up the additional code complexity that they cause.
502502
//
503-
// deprecated
503+
// DEPRECATED: All bugports are always deprecated and may be removed at any time.
504504
Bugports struct {
505505
// StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4.
506506
//
@@ -530,7 +530,7 @@ type (
530530
//
531531
// Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP.
532532
//
533-
// deprecated
533+
// Deprecated: All bugports are always deprecated and may be removed at any time.
534534
StartChildWorkflowsOnCanceledContext bool
535535
}
536536
)

0 commit comments

Comments
 (0)