diff --git a/chasm/engine.go b/chasm/engine.go index f1348b03a50..62f77faffca 100644 --- a/chasm/engine.go +++ b/chasm/engine.go @@ -55,9 +55,8 @@ type BusinessIDConflictPolicy int const ( BusinessIDConflictPolicyFail BusinessIDConflictPolicy = iota - BusinessIDConflictPolicyTermiateExisting - // TODO: Do we want to support UseExisting conflict policy? - // BusinessIDConflictPolicyUseExisting + BusinessIDConflictPolicyTerminateExisting + BusinessIDConflictPolicyUseExisting ) type TransitionOptions struct { @@ -82,7 +81,9 @@ func WithSpeculative() TransitionOption { } } -// this only applies to NewEntity and UpdateWithNewEntity +// WithBusinessIDPolicy sets the businessID reuse and conflict policy +// used in the transition when creating a new entity. +// This option only applies to NewEntity() and UpdateWithNewEntity(). func WithBusinessIDPolicy( reusePolicy BusinessIDReusePolicy, conflictPolicy BusinessIDConflictPolicy, @@ -93,7 +94,8 @@ func WithBusinessIDPolicy( } } -// this only applies to NewEntity and UpdateWithNewEntity +// WithRequestID sets the requestID used when creating a new entity. +// This option only applies to NewEntity() and UpdateWithNewEntity(). func WithRequestID( requestID string, ) TransitionOption { diff --git a/chasm/errors.go b/chasm/errors.go new file mode 100644 index 00000000000..f3eace657ba --- /dev/null +++ b/chasm/errors.go @@ -0,0 +1,21 @@ +package chasm + +type ExecutionAlreadyStartedError struct { + Message string + CurrentRequestID string + CurrentRunID string +} + +func NewExecutionAlreadyStartedErr( + message, currentRequestID, currentRunID string, +) *ExecutionAlreadyStartedError { + return &ExecutionAlreadyStartedError{ + Message: message, + CurrentRequestID: currentRequestID, + CurrentRunID: currentRunID, + } +} + +func (e *ExecutionAlreadyStartedError) Error() string { + return e.Message +} diff --git a/chasm/lib/tests/handler.go b/chasm/lib/tests/handler.go index 1ae04d9905b..d20e4239450 100644 --- a/chasm/lib/tests/handler.go +++ b/chasm/lib/tests/handler.go @@ -12,8 +12,10 @@ import ( type ( NewPayloadStoreRequest struct { - NamespaceID namespace.ID - StoreID string + NamespaceID namespace.ID + StoreID string + IDReusePolicy chasm.BusinessIDReusePolicy + IDConflictPolicy chasm.BusinessIDConflictPolicy } NewPayloadStoreResponse struct { @@ -84,6 +86,7 @@ func NewPayloadStoreHandler( return store, nil, err }, nil, + chasm.WithBusinessIDPolicy(request.IDReusePolicy, request.IDConflictPolicy), ) if err != nil { return NewPayloadStoreResponse{}, err diff --git a/service/history/chasm_engine.go b/service/history/chasm_engine.go index 795742d0fd0..8abe0cf29b0 100644 --- a/service/history/chasm_engine.go +++ b/service/history/chasm_engine.go @@ -447,7 +447,7 @@ func (e *ChasmEngine) handleConflictPolicy( ) (chasm.EntityKey, []byte, error) { switch conflictPolicy { case chasm.BusinessIDConflictPolicyFail: - return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted( + return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr( fmt.Sprintf( "CHASM execution still running. BusinessID: %s, RunID: %s, ID Conflict Policy: %v", newEntityParams.entityRef.EntityKey.BusinessID, @@ -457,11 +457,26 @@ func (e *ChasmEngine) handleConflictPolicy( currentRunInfo.createRequestID, currentRunInfo.RunID, ) - case chasm.BusinessIDConflictPolicyTermiateExisting: - // TODO: handle BusinessIDConflictPolicyTermiateExisting + case chasm.BusinessIDConflictPolicyTerminateExisting: + // TODO: handle BusinessIDConflictPolicyTerminateExisting and update TestNewEntity_ConflictPolicy_TerminateExisting. + // + // Today's state-based replication logic can not existly handle this policy correctly + // (or any operation that close and starts a new run in one transaction). + // The termination and creation of new run can not be replicated transactionally. + // + // The main blocker is that state-based replication works on the current state, + // and we may have a chain of runs all created via TerminateExisting policy, meaning + // replication has to replicated all of them transactionally. + // We need a way to break this chain into consistent pieces and replicate them one by one. return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("ID Conflict Policy Terminate Existing is not yet supported") - // case chasm.BusinessIDConflictPolicyUseExisting: - // return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("ID Conflict Policy Use Existing is not yet supported") + case chasm.BusinessIDConflictPolicyUseExisting: + existingEntityRef := newEntityParams.entityRef + existingEntityRef.EntityID = currentRunInfo.RunID + serializedRef, err := existingEntityRef.Serialize(e.registry) + if err != nil { + return chasm.EntityKey{}, nil, err + } + return existingEntityRef.EntityKey, serializedRef, nil default: return chasm.EntityKey{}, nil, serviceerror.NewInternal( fmt.Sprintf("unknown business ID conflict policy for newEntity: %v", conflictPolicy), @@ -482,7 +497,7 @@ func (e *ChasmEngine) handleReusePolicy( // Fallthrough to persist the new entity as current run. case chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly: if _, ok := consts.FailedWorkflowStatuses[currentRunInfo.Status]; !ok { - return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted( + return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr( fmt.Sprintf( "CHASM execution already completed successfully. BusinessID: %s, RunID: %s, ID Reuse Policy: %v", newEntityParams.entityRef.EntityKey.BusinessID, @@ -495,7 +510,7 @@ func (e *ChasmEngine) handleReusePolicy( } // Fallthrough to persist the new entity as current run. case chasm.BusinessIDReusePolicyRejectDuplicate: - return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted( + return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr( fmt.Sprintf( "CHASM execution already finished. BusinessID: %s, RunID: %s, ID Reuse Policy: %v", newEntityParams.entityRef.EntityKey.BusinessID, diff --git a/service/history/chasm_engine_test.go b/service/history/chasm_engine_test.go index bbfde594637..7e859358283 100644 --- a/service/history/chasm_engine_test.go +++ b/service/history/chasm_engine_test.go @@ -349,7 +349,7 @@ func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_FailedOnly_Fail() { chasm.BusinessIDConflictPolicyFail, ), ) - s.IsType(&serviceerror.WorkflowExecutionAlreadyStarted{}, err) + s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError)) } func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_RejectDuplicate() { @@ -383,7 +383,87 @@ func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_RejectDuplicate() { chasm.BusinessIDConflictPolicyFail, ), ) - s.IsType(&serviceerror.WorkflowExecutionAlreadyStarted{}, err) + s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError)) +} + +func (s *chasmEngineSuite) TestNewEntity_ConflictPolicy_UseExisting() { + tv := testvars.New(s.T()) + tv = tv.WithRunID(tv.Any().RunID()) + + ref := chasm.NewComponentRef[*testComponent]( + chasm.EntityKey{ + NamespaceID: string(tests.NamespaceID), + BusinessID: tv.WorkflowID(), + EntityID: "", + }, + ) + newActivityID := tv.ActivityID() + // Current run is still running, conflict policy will be used. + currentRunConditionFailedErr := s.currentRunConditionFailedErr( + tv, + enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + ) + + s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return( + nil, + currentRunConditionFailedErr, + ).Times(1) + + entityKey, serializedRef, err := s.engine.NewEntity( + context.Background(), + ref, + s.newTestEntityFn(newActivityID), + chasm.WithBusinessIDPolicy( + chasm.BusinessIDReusePolicyAllowDuplicate, + chasm.BusinessIDConflictPolicyUseExisting, + ), + ) + s.NoError(err) + + expectedEntityKey := chasm.EntityKey{ + NamespaceID: string(tests.NamespaceID), + BusinessID: tv.WorkflowID(), + EntityID: tv.RunID(), + } + s.Equal(expectedEntityKey, entityKey) + s.validateNewEntityResponseRef(serializedRef, expectedEntityKey) +} + +func (s *chasmEngineSuite) TestNewEntity_ConflictPolicy_TerminateExisting() { + tv := testvars.New(s.T()) + tv = tv.WithRunID(tv.Any().RunID()) + + ref := chasm.NewComponentRef[*testComponent]( + chasm.EntityKey{ + NamespaceID: string(tests.NamespaceID), + BusinessID: tv.WorkflowID(), + EntityID: "", + }, + ) + newActivityID := tv.ActivityID() + // Current run is still running, conflict policy will be used. + currentRunConditionFailedErr := s.currentRunConditionFailedErr( + tv, + enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + ) + + s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return( + nil, + currentRunConditionFailedErr, + ).Times(1) + + _, _, err := s.engine.NewEntity( + context.Background(), + ref, + s.newTestEntityFn(newActivityID), + chasm.WithBusinessIDPolicy( + chasm.BusinessIDReusePolicyAllowDuplicate, + chasm.BusinessIDConflictPolicyTerminateExisting, + ), + ) + s.ErrorAs(err, new(*serviceerror.Unimplemented)) } func (s *chasmEngineSuite) newTestEntityFn( diff --git a/tests/chasm_test.go b/tests/chasm_test.go index 7739d0a0794..fee3adc3f8d 100644 --- a/tests/chasm_test.go +++ b/tests/chasm_test.go @@ -56,11 +56,58 @@ func (s *ChasmTestSuite) TestNewPayloadStore() { _, err := tests.NewPayloadStoreHandler( chasm.NewEngineContext(ctx, s.chasmEngine), tests.NewPayloadStoreRequest{ - NamespaceID: s.NamespaceID(), - StoreID: tv.Any().String(), + NamespaceID: s.NamespaceID(), + StoreID: tv.Any().String(), + IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate, + IDConflictPolicy: chasm.BusinessIDConflictPolicyFail, + }, + ) + s.NoError(err) +} + +func (s *ChasmTestSuite) TestNewPayloadStore_ConflictPolicy() { + tv := testvars.New(s.T()) + + ctx, cancel := context.WithTimeout(context.Background(), chasmTestTimeout) + defer cancel() + + storeID := tv.Any().String() + + resp, err := tests.NewPayloadStoreHandler( + chasm.NewEngineContext(ctx, s.chasmEngine), + tests.NewPayloadStoreRequest{ + NamespaceID: s.NamespaceID(), + StoreID: storeID, + IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate, + IDConflictPolicy: chasm.BusinessIDConflictPolicyFail, + }, + ) + s.NoError(err) + + currentRunID := resp.RunID + + resp, err = tests.NewPayloadStoreHandler( + chasm.NewEngineContext(ctx, s.chasmEngine), + tests.NewPayloadStoreRequest{ + NamespaceID: s.NamespaceID(), + StoreID: storeID, + IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate, + IDConflictPolicy: chasm.BusinessIDConflictPolicyFail, + }, + ) + s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError)) + + resp, err = tests.NewPayloadStoreHandler( + chasm.NewEngineContext(ctx, s.chasmEngine), + tests.NewPayloadStoreRequest{ + NamespaceID: s.NamespaceID(), + StoreID: storeID, + IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate, + IDConflictPolicy: chasm.BusinessIDConflictPolicyUseExisting, }, ) s.NoError(err) + s.Equal(currentRunID, resp.RunID) } func (s *ChasmTestSuite) TestPayloadStoreVisibility() {