Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions api/historyservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ message IsActivityTaskValidRequest {
temporal.api.common.v1.WorkflowExecution execution = 2;
temporal.server.api.clock.v1.VectorClock clock = 3;
int64 scheduled_event_id = 4;
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 5;
}

message IsActivityTaskValidResponse {
Expand Down
5 changes: 3 additions & 2 deletions service/history/api/isactivitytaskvalid/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Invoke(
req.Execution.RunId,
),
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId)
isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId, req.GetStamp())
if err != nil {
return nil, err
}
Expand All @@ -49,14 +49,15 @@ func Invoke(
func isActivityTaskValid(
workflowLease api.WorkflowLease,
scheduledEventID int64,
stamp int32,
) (bool, error) {
mutableState := workflowLease.GetMutableState()
if !mutableState.IsWorkflowExecutionRunning() {
return false, consts.ErrWorkflowCompleted
}

ai, ok := mutableState.GetActivityInfo(scheduledEventID)
if ok && ai.StartedEventId == common.EmptyEventID {
if ok && ai.StartedEventId == common.EmptyEventID && ai.GetStamp() == stamp {
return true, nil
}
return false, nil
Expand Down
41 changes: 37 additions & 4 deletions service/history/api/isactivitytaskvalid/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,43 +51,76 @@ func (s *apiSuite) TeardownTest() {
func (s *apiSuite) TestWorkflowCompleted() {
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false)

_, err := isActivityTaskValid(s.workflowLease, rand.Int63())
_, err := isActivityTaskValid(s.workflowLease, rand.Int63(), rand.Int31())
s.Error(err)
s.IsType(&serviceerror.NotFound{}, err)
}

func (s *apiSuite) TestWorkflowRunning_ActivityTaskNotStarted() {
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
activityScheduleEventID := rand.Int63()
stamp := rand.Int31()
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
ScheduledEventId: activityScheduleEventID,
StartedEventId: common.EmptyEventID,
Stamp: stamp,
}, true)

valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp)
s.NoError(err)
s.True(valid)
}

func (s *apiSuite) TestWorkflowRunning_ActivityTaskStarted() {
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
activityScheduleEventID := rand.Int63()
stamp := rand.Int31()
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
ScheduledEventId: activityScheduleEventID,
StartedEventId: activityScheduleEventID + 1,
Stamp: stamp,
}, true)

valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp)
s.NoError(err)
s.False(valid)
}

func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampMismatch() {
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
activityScheduleEventID := rand.Int63()
const storedStamp = int32(456)
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
ScheduledEventId: activityScheduleEventID,
StartedEventId: common.EmptyEventID,
Stamp: storedStamp,
}, true)

valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, storedStamp+1)
s.NoError(err)
s.False(valid)
}

func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampLegacy() {
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
activityScheduleEventID := rand.Int63()
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
ScheduledEventId: activityScheduleEventID,
StartedEventId: common.EmptyEventID,
Stamp: 0,
}, true)

valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, 0)
s.NoError(err)
s.True(valid)
}

func (s *apiSuite) TestWorkflowRunning_ActivityTaskMissing() {
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
activityScheduleEventID := rand.Int63()
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(nil, false)

valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, rand.Int31())
s.NoError(err)
s.False(valid)
}
6 changes: 5 additions & 1 deletion service/history/transfer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,14 @@ func (t *transferQueueStandbyTaskExecutor) pushActivity(
return nil
}

activityTask, ok := task.(*tasks.ActivityTask)
if !ok {
return serviceerror.NewInternal("task is not an ActivityTask")
}
pushActivityInfo := postActionInfo.(*activityTaskPostActionInfo)
return t.transferQueueTaskExecutorBase.pushActivity(
ctx,
task.(*tasks.ActivityTask),
activityTask,
pushActivityInfo.activityTaskScheduleToStartTimeout,
pushActivityInfo.versionDirective,
pushActivityInfo.priority,
Expand Down
5 changes: 5 additions & 0 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func UpdateActivityInfoForRetries(
failure *failurepb.Failure,
nextScheduledTime *timestamppb.Timestamp,
) *persistencespb.ActivityInfo {
previousAttempt := ai.Attempt
ai.Attempt = attempt
ai.Version = version
ai.ScheduledTime = nextScheduledTime
Expand All @@ -96,6 +97,10 @@ func UpdateActivityInfoForRetries(
ai.ActivityReset = false
ai.ResetHeartbeats = false

if attempt > previousAttempt {
ai.Stamp++
}

return ai
}

Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5764,6 +5764,7 @@ func (ms *MutableStateImpl) RetryActivity(
activityInfo.RequestId = ""
activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure)
activityInfo.Attempt++
activityInfo.Stamp++
return nil
}); err != nil {
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err
Expand Down
46 changes: 46 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2432,16 +2432,62 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
}
s.Greater(activityFailure.Size(), failureSizeErrorLimit)

