diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index ed7c448d5..1c53a05d2 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -131,6 +131,7 @@ type ( contextPropagators []ContextPropagator tracer opentracing.Tracer workflowInterceptorFactories []WorkflowInterceptorFactory + disableStrictNonDeterminism bool } activityProvider func(name string) activity @@ -388,7 +389,7 @@ func newWorkflowTaskHandler( registry *registry, ) WorkflowTaskHandler { ensureRequiredParams(¶ms) - return &workflowTaskHandlerImpl{ + wth := &workflowTaskHandlerImpl{ domain: domain, logger: params.Logger, ppMgr: ppMgr, @@ -402,7 +403,16 @@ func newWorkflowTaskHandler( contextPropagators: params.ContextPropagators, tracer: params.Tracer, workflowInterceptorFactories: params.WorkflowInterceptorChainFactories, + disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck, } + + traceLog(func() { + wth.logger.Debug("Workflow task handler is created.", + zap.String(tagDomain, wth.domain), + zap.Bool("disableStrictNonDeterminism", wth.disableStrictNonDeterminism)) + }) + + return wth } // TODO: need a better eviction policy based on memory usage diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 04dbd1550..75beddda3 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1041,7 +1041,7 @@ func newAggregatedWorker( var workflowWorker *workflowWorker if !wOptions.DisableWorkflowWorker { testTags := getTestTags(wOptions.BackgroundActivityContext) - if testTags != nil && len(testTags) > 0 { + if len(testTags) > 0 { workflowWorker = newWorkflowWorkerWithPressurePoints( service, domain, diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 72c52a994..764efd111 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -196,7 +196,7 @@ func testActivityMultipleArgsWithStruct(ctx context.Context, i int, s testActivi } func (s *internalWorkerTestSuite) TestCreateWorker() { - worker := createWorkerWithThrottle(s.T(), s.service, float64(500.0), WorkerOptions{}) + worker := createWorkerWithThrottle(s.T(), s.service, 500, WorkerOptions{}) err := worker.Start() require.NoError(s.T(), err) time.Sleep(time.Millisecond * 200) @@ -227,6 +227,14 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() { worker.Stop() } +func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() { + worker := createWorkerWithStrictNonDeterminismDisabled(s.T(), s.service) + err := worker.Start() + require.NoError(s.T(), err) + time.Sleep(time.Millisecond * 200) + worker.Stop() +} + func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() { worker := createWorkerWithHost(s.T(), s.service) err := worker.Start() @@ -348,7 +356,7 @@ func createWorker( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{}) } func createShadowWorker( @@ -356,7 +364,7 @@ func createShadowWorker( service *workflowservicetest.MockClient, shadowOptions *ShadowOptions, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{ + return createWorkerWithThrottle(t, service, 0, WorkerOptions{ EnableShadowWorker: true, ShadowOptions: *shadowOptions, }) @@ -415,21 +423,28 @@ func createWorkerWithDataConverter( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0.0), WorkerOptions{DataConverter: newTestDataConverter()}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()}) } func createWorkerWithAutoscaler( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) +} + +func createWorkerWithStrictNonDeterminismDisabled( + t *testing.T, + service *workflowservicetest.MockClient, +) *aggregatedWorker { + return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}}) } func createWorkerWithHost( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, float64(0), WorkerOptions{Host: "test_host"}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"}) } func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) { diff --git a/internal/worker.go b/internal/worker.go index c5225bed2..abef02a5b 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -266,6 +266,30 @@ type ( // Optional: Host is just string on the machine running the client // default: empty string Host string + + // Optional: See WorkerBugPorts for more details + // + // Deprecated: All bugports are always deprecated and may be removed at any time. + WorkerBugPorts WorkerBugPorts + } + + // WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily + // emulating old behavior until a fix is deployed. + // By default, bugs (especially rarely-occurring ones) are fixed and all users are opted into the new behavior. + // Back-ported buggy behavior *may* be available via these flags. + // + // Bugports are always deprecated and may be removed in future versions. + // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to + // allow cleaning up the additional code complexity that they cause. + // Deprecated: All bugports are always deprecated and may be removed at any time + WorkerBugPorts struct { + // Optional: Disable strict non-determinism checks for workflow. + // There are some non-determinism cases which are missed by original implementation and a fix is on the way. + // The fix will be toggleable by this parameter. + // Default: false, which means strict non-determinism checks are enabled. + // + // Deprecated: All bugports are always deprecated and may be removed at any time + DisableStrictNonDeterminismCheck bool } ) diff --git a/internal/workflow.go b/internal/workflow.go index b762c24bb..b39a7cc59 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -481,7 +481,7 @@ type ( // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // - // deprecated + // Deprecated: All bugports are always deprecated and may be removed at any time. Bugports Bugports } @@ -500,7 +500,7 @@ type ( // Generally speaking they will *likely* remain in place for one minor version, and then they may be removed to // allow cleaning up the additional code complexity that they cause. // - // deprecated + // DEPRECATED: All bugports are always deprecated and may be removed at any time. Bugports struct { // StartChildWorkflowsOnCanceledContext allows emulating older, buggy behavior that existed prior to v0.18.4. // @@ -530,7 +530,7 @@ type ( // // Added in 0.18.4, this may be removed in or after v0.19.0, so please migrate off of it ASAP. // - // deprecated + // Deprecated: All bugports are always deprecated and may be removed at any time. StartChildWorkflowsOnCanceledContext bool } )