Skip to content

Commit d37cafd

Browse files
committed
Increment activity stamp on each retry attempt
1 parent 59a0543 commit d37cafd

File tree

9 files changed

+93
-6
lines changed

9 files changed

+93
-6
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ message IsActivityTaskValidRequest {
412412
temporal.api.common.v1.WorkflowExecution execution = 2;
413413
temporal.server.api.clock.v1.VectorClock clock = 3;
414414
int64 scheduled_event_id = 4;
415+
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
416+
int32 stamp = 5;
415417
}
416418

417419
message IsActivityTaskValidResponse {

service/history/api/isactivitytaskvalid/api.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func Invoke(
2727
req.Execution.RunId,
2828
),
2929
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
30-
isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId)
30+
isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId, req.GetStamp())
3131
if err != nil {
3232
return nil, err
3333
}
@@ -49,14 +49,15 @@ func Invoke(
4949
func isActivityTaskValid(
5050
workflowLease api.WorkflowLease,
5151
scheduledEventID int64,
52+
stamp int32,
5253
) (bool, error) {
5354
mutableState := workflowLease.GetMutableState()
5455
if !mutableState.IsWorkflowExecutionRunning() {
5556
return false, consts.ErrWorkflowCompleted
5657
}
5758

5859
ai, ok := mutableState.GetActivityInfo(scheduledEventID)
59-
if ok && ai.StartedEventId == common.EmptyEventID {
60+
if ok && ai.StartedEventId == common.EmptyEventID && ai.GetStamp() == stamp {
6061
return true, nil
6162
}
6263
return false, nil

service/history/api/isactivitytaskvalid/api_test.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,33 +51,51 @@ func (s *apiSuite) TeardownTest() {
5151
func (s *apiSuite) TestWorkflowCompleted() {
5252
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false)
5353

54-
_, err := isActivityTaskValid(s.workflowLease, rand.Int63())
54+
_, err := isActivityTaskValid(s.workflowLease, rand.Int63(), rand.Int31())
5555
s.Error(err)
5656
s.IsType(&serviceerror.NotFound{}, err)
5757
}
5858

5959
func (s *apiSuite) TestWorkflowRunning_ActivityTaskNotStarted() {
6060
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
6161
activityScheduleEventID := rand.Int63()
62+
stamp := rand.Int31()
6263
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
6364
ScheduledEventId: activityScheduleEventID,
6465
StartedEventId: common.EmptyEventID,
66+
Stamp: stamp,
6567
}, true)
6668

67-
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
69+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp)
6870
s.NoError(err)
6971
s.True(valid)
7072
}
7173

7274
func (s *apiSuite) TestWorkflowRunning_ActivityTaskStarted() {
7375
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
7476
activityScheduleEventID := rand.Int63()
77+
stamp := rand.Int31()
7578
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
7679
ScheduledEventId: activityScheduleEventID,
7780
StartedEventId: activityScheduleEventID + 1,
81+
Stamp: stamp,
7882
}, true)
7983

80-
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
84+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp)
85+
s.NoError(err)
86+
s.False(valid)
87+
}
88+
89+
func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampMismatch() {
90+
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
91+
activityScheduleEventID := rand.Int63()
92+
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
93+
ScheduledEventId: activityScheduleEventID,
94+
StartedEventId: common.EmptyEventID,
95+
Stamp: rand.Int31(),
96+
}, true)
97+
98+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, rand.Int31())
8199
s.NoError(err)
82100
s.False(valid)
83101
}
@@ -87,7 +105,7 @@ func (s *apiSuite) TestWorkflowRunning_ActivityTaskMissing() {
87105
activityScheduleEventID := rand.Int63()
88106
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(nil, false)
89107

90-
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
108+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, rand.Int31())
91109
s.NoError(err)
92110
s.False(valid)
93111
}

service/history/workflow/activity.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func UpdateActivityInfoForRetries(
7575
failure *failurepb.Failure,
7676
nextScheduledTime *timestamppb.Timestamp,
7777
) *persistencespb.ActivityInfo {
78+
previousAttempt := ai.Attempt
7879
ai.Attempt = attempt
7980
ai.Version = version
8081
ai.ScheduledTime = nextScheduledTime
@@ -96,6 +97,10 @@ func UpdateActivityInfoForRetries(
9697
ai.ActivityReset = false
9798
ai.ResetHeartbeats = false
9899

100+
if attempt > previousAttempt {
101+
ai.Stamp++
102+
}
103+
99104
return ai
100105
}
101106

service/history/workflow/mutable_state_impl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5758,6 +5758,7 @@ func (ms *MutableStateImpl) RetryActivity(
57585758
activityInfo.RequestId = ""
57595759
activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure)
57605760
activityInfo.Attempt++
5761+
activityInfo.Stamp++
57615762
return nil
57625763
}); err != nil {
57635764
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err

service/history/workflow/mutable_state_impl_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2431,16 +2431,62 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
24312431
}
24322432
s.Greater(activityFailure.Size(), failureSizeErrorLimit)
24332433