prevStamp := activityInfo.Stamp

retryState, err := s.mutableState.RetryActivity(activityInfo, activityFailure)
s.NoError(err)
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)

activityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
s.True(ok)
s.Greater(activityInfo.Stamp, prevStamp)
s.Equal(int32(2), activityInfo.Attempt)
s.LessOrEqual(activityInfo.RetryLastFailure.Size(), failureSizeErrorLimit)
s.Equal(activityFailure.GetMessage(), activityInfo.RetryLastFailure.Cause.GetMessage())
}

func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() {
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

workflowTaskCompletedEventID := int64(4)
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
workflowTaskCompletedEventID,
&commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "6",
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: timestamp.DurationFromSeconds(1),
},
},
false,
)
s.NoError(err)

_, err = s.mutableState.AddActivityTaskStartedEvent(
activityInfo,
activityInfo.ScheduledEventId,
uuid.New(),
"worker-identity",
nil,
nil,
nil,
)
s.NoError(err)

activityInfo.Paused = true
prevStamp := activityInfo.Stamp

retryState, err := s.mutableState.RetryActivity(activityInfo, &failurepb.Failure{Message: "activity failure"})
s.NoError(err)
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)

updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
s.True(ok)
s.Greater(updatedActivityInfo.Stamp, prevStamp)
s.Equal(int32(2), updatedActivityInfo.Attempt)
}

func (s *mutableStateSuite) TestupdateBuildIdsAndDeploymentSearchAttributes() {
versioned := func(buildId string) *commonpb.WorkerVersionStamp {
return &commonpb.WorkerVersionStamp{BuildId: buildId, UseVersioning: true}
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ func (r *TaskGeneratorImpl) GenerateActivityTasks(
TaskQueue: activityInfo.TaskQueue,
ScheduledEventID: activityInfo.ScheduledEventId,
Version: activityInfo.Version,
Stamp: activityInfo.Stamp,
})

return nil
Expand Down
1 change: 1 addition & 0 deletions service/matching/task_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (v *taskValidatorImpl) isTaskValid(
},
Clock: task.Data.Clock,
ScheduledEventId: task.Data.ScheduledEventId,
Stamp: task.Data.GetStamp(),
})
switch err.(type) {
case nil:
Expand Down
4 changes: 4 additions & 0 deletions service/matching/task_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (s *taskValidatorSuite) SetupTest() {
RunId: s.runID,
ScheduledEventId: s.scheduleEventID,
CreateTime: timestamp.TimeNowPtrUtc(),
Stamp: rand.Int31(),
},
}

Expand Down Expand Up @@ -194,6 +195,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_Valid() {
},
Clock: s.task.Data.Clock,
ScheduledEventId: s.task.Data.ScheduledEventId,
Stamp: s.task.Data.GetStamp(),
}).Return(&historyservice.IsActivityTaskValidResponse{IsValid: true}, nil)

valid, err := s.taskValidator.isTaskValid(s.task, taskType)
Expand All @@ -212,6 +214,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_NotFound() {
},
Clock: s.task.Data.Clock,
ScheduledEventId: s.task.Data.ScheduledEventId,
Stamp: s.task.Data.GetStamp(),
}).Return(nil, &serviceerror.NotFound{})

valid, err := s.taskValidator.isTaskValid(s.task, taskType)
Expand All @@ -230,6 +233,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_Error() {
},
Clock: s.task.Data.Clock,
ScheduledEventId: s.task.Data.ScheduledEventId,
Stamp: s.task.Data.GetStamp(),
}).Return(nil, &serviceerror.Unavailable{})

_, err := s.taskValidator.isTaskValid(s.task, taskType)
Expand Down
Loading