Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions chasm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ type BusinessIDConflictPolicy int

const (
BusinessIDConflictPolicyFail BusinessIDConflictPolicy = iota
BusinessIDConflictPolicyTermiateExisting
// TODO: Do we want to support UseExisting conflict policy?
// BusinessIDConflictPolicyUseExisting
BusinessIDConflictPolicyTerminateExisting
BusinessIDConflictPolicyUseExisting
)

type TransitionOptions struct {
Expand All @@ -82,7 +81,9 @@ func WithSpeculative() TransitionOption {
}
}

// this only applies to NewEntity and UpdateWithNewEntity
// WithBusinessIDPolicy sets the businessID reuse and conflict policy
// used in the transition when creating a new entity.
// This option only applies to NewEntity() and UpdateWithNewEntity().
func WithBusinessIDPolicy(
reusePolicy BusinessIDReusePolicy,
conflictPolicy BusinessIDConflictPolicy,
Expand All @@ -93,7 +94,8 @@ func WithBusinessIDPolicy(
}
}

// this only applies to NewEntity and UpdateWithNewEntity
// WithRequestID sets the requestID used when creating a new entity.
// This option only applies to NewEntity() and UpdateWithNewEntity().
func WithRequestID(
requestID string,
) TransitionOption {
Expand Down
21 changes: 21 additions & 0 deletions chasm/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package chasm

type ExecutionAlreadyStartedError struct {
Message string
CurrentRequestID string
CurrentRunID string
}

func NewExecutionAlreadyStartedErr(
message, currentRequestID, currentRunID string,
) *ExecutionAlreadyStartedError {
return &ExecutionAlreadyStartedError{
Message: message,
CurrentRequestID: currentRequestID,
CurrentRunID: currentRunID,
}
}

func (e *ExecutionAlreadyStartedError) Error() string {
return e.Message
}
7 changes: 5 additions & 2 deletions chasm/lib/tests/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (

type (
NewPayloadStoreRequest struct {
NamespaceID namespace.ID
StoreID string
NamespaceID namespace.ID
StoreID string
IDReusePolicy chasm.BusinessIDReusePolicy
IDConflictPolicy chasm.BusinessIDConflictPolicy
}

NewPayloadStoreResponse struct {
Expand Down Expand Up @@ -84,6 +86,7 @@ func NewPayloadStoreHandler(
return store, nil, err
},
nil,
chasm.WithBusinessIDPolicy(request.IDReusePolicy, request.IDConflictPolicy),
)
if err != nil {
return NewPayloadStoreResponse{}, err
Expand Down
29 changes: 22 additions & 7 deletions service/history/chasm_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func (e *ChasmEngine) handleConflictPolicy(
) (chasm.EntityKey, []byte, error) {
switch conflictPolicy {
case chasm.BusinessIDConflictPolicyFail:
return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted(
return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr(
fmt.Sprintf(
"CHASM execution still running. BusinessID: %s, RunID: %s, ID Conflict Policy: %v",
newEntityParams.entityRef.EntityKey.BusinessID,
Expand All @@ -457,11 +457,26 @@ func (e *ChasmEngine) handleConflictPolicy(
currentRunInfo.createRequestID,
currentRunInfo.RunID,
)
case chasm.BusinessIDConflictPolicyTermiateExisting:
// TODO: handle BusinessIDConflictPolicyTermiateExisting
case chasm.BusinessIDConflictPolicyTerminateExisting:
// TODO: handle BusinessIDConflictPolicyTerminateExisting and update TestNewEntity_ConflictPolicy_TerminateExisting.
//
// Today's state-based replication logic can not existly handle this policy correctly
// (or any operation that close and starts a new run in one transaction).
// The termination and creation of new run can not be replicated transactionally.
//
// The main blocker is that state-based replication works on the current state,
// and we may have a chain of runs all created via TerminateExisting policy, meaning
// replication has to replicated all of them transactionally.
// We need a way to break this chain into consistent pieces and replicate them one by one.
return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("ID Conflict Policy Terminate Existing is not yet supported")
// case chasm.BusinessIDConflictPolicyUseExisting:
// return chasm.EntityKey{}, nil, serviceerror.NewUnimplemented("ID Conflict Policy Use Existing is not yet supported")
case chasm.BusinessIDConflictPolicyUseExisting:
existingEntityRef := newEntityParams.entityRef
existingEntityRef.EntityID = currentRunInfo.RunID
serializedRef, err := existingEntityRef.Serialize(e.registry)
if err != nil {
return chasm.EntityKey{}, nil, err
}
return existingEntityRef.EntityKey, serializedRef, nil
default:
return chasm.EntityKey{}, nil, serviceerror.NewInternal(
fmt.Sprintf("unknown business ID conflict policy for newEntity: %v", conflictPolicy),
Expand All @@ -482,7 +497,7 @@ func (e *ChasmEngine) handleReusePolicy(
// Fallthrough to persist the new entity as current run.
case chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly:
if _, ok := consts.FailedWorkflowStatuses[currentRunInfo.Status]; !ok {
return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted(
return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr(
fmt.Sprintf(
"CHASM execution already completed successfully. BusinessID: %s, RunID: %s, ID Reuse Policy: %v",
newEntityParams.entityRef.EntityKey.BusinessID,
Expand All @@ -495,7 +510,7 @@ func (e *ChasmEngine) handleReusePolicy(
}
// Fallthrough to persist the new entity as current run.
case chasm.BusinessIDReusePolicyRejectDuplicate:
return chasm.EntityKey{}, nil, serviceerror.NewWorkflowExecutionAlreadyStarted(
return chasm.EntityKey{}, nil, chasm.NewExecutionAlreadyStartedErr(
fmt.Sprintf(
"CHASM execution already finished. BusinessID: %s, RunID: %s, ID Reuse Policy: %v",
newEntityParams.entityRef.EntityKey.BusinessID,
Expand Down
84 changes: 82 additions & 2 deletions service/history/chasm_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_FailedOnly_Fail() {
chasm.BusinessIDConflictPolicyFail,
),
)
s.IsType(&serviceerror.WorkflowExecutionAlreadyStarted{}, err)
s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError))
}

func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_RejectDuplicate() {
Expand Down Expand Up @@ -383,7 +383,87 @@ func (s *chasmEngineSuite) TestNewEntity_ReusePolicy_RejectDuplicate() {
chasm.BusinessIDConflictPolicyFail,
),
)
s.IsType(&serviceerror.WorkflowExecutionAlreadyStarted{}, err)
s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError))
}

func (s *chasmEngineSuite) TestNewEntity_ConflictPolicy_UseExisting() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())

ref := chasm.NewComponentRef[*testComponent](
chasm.EntityKey{
NamespaceID: string(tests.NamespaceID),
BusinessID: tv.WorkflowID(),
EntityID: "",
},
)
newActivityID := tv.ActivityID()
// Current run is still running, conflict policy will be used.
currentRunConditionFailedErr := s.currentRunConditionFailedErr(
tv,
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
)

s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return(
nil,
currentRunConditionFailedErr,
).Times(1)

entityKey, serializedRef, err := s.engine.NewEntity(
context.Background(),
ref,
s.newTestEntityFn(newActivityID),
chasm.WithBusinessIDPolicy(
chasm.BusinessIDReusePolicyAllowDuplicate,
chasm.BusinessIDConflictPolicyUseExisting,
),
)
s.NoError(err)

expectedEntityKey := chasm.EntityKey{
NamespaceID: string(tests.NamespaceID),
BusinessID: tv.WorkflowID(),
EntityID: tv.RunID(),
}
s.Equal(expectedEntityKey, entityKey)
s.validateNewEntityResponseRef(serializedRef, expectedEntityKey)
}

func (s *chasmEngineSuite) TestNewEntity_ConflictPolicy_TerminateExisting() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())

ref := chasm.NewComponentRef[*testComponent](
chasm.EntityKey{
NamespaceID: string(tests.NamespaceID),
BusinessID: tv.WorkflowID(),
EntityID: "",
},
)
newActivityID := tv.ActivityID()
// Current run is still running, conflict policy will be used.
currentRunConditionFailedErr := s.currentRunConditionFailedErr(
tv,
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
)

s.mockExecutionManager.EXPECT().CreateWorkflowExecution(gomock.Any(), gomock.Any()).Return(
nil,
currentRunConditionFailedErr,
).Times(1)

_, _, err := s.engine.NewEntity(
context.Background(),
ref,
s.newTestEntityFn(newActivityID),
chasm.WithBusinessIDPolicy(
chasm.BusinessIDReusePolicyAllowDuplicate,
chasm.BusinessIDConflictPolicyTerminateExisting,
),
)
s.ErrorAs(err, new(*serviceerror.Unimplemented))
}

func (s *chasmEngineSuite) newTestEntityFn(
Expand Down
51 changes: 49 additions & 2 deletions tests/chasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,58 @@ func (s *ChasmTestSuite) TestNewPayloadStore() {
_, err := tests.NewPayloadStoreHandler(
chasm.NewEngineContext(ctx, s.chasmEngine),
tests.NewPayloadStoreRequest{
NamespaceID: s.NamespaceID(),
StoreID: tv.Any().String(),
NamespaceID: s.NamespaceID(),
StoreID: tv.Any().String(),
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
IDConflictPolicy: chasm.BusinessIDConflictPolicyFail,
},
)
s.NoError(err)
}

func (s *ChasmTestSuite) TestNewPayloadStore_ConflictPolicy() {
tv := testvars.New(s.T())

ctx, cancel := context.WithTimeout(context.Background(), chasmTestTimeout)
defer cancel()

storeID := tv.Any().String()

resp, err := tests.NewPayloadStoreHandler(
chasm.NewEngineContext(ctx, s.chasmEngine),
tests.NewPayloadStoreRequest{
NamespaceID: s.NamespaceID(),
StoreID: storeID,
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
IDConflictPolicy: chasm.BusinessIDConflictPolicyFail,
},
)
s.NoError(err)

currentRunID := resp.RunID

resp, err = tests.NewPayloadStoreHandler(
chasm.NewEngineContext(ctx, s.chasmEngine),
tests.NewPayloadStoreRequest{
NamespaceID: s.NamespaceID(),
StoreID: storeID,
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
IDConflictPolicy: chasm.BusinessIDConflictPolicyFail,
},
)
s.ErrorAs(err, new(*chasm.ExecutionAlreadyStartedError))

resp, err = tests.NewPayloadStoreHandler(
chasm.NewEngineContext(ctx, s.chasmEngine),
tests.NewPayloadStoreRequest{
NamespaceID: s.NamespaceID(),
StoreID: storeID,
IDReusePolicy: chasm.BusinessIDReusePolicyRejectDuplicate,
IDConflictPolicy: chasm.BusinessIDConflictPolicyUseExisting,
},
)
s.NoError(err)
s.Equal(currentRunID, resp.RunID)
}

func (s *ChasmTestSuite) TestPayloadStoreVisibility() {
Expand Down
Loading