Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7008,7 +7008,6 @@ func (ms *MutableStateImpl) closeTransaction(
transactionPolicy,
eventBatches,
clearBuffer,
isStateDirty,
); err != nil {
return closeTransactionResult{}, err
}
Expand Down Expand Up @@ -7410,7 +7409,6 @@ func (ms *MutableStateImpl) closeTransactionPrepareTasks(
transactionPolicy historyi.TransactionPolicy,
eventBatches [][]*historypb.HistoryEvent,
clearBufferEvents bool,
isStateDirty bool,
) error {
if err := ms.closeTransactionHandleWorkflowResetTask(
transactionPolicy,
Expand All @@ -7424,7 +7422,7 @@ func (ms *MutableStateImpl) closeTransactionPrepareTasks(

ms.closeTransactionCollapseVisibilityTasks()

if err := ms.closeTransactionGenerateChasmRetentionTask(isStateDirty); err != nil {
if err := ms.closeTransactionGenerateChasmRetentionTask(transactionPolicy); err != nil {
return err
}

Expand All @@ -7441,22 +7439,26 @@ func (ms *MutableStateImpl) closeTransactionPrepareTasks(
}

func (ms *MutableStateImpl) closeTransactionGenerateChasmRetentionTask(
isStateDirty bool,
transactionPolicy historyi.TransactionPolicy,
) error {

if !isStateDirty ||
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fine to check isStateDirty as well (since state changed to close means the root component state must have changed), but not really necessary, we only need to look at current state and state in db.

ms.IsWorkflow() ||
if ms.IsWorkflow() ||
ms.executionState.State != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED ||
transitionhistory.Compare(
ms.executionState.LastUpdateVersionedTransition,
ms.CurrentVersionedTransition(),
) != 0 {
ms.stateInDB == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
return nil
}

closeTime := ms.timeSource.Now()
ms.executionInfo.CloseTime = timestamppb.New(closeTime)
return ms.taskGenerator.GenerateDeleteHistoryEventTask(closeTime)
// Generate retention timer for chasm executions if it's currentely completed
// but state in DB is not completed, i.e. completing in this transaction.

if transactionPolicy == historyi.TransactionPolicyActive {
// TODO: consider setting CloseTime in ChasmTree closeTransaction() instead of here.
ms.executionInfo.CloseTime = timestamppb.New(ms.timeSource.Now())

// If passive cluster, the CloseTime field should already be populated by the replication stack.
}

return ms.taskGenerator.GenerateDeleteHistoryEventTask(ms.executionInfo.CloseTime.AsTime())
}

func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks(
Expand Down
63 changes: 62 additions & 1 deletion service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4915,7 +4915,7 @@ func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_Workf
s.Empty(mutation.Tasks[tasks.CategoryTimer])
}

func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWorkflow() {
func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWorkflow_Active() {
dbState := s.buildWorkflowMutableState()

mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
Expand Down Expand Up @@ -4943,13 +4943,74 @@ func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWo
s.NoError(err)
s.Len(mutation.Tasks[tasks.CategoryTimer], 1)
s.Equal(enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, mutation.Tasks[tasks.CategoryTimer][0].GetType())
actualCloseTime, err := mutableState.GetWorkflowCloseTime(context.Background())
s.NoError(err)
s.False(actualCloseTime.IsZero())

// Already closed before, should not generate retention task again.
mutation, _, err = mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
s.NoError(err)
s.Empty(mutation.Tasks[tasks.CategoryTimer])
}

func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWorkflow_Passive() {
dbState := s.buildWorkflowMutableState()

mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
s.NoError(err)

// First close transaction once to get rid of unrelated tasks like UserTimer and ActivityTimeout
_, err = mutableState.StartTransaction(s.namespaceEntry)
s.NoError(err)
_, _, err = mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive)
s.NoError(err)

// Switch to a mock CHASM tree
mockChasmTree := historyi.NewMockChasmTree(s.controller)
mutableState.chasmTree = mockChasmTree

mockChasmTree.EXPECT().IsStateDirty().Return(true).AnyTimes()
mockChasmTree.EXPECT().ArchetypeID().Return(chasm.WorkflowArchetypeID + 101).AnyTimes()
mockChasmTree.EXPECT().CloseTransaction().Return(chasm.NodesMutation{}, nil).AnyTimes()
mockChasmTree.EXPECT().ApplyMutation(gomock.Any()).Return(nil).AnyTimes()

// On standby side, multiple transactions can be applied at the same time,
// the latest VT may be larger than execution state's LastUpdateVT, we still need to
// generate retention task in this case.
updatedExecutionInfo := common.CloneProto(mutableState.GetExecutionInfo())
closeTime := time.Now()
currentTransitionCount := updatedExecutionInfo.TransitionHistory[0].TransitionCount
updatedExecutionInfo.TransitionHistory[0].TransitionCount += 10
updatedExecutionInfo.CloseTime = timestamppb.New(closeTime)

updatedExecutionState := common.CloneProto(mutableState.GetExecutionState())
updatedExecutionState.State = enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED
updatedExecutionState.Status = enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED
updatedExecutionState.LastUpdateVersionedTransition = &persistencespb.VersionedTransition{
NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
TransitionCount: currentTransitionCount + 1,
}

err = mutableState.ApplyMutation(&persistencespb.WorkflowMutableStateMutation{
ExecutionInfo: updatedExecutionInfo,
ExecutionState: updatedExecutionState,
})
s.NoError(err)

mutation, _, err := mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive)
s.NoError(err)
s.Len(mutation.Tasks[tasks.CategoryTimer], 1)
s.Equal(enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, mutation.Tasks[tasks.CategoryTimer][0].GetType())
actualCloseTime, err := mutableState.GetWorkflowCloseTime(context.Background())
s.NoError(err)
s.True(actualCloseTime.Equal(closeTime))

// Already closed before, should not generate retention task again.
mutation, _, err = mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive)
s.NoError(err)
s.Empty(mutation.Tasks[tasks.CategoryTimer])
}

func (s *mutableStateSuite) TestExecutionInfoClone() {
newInstance := reflect.New(reflect.TypeOf(s.mutableState.executionInfo).Elem()).Interface()
clone, ok := newInstance.(*persistencespb.WorkflowExecutionInfo)
Expand Down
18 changes: 17 additions & 1 deletion service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,23 @@ func (r *TaskRefresherImpl) Refresh(
return err
}

return mutableState.ChasmTree().RefreshTasks()
if err := mutableState.ChasmTree().RefreshTasks(); err != nil {
return err
}

if !mutableState.IsWorkflow() && mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}
taskGenerator := r.taskGeneratorProvider.NewTaskGenerator(
r.shard,
mutableState,
)
return taskGenerator.GenerateDeleteHistoryEventTask(closeTime)
}

return nil
}

func (r *TaskRefresherImpl) PartialRefresh(
Expand Down
Loading