diff --git a/api/historyservice/v1/request_response.pb.go b/api/historyservice/v1/request_response.pb.go index 5edbffc98b9..ac88b9ef45c 100644 --- a/api/historyservice/v1/request_response.pb.go +++ b/api/historyservice/v1/request_response.pb.go @@ -2585,8 +2585,10 @@ type IsActivityTaskValidRequest struct { Execution *v14.WorkflowExecution `protobuf:"bytes,2,opt,name=execution,proto3" json:"execution,omitempty"` Clock *v17.VectorClock `protobuf:"bytes,3,opt,name=clock,proto3" json:"clock,omitempty"` ScheduledEventId int64 `protobuf:"varint,4,opt,name=scheduled_event_id,json=scheduledEventId,proto3" json:"scheduled_event_id,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Stamp represents the internal “version” of the activity options and can/will be changed with Activity API. + Stamp int32 `protobuf:"varint,5,opt,name=stamp,proto3" json:"stamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *IsActivityTaskValidRequest) Reset() { @@ -2647,6 +2649,13 @@ func (x *IsActivityTaskValidRequest) GetScheduledEventId() int64 { return 0 } +func (x *IsActivityTaskValidRequest) GetStamp() int32 { + if x != nil { + return x.Stamp + } + return 0 +} + type IsActivityTaskValidResponse struct { state protoimpl.MessageState `protogen:"open.v1"` // whether matching service can call history service to start the activity task @@ -10065,12 +10074,13 @@ const file_temporal_server_api_historyservice_v1_request_response_proto_rawDesc "\"RespondActivityTaskCanceledRequest\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12j\n" + "\x0ecancel_request\x18\x02 \x01(\v2C.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequestR\rcancelRequest:\x1f\x92\xc4\x03\x1b2\x19cancel_request.task_token\"%\n" + - "#RespondActivityTaskCanceledResponse\"\x94\x02\n" + + "#RespondActivityTaskCanceledResponse\"\xaa\x02\n" + "\x1aIsActivityTaskValidRequest\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12G\n" + "\texecution\x18\x02 \x01(\v2).temporal.api.common.v1.WorkflowExecutionR\texecution\x12?\n" + "\x05clock\x18\x03 \x01(\v2).temporal.server.api.clock.v1.VectorClockR\x05clock\x12,\n" + - "\x12scheduled_event_id\x18\x04 \x01(\x03R\x10scheduledEventId:\x1b\x92\xc4\x03\x17*\x15execution.workflow_id\"8\n" + + "\x12scheduled_event_id\x18\x04 \x01(\x03R\x10scheduledEventId\x12\x14\n" + + "\x05stamp\x18\x05 \x01(\x05R\x05stamp:\x1b\x92\xc4\x03\x17*\x15execution.workflow_id\"8\n" + "\x1bIsActivityTaskValidResponse\x12\x19\n" + "\bis_valid\x18\x01 \x01(\bR\aisValid\"\xfb\x02\n" + "\x1eSignalWorkflowExecutionRequest\x12!\n" + diff --git a/proto/internal/temporal/server/api/historyservice/v1/request_response.proto b/proto/internal/temporal/server/api/historyservice/v1/request_response.proto index 7323854bd6b..9a59895f067 100644 --- a/proto/internal/temporal/server/api/historyservice/v1/request_response.proto +++ b/proto/internal/temporal/server/api/historyservice/v1/request_response.proto @@ -417,6 +417,8 @@ message IsActivityTaskValidRequest { temporal.api.common.v1.WorkflowExecution execution = 2; temporal.server.api.clock.v1.VectorClock clock = 3; int64 scheduled_event_id = 4; + // Stamp represents the internal “version” of the activity options and can/will be changed with Activity API. + int32 stamp = 5; } message IsActivityTaskValidResponse { diff --git a/service/history/api/isactivitytaskvalid/api.go b/service/history/api/isactivitytaskvalid/api.go index d6268dde5bd..c9eae15e3f1 100644 --- a/service/history/api/isactivitytaskvalid/api.go +++ b/service/history/api/isactivitytaskvalid/api.go @@ -27,7 +27,7 @@ func Invoke( req.Execution.RunId, ), func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) { - isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId) + isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId, req.GetStamp()) if err != nil { return nil, err } @@ -49,6 +49,7 @@ func Invoke( func isActivityTaskValid( workflowLease api.WorkflowLease, scheduledEventID int64, + stamp int32, ) (bool, error) { mutableState := workflowLease.GetMutableState() if !mutableState.IsWorkflowExecutionRunning() { @@ -56,7 +57,7 @@ func isActivityTaskValid( } ai, ok := mutableState.GetActivityInfo(scheduledEventID) - if ok && ai.StartedEventId == common.EmptyEventID { + if ok && ai.StartedEventId == common.EmptyEventID && ai.GetStamp() == stamp { return true, nil } return false, nil diff --git a/service/history/api/isactivitytaskvalid/api_test.go b/service/history/api/isactivitytaskvalid/api_test.go index 35e0e20971d..19cfe15de76 100644 --- a/service/history/api/isactivitytaskvalid/api_test.go +++ b/service/history/api/isactivitytaskvalid/api_test.go @@ -51,7 +51,7 @@ func (s *apiSuite) TeardownTest() { func (s *apiSuite) TestWorkflowCompleted() { s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false) - _, err := isActivityTaskValid(s.workflowLease, rand.Int63()) + _, err := isActivityTaskValid(s.workflowLease, rand.Int63(), rand.Int31()) s.Error(err) s.IsType(&serviceerror.NotFound{}, err) } @@ -59,12 +59,14 @@ func (s *apiSuite) TestWorkflowCompleted() { func (s *apiSuite) TestWorkflowRunning_ActivityTaskNotStarted() { s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) activityScheduleEventID := rand.Int63() + stamp := rand.Int31() s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{ ScheduledEventId: activityScheduleEventID, StartedEventId: common.EmptyEventID, + Stamp: stamp, }, true) - valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID) + valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp) s.NoError(err) s.True(valid) } @@ -72,22 +74,53 @@ func (s *apiSuite) TestWorkflowRunning_ActivityTaskNotStarted() { func (s *apiSuite) TestWorkflowRunning_ActivityTaskStarted() { s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) activityScheduleEventID := rand.Int63() + stamp := rand.Int31() s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{ ScheduledEventId: activityScheduleEventID, StartedEventId: activityScheduleEventID + 1, + Stamp: stamp, }, true) - valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID) + valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp) s.NoError(err) s.False(valid) } +func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampMismatch() { + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) + activityScheduleEventID := rand.Int63() + const storedStamp = int32(456) + s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{ + ScheduledEventId: activityScheduleEventID, + StartedEventId: common.EmptyEventID, + Stamp: storedStamp, + }, true) + + valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, storedStamp+1) + s.NoError(err) + s.False(valid) +} + +func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampLegacy() { + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) + activityScheduleEventID := rand.Int63() + s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{ + ScheduledEventId: activityScheduleEventID, + StartedEventId: common.EmptyEventID, + Stamp: 0, + }, true) + + valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, 0) + s.NoError(err) + s.True(valid) +} + func (s *apiSuite) TestWorkflowRunning_ActivityTaskMissing() { s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) activityScheduleEventID := rand.Int63() s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(nil, false) - valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID) + valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, rand.Int31()) s.NoError(err) s.False(valid) } diff --git a/service/history/transfer_queue_standby_task_executor.go b/service/history/transfer_queue_standby_task_executor.go index 466b3c114db..69b11b7035f 100644 --- a/service/history/transfer_queue_standby_task_executor.go +++ b/service/history/transfer_queue_standby_task_executor.go @@ -564,10 +564,14 @@ func (t *transferQueueStandbyTaskExecutor) pushActivity( return nil } + activityTask, ok := task.(*tasks.ActivityTask) + if !ok { + return serviceerror.NewInternal("task is not an ActivityTask") + } pushActivityInfo := postActionInfo.(*activityTaskPostActionInfo) return t.transferQueueTaskExecutorBase.pushActivity( ctx, - task.(*tasks.ActivityTask), + activityTask, pushActivityInfo.activityTaskScheduleToStartTimeout, pushActivityInfo.versionDirective, pushActivityInfo.priority, diff --git a/service/history/workflow/activity.go b/service/history/workflow/activity.go index 96f66aeea24..05fcf95e967 100644 --- a/service/history/workflow/activity.go +++ b/service/history/workflow/activity.go @@ -75,6 +75,7 @@ func UpdateActivityInfoForRetries( failure *failurepb.Failure, nextScheduledTime *timestamppb.Timestamp, ) *persistencespb.ActivityInfo { + previousAttempt := ai.Attempt ai.Attempt = attempt ai.Version = version ai.ScheduledTime = nextScheduledTime @@ -96,6 +97,10 @@ func UpdateActivityInfoForRetries( ai.ActivityReset = false ai.ResetHeartbeats = false + if attempt > previousAttempt { + ai.Stamp++ + } + return ai } diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 5c4aec2f62f..f7bcad07a25 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -5764,6 +5764,7 @@ func (ms *MutableStateImpl) RetryActivity( activityInfo.RequestId = "" activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure) activityInfo.Attempt++ + activityInfo.Stamp++ return nil }); err != nil { return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 64cb3b7fd6b..bd52df4ee65 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -2432,16 +2432,62 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() { } s.Greater(activityFailure.Size(), failureSizeErrorLimit) + prevStamp := activityInfo.Stamp + retryState, err := s.mutableState.RetryActivity(activityInfo, activityFailure) s.NoError(err) s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState) activityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId) s.True(ok) + s.Greater(activityInfo.Stamp, prevStamp) + s.Equal(int32(2), activityInfo.Attempt) s.LessOrEqual(activityInfo.RetryLastFailure.Size(), failureSizeErrorLimit) s.Equal(activityFailure.GetMessage(), activityInfo.RetryLastFailure.Cause.GetMessage()) } +func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() { + s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + + workflowTaskCompletedEventID := int64(4) + _, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent( + workflowTaskCompletedEventID, + &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "6", + ActivityType: &commonpb.ActivityType{Name: "activity-type"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"}, + RetryPolicy: &commonpb.RetryPolicy{ + InitialInterval: timestamp.DurationFromSeconds(1), + }, + }, + false, + ) + s.NoError(err) + + _, err = s.mutableState.AddActivityTaskStartedEvent( + activityInfo, + activityInfo.ScheduledEventId, + uuid.New(), + "worker-identity", + nil, + nil, + nil, + ) + s.NoError(err) + + activityInfo.Paused = true + prevStamp := activityInfo.Stamp + + retryState, err := s.mutableState.RetryActivity(activityInfo, &failurepb.Failure{Message: "activity failure"}) + s.NoError(err) + s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState) + + updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId) + s.True(ok) + s.Greater(updatedActivityInfo.Stamp, prevStamp) + s.Equal(int32(2), updatedActivityInfo.Attempt) +} + func (s *mutableStateSuite) TestupdateBuildIdsAndDeploymentSearchAttributes() { versioned := func(buildId string) *commonpb.WorkerVersionStamp { return &commonpb.WorkerVersionStamp{BuildId: buildId, UseVersioning: true} diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 4cf08cd9509..86956096815 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -552,6 +552,7 @@ func (r *TaskGeneratorImpl) GenerateActivityTasks( TaskQueue: activityInfo.TaskQueue, ScheduledEventID: activityInfo.ScheduledEventId, Version: activityInfo.Version, + Stamp: activityInfo.Stamp, }) return nil diff --git a/service/matching/task_validation.go b/service/matching/task_validation.go index b628660588f..f843b0dbb06 100644 --- a/service/matching/task_validation.go +++ b/service/matching/task_validation.go @@ -174,6 +174,7 @@ func (v *taskValidatorImpl) isTaskValid( }, Clock: task.Data.Clock, ScheduledEventId: task.Data.ScheduledEventId, + Stamp: task.Data.GetStamp(), }) switch err.(type) { case nil: diff --git a/service/matching/task_validation_test.go b/service/matching/task_validation_test.go index 681cd8c3fa7..a16f7e09672 100644 --- a/service/matching/task_validation_test.go +++ b/service/matching/task_validation_test.go @@ -66,6 +66,7 @@ func (s *taskValidatorSuite) SetupTest() { RunId: s.runID, ScheduledEventId: s.scheduleEventID, CreateTime: timestamp.TimeNowPtrUtc(), + Stamp: rand.Int31(), }, } @@ -194,6 +195,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_Valid() { }, Clock: s.task.Data.Clock, ScheduledEventId: s.task.Data.ScheduledEventId, + Stamp: s.task.Data.GetStamp(), }).Return(&historyservice.IsActivityTaskValidResponse{IsValid: true}, nil) valid, err := s.taskValidator.isTaskValid(s.task, taskType) @@ -212,6 +214,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_NotFound() { }, Clock: s.task.Data.Clock, ScheduledEventId: s.task.Data.ScheduledEventId, + Stamp: s.task.Data.GetStamp(), }).Return(nil, &serviceerror.NotFound{}) valid, err := s.taskValidator.isTaskValid(s.task, taskType) @@ -230,6 +233,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_Error() { }, Clock: s.task.Data.Clock, ScheduledEventId: s.task.Data.ScheduledEventId, + Stamp: s.task.Data.GetStamp(), }).Return(nil, &serviceerror.Unavailable{}) _, err := s.taskValidator.isTaskValid(s.task, taskType)