2434+
prevStamp := activityInfo.Stamp
2435+
24342436
retryState, err := s.mutableState.RetryActivity(activityInfo, activityFailure)
24352437
s.NoError(err)
24362438
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)
24372439

24382440
activityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
24392441
s.True(ok)
2442+
s.Greater(activityInfo.Stamp, prevStamp)
2443+
s.Equal(int32(2), activityInfo.Attempt)
24402444
s.LessOrEqual(activityInfo.RetryLastFailure.Size(), failureSizeErrorLimit)
24412445
s.Equal(activityFailure.GetMessage(), activityInfo.RetryLastFailure.Cause.GetMessage())
24422446
}
24432447

2448+
func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() {
2449+
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
2450+
2451+
workflowTaskCompletedEventID := int64(4)
2452+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
2453+
workflowTaskCompletedEventID,
2454+
&commandpb.ScheduleActivityTaskCommandAttributes{
2455+
ActivityId: "6",
2456+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
2457+
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
2458+
RetryPolicy: &commonpb.RetryPolicy{
2459+
InitialInterval: timestamp.DurationFromSeconds(1),
2460+
},
2461+
},
2462+
false,
2463+
)
2464+
s.NoError(err)
2465+
2466+
_, err = s.mutableState.AddActivityTaskStartedEvent(
2467+
activityInfo,
2468+
activityInfo.ScheduledEventId,
2469+
uuid.New(),
2470+
"worker-identity",
2471+
nil,
2472+
nil,
2473+
nil,
2474+
)
2475+
s.NoError(err)
2476+
2477+
activityInfo.Paused = true
2478+
prevStamp := activityInfo.Stamp
2479+
2480+
retryState, err := s.mutableState.RetryActivity(activityInfo, &failurepb.Failure{Message: "activity failure"})
2481+
s.NoError(err)
2482+
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)
2483+
2484+
updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
2485+
s.True(ok)
2486+
s.Greater(updatedActivityInfo.Stamp, prevStamp)
2487+
s.Equal(int32(2), updatedActivityInfo.Attempt)
2488+
}
2489+
24442490
func (s *mutableStateSuite) TestupdateBuildIdsAndDeploymentSearchAttributes() {
24452491
versioned := func(buildId string) *commonpb.WorkerVersionStamp {
24462492
return &commonpb.WorkerVersionStamp{BuildId: buildId, UseVersioning: true}

service/matching/task_validation.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (v *taskValidatorImpl) isTaskValid(
174174
},
175175
Clock: task.Data.Clock,
176176
ScheduledEventId: task.Data.ScheduledEventId,
177+
Stamp: task.Data.GetStamp(),
177178
})
178179
switch err.(type) {
179180
case nil:

service/matching/task_validation_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func (s *taskValidatorSuite) SetupTest() {
6666
RunId: s.runID,
6767
ScheduledEventId: s.scheduleEventID,
6868
CreateTime: timestamp.TimeNowPtrUtc(),
69+
Stamp: rand.Int31(),
6970
},
7071
}
7172

@@ -194,6 +195,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_Valid() {
194195
},
195196
Clock: s.task.Data.Clock,
196197
ScheduledEventId: s.task.Data.ScheduledEventId,
198+
Stamp: s.task.Data.GetStamp(),
197199
}).Return(&historyservice.IsActivityTaskValidResponse{IsValid: true}, nil)
198200

199201
valid, err := s.taskValidator.isTaskValid(s.task, taskType)
@@ -212,6 +214,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_NotFound() {
212214
},
213215
Clock: s.task.Data.Clock,
214216
ScheduledEventId: s.task.Data.ScheduledEventId,
217+
Stamp: s.task.Data.GetStamp(),
215218
}).Return(nil, &serviceerror.NotFound{})
216219

217220
valid, err := s.taskValidator.isTaskValid(s.task, taskType)
@@ -230,6 +233,7 @@ func (s *taskValidatorSuite) TestIsTaskValid_ActivityTask_Error() {
230233
},
231234
Clock: s.task.Data.Clock,
232235
ScheduledEventId: s.task.Data.ScheduledEventId,
236+
Stamp: s.task.Data.GetStamp(),
233237
}).Return(nil, &serviceerror.Unavailable{})
234238

235239
_, err := s.taskValidator.isTaskValid(s.task, taskType)

0 commit comments

Comments
 (0)