diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 71c5cd1d3a..45d8c2a335 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -7008,7 +7008,6 @@ func (ms *MutableStateImpl) closeTransaction( transactionPolicy, eventBatches, clearBuffer, - isStateDirty, ); err != nil { return closeTransactionResult{}, err } @@ -7410,7 +7409,6 @@ func (ms *MutableStateImpl) closeTransactionPrepareTasks( transactionPolicy historyi.TransactionPolicy, eventBatches [][]*historypb.HistoryEvent, clearBufferEvents bool, - isStateDirty bool, ) error { if err := ms.closeTransactionHandleWorkflowResetTask( transactionPolicy, @@ -7424,7 +7422,7 @@ func (ms *MutableStateImpl) closeTransactionPrepareTasks( ms.closeTransactionCollapseVisibilityTasks() - if err := ms.closeTransactionGenerateChasmRetentionTask(isStateDirty); err != nil { + if err := ms.closeTransactionGenerateChasmRetentionTask(transactionPolicy); err != nil { return err } @@ -7441,22 +7439,26 @@ func (ms *MutableStateImpl) closeTransactionPrepareTasks( } func (ms *MutableStateImpl) closeTransactionGenerateChasmRetentionTask( - isStateDirty bool, + transactionPolicy historyi.TransactionPolicy, ) error { - if !isStateDirty || - ms.IsWorkflow() || + if ms.IsWorkflow() || ms.executionState.State != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED || - transitionhistory.Compare( - ms.executionState.LastUpdateVersionedTransition, - ms.CurrentVersionedTransition(), - ) != 0 { + ms.stateInDB == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED { return nil } - closeTime := ms.timeSource.Now() - ms.executionInfo.CloseTime = timestamppb.New(closeTime) - return ms.taskGenerator.GenerateDeleteHistoryEventTask(closeTime) + // Generate retention timer for chasm executions if it's currentely completed + // but state in DB is not completed, i.e. completing in this transaction. + + if transactionPolicy == historyi.TransactionPolicyActive { + // TODO: consider setting CloseTime in ChasmTree closeTransaction() instead of here. + ms.executionInfo.CloseTime = timestamppb.New(ms.timeSource.Now()) + + // If passive cluster, the CloseTime field should already be populated by the replication stack. + } + + return ms.taskGenerator.GenerateDeleteHistoryEventTask(ms.executionInfo.CloseTime.AsTime()) } func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks( diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 90225c3a0c..778f091abe 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -4915,7 +4915,7 @@ func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_Workf s.Empty(mutation.Tasks[tasks.CategoryTimer]) } -func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWorkflow() { +func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWorkflow_Active() { dbState := s.buildWorkflowMutableState() mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123) @@ -4943,6 +4943,9 @@ func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWo s.NoError(err) s.Len(mutation.Tasks[tasks.CategoryTimer], 1) s.Equal(enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, mutation.Tasks[tasks.CategoryTimer][0].GetType()) + actualCloseTime, err := mutableState.GetWorkflowCloseTime(context.Background()) + s.NoError(err) + s.False(actualCloseTime.IsZero()) // Already closed before, should not generate retention task again. mutation, _, err = mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive) @@ -4950,6 +4953,64 @@ func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWo s.Empty(mutation.Tasks[tasks.CategoryTimer]) } +func (s *mutableStateSuite) TestCloseTransactionGenerateCHASMRetentionTask_NonWorkflow_Passive() { + dbState := s.buildWorkflowMutableState() + + mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123) + s.NoError(err) + + // First close transaction once to get rid of unrelated tasks like UserTimer and ActivityTimeout + _, err = mutableState.StartTransaction(s.namespaceEntry) + s.NoError(err) + _, _, err = mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive) + s.NoError(err) + + // Switch to a mock CHASM tree + mockChasmTree := historyi.NewMockChasmTree(s.controller) + mutableState.chasmTree = mockChasmTree + + mockChasmTree.EXPECT().IsStateDirty().Return(true).AnyTimes() + mockChasmTree.EXPECT().ArchetypeID().Return(chasm.WorkflowArchetypeID + 101).AnyTimes() + mockChasmTree.EXPECT().CloseTransaction().Return(chasm.NodesMutation{}, nil).AnyTimes() + mockChasmTree.EXPECT().ApplyMutation(gomock.Any()).Return(nil).AnyTimes() + + // On standby side, multiple transactions can be applied at the same time, + // the latest VT may be larger than execution state's LastUpdateVT, we still need to + // generate retention task in this case. + updatedExecutionInfo := common.CloneProto(mutableState.GetExecutionInfo()) + closeTime := time.Now() + currentTransitionCount := updatedExecutionInfo.TransitionHistory[0].TransitionCount + updatedExecutionInfo.TransitionHistory[0].TransitionCount += 10 + updatedExecutionInfo.CloseTime = timestamppb.New(closeTime) + + updatedExecutionState := common.CloneProto(mutableState.GetExecutionState()) + updatedExecutionState.State = enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED + updatedExecutionState.Status = enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED + updatedExecutionState.LastUpdateVersionedTransition = &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(), + TransitionCount: currentTransitionCount + 1, + } + + err = mutableState.ApplyMutation(&persistencespb.WorkflowMutableStateMutation{ + ExecutionInfo: updatedExecutionInfo, + ExecutionState: updatedExecutionState, + }) + s.NoError(err) + + mutation, _, err := mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive) + s.NoError(err) + s.Len(mutation.Tasks[tasks.CategoryTimer], 1) + s.Equal(enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, mutation.Tasks[tasks.CategoryTimer][0].GetType()) + actualCloseTime, err := mutableState.GetWorkflowCloseTime(context.Background()) + s.NoError(err) + s.True(actualCloseTime.Equal(closeTime)) + + // Already closed before, should not generate retention task again. + mutation, _, err = mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyPassive) + s.NoError(err) + s.Empty(mutation.Tasks[tasks.CategoryTimer]) +} + func (s *mutableStateSuite) TestExecutionInfoClone() { newInstance := reflect.New(reflect.TypeOf(s.mutableState.executionInfo).Elem()).Interface() clone, ok := newInstance.(*persistencespb.WorkflowExecutionInfo) diff --git a/service/history/workflow/task_refresher.go b/service/history/workflow/task_refresher.go index 1a5019e6ca..0523a88aed 100644 --- a/service/history/workflow/task_refresher.go +++ b/service/history/workflow/task_refresher.go @@ -77,7 +77,23 @@ func (r *TaskRefresherImpl) Refresh( return err } - return mutableState.ChasmTree().RefreshTasks() + if err := mutableState.ChasmTree().RefreshTasks(); err != nil { + return err + } + + if !mutableState.IsWorkflow() && mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED { + closeTime, err := mutableState.GetWorkflowCloseTime(ctx) + if err != nil { + return err + } + taskGenerator := r.taskGeneratorProvider.NewTaskGenerator( + r.shard, + mutableState, + ) + return taskGenerator.GenerateDeleteHistoryEventTask(closeTime) + } + + return nil } func (r *TaskRefresherImpl) PartialRefresh(