Skip to content

Commit 392f51b

Browse files
yimincmichaely520
andauthored
Validate activity task stamps before reuse (#8536)
## Summary - add the activity task stamp to `IsActivityTaskValidRequest` and generated API stubs - require history's activity validation logic to match the stored stamp - plumb the stamp from matching task validation and cover the new behavior with tests - increment activity stamps for each retry attempt and extend tests for paused retries ## Testing - `GOTOOLCHAIN=local go test -tags test_dep ./service/history/workflow -run TestRetryActivity` *(fails: go.mod requires go >= 1.25.0 and local toolchain is go1.24.3)* ------ https://chatgpt.com/codex/tasks/task_b_68faffae045c832c9d8220bcbe50bac7 --------- Co-authored-by: Michael Youssef <[email protected]>
1 parent 810b373 commit 392f51b

File tree

11 files changed

+119
-11
lines changed

11 files changed

+119
-11
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,8 @@ message IsActivityTaskValidRequest {
417417
temporal.api.common.v1.WorkflowExecution execution = 2;
418418
temporal.server.api.clock.v1.VectorClock clock = 3;
419419
int64 scheduled_event_id = 4;
420+
// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
421+
int32 stamp = 5;
420422
}
421423

422424
message IsActivityTaskValidResponse {

service/history/api/isactivitytaskvalid/api.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func Invoke(
2727
req.Execution.RunId,
2828
),
2929
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
30-
isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId)
30+
isTaskValid, err := isActivityTaskValid(workflowLease, req.ScheduledEventId, req.GetStamp())
3131
if err != nil {
3232
return nil, err
3333
}
@@ -49,14 +49,15 @@ func Invoke(
4949
func isActivityTaskValid(
5050
workflowLease api.WorkflowLease,
5151
scheduledEventID int64,
52+
stamp int32,
5253
) (bool, error) {
5354
mutableState := workflowLease.GetMutableState()
5455
if !mutableState.IsWorkflowExecutionRunning() {
5556
return false, consts.ErrWorkflowCompleted
5657
}
5758

5859
ai, ok := mutableState.GetActivityInfo(scheduledEventID)
59-
if ok && ai.StartedEventId == common.EmptyEventID {
60+
if ok && ai.StartedEventId == common.EmptyEventID && ai.GetStamp() == stamp {
6061
return true, nil
6162
}
6263
return false, nil

service/history/api/isactivitytaskvalid/api_test.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,43 +51,76 @@ func (s *apiSuite) TeardownTest() {
5151
func (s *apiSuite) TestWorkflowCompleted() {
5252
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false)
5353

54-
_, err := isActivityTaskValid(s.workflowLease, rand.Int63())
54+
_, err := isActivityTaskValid(s.workflowLease, rand.Int63(), rand.Int31())
5555
s.Error(err)
5656
s.IsType(&serviceerror.NotFound{}, err)
5757
}
5858

5959
func (s *apiSuite) TestWorkflowRunning_ActivityTaskNotStarted() {
6060
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
6161
activityScheduleEventID := rand.Int63()
62+
stamp := rand.Int31()
6263
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
6364
ScheduledEventId: activityScheduleEventID,
6465
StartedEventId: common.EmptyEventID,
66+
Stamp: stamp,
6567
}, true)
6668

67-
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
69+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp)
6870
s.NoError(err)
6971
s.True(valid)
7072
}
7173

7274
func (s *apiSuite) TestWorkflowRunning_ActivityTaskStarted() {
7375
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
7476
activityScheduleEventID := rand.Int63()
77+
stamp := rand.Int31()
7578
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
7679
ScheduledEventId: activityScheduleEventID,
7780
StartedEventId: activityScheduleEventID + 1,
81+
Stamp: stamp,
7882
}, true)
7983

80-
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
84+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, stamp)
8185
s.NoError(err)
8286
s.False(valid)
8387
}
8488

89+
func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampMismatch() {
90+
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
91+
activityScheduleEventID := rand.Int63()
92+
const storedStamp = int32(456)
93+
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
94+
ScheduledEventId: activityScheduleEventID,
95+
StartedEventId: common.EmptyEventID,
96+
Stamp: storedStamp,
97+
}, true)
98+
99+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, storedStamp+1)
100+
s.NoError(err)
101+
s.False(valid)
102+
}
103+
104+
func (s *apiSuite) TestWorkflowRunning_ActivityTaskStampLegacy() {
105+
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
106+
activityScheduleEventID := rand.Int63()
107+
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(&persistencespb.ActivityInfo{
108+
ScheduledEventId: activityScheduleEventID,
109+
StartedEventId: common.EmptyEventID,
110+
Stamp: 0,
111+
}, true)
112+
113+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, 0)
114+
s.NoError(err)
115+
s.True(valid)
116+
}
117+
85118
func (s *apiSuite) TestWorkflowRunning_ActivityTaskMissing() {
86119
s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true)
87120
activityScheduleEventID := rand.Int63()
88121
s.mutableState.EXPECT().GetActivityInfo(activityScheduleEventID).Return(nil, false)
89122

90-
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID)
123+
valid, err := isActivityTaskValid(s.workflowLease, activityScheduleEventID, rand.Int31())
91124
s.NoError(err)
92125
s.False(valid)
93126
}

service/history/transfer_queue_standby_task_executor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,10 +564,14 @@ func (t *transferQueueStandbyTaskExecutor) pushActivity(
564564
return nil
565565
}
566566

567+
activityTask, ok := task.(*tasks.ActivityTask)
568+
if !ok {
569+
return serviceerror.NewInternal("task is not an ActivityTask")
570+
}
567571
pushActivityInfo := postActionInfo.(*activityTaskPostActionInfo)
568572
return t.transferQueueTaskExecutorBase.pushActivity(
569573
ctx,
570-
task.(*tasks.ActivityTask),
574+
activityTask,
571575
pushActivityInfo.activityTaskScheduleToStartTimeout,
572576
pushActivityInfo.versionDirective,
573577
pushActivityInfo.priority,

service/history/workflow/activity.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func UpdateActivityInfoForRetries(
7575
failure *failurepb.Failure,
7676
nextScheduledTime *timestamppb.Timestamp,
7777
) *persistencespb.ActivityInfo {
78+
previousAttempt := ai.Attempt
7879
ai.Attempt = attempt
7980
ai.Version = version
8081
ai.ScheduledTime = nextScheduledTime
@@ -96,6 +97,10 @@ func UpdateActivityInfoForRetries(
9697
ai.ActivityReset = false
9798
ai.ResetHeartbeats = false
9899

100+
if attempt > previousAttempt {
101+
ai.Stamp++
102+
}
103+
99104
return ai
100105
}
101106

service/history/workflow/mutable_state_impl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5764,6 +5764,7 @@ func (ms *MutableStateImpl) RetryActivity(
57645764
activityInfo.RequestId = ""
57655765
activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure)
57665766
activityInfo.Attempt++
5767+
activityInfo.Stamp++
57675768
return nil
57685769
}); err != nil {
57695770
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err

service/history/workflow/mutable_state_impl_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2432,16 +2432,62 @@ func (s *mutableStateSuite) TestRetryActivity_TruncateRetryableFailure() {
24322432
}
24332433
s.Greater(activityFailure.Size(), failureSizeErrorLimit)
24342434

2435+
prevStamp := activityInfo.Stamp
2436+
24352437
retryState, err := s.mutableState.RetryActivity(activityInfo, activityFailure)
24362438
s.NoError(err)
24372439
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)
24382440

24392441
activityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
24402442
s.True(ok)
2443+
s.Greater(activityInfo.Stamp, prevStamp)
2444+
s.Equal(int32(2), activityInfo.Attempt)
24412445
s.LessOrEqual(activityInfo.RetryLastFailure.Size(), failureSizeErrorLimit)
24422446
s.Equal(activityFailure.GetMessage(), activityInfo.RetryLastFailure.Cause.GetMessage())
24432447
}
24442448

2449+
func (s *mutableStateSuite) TestRetryActivity_PausedIncrementsStamp() {
2450+
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
2451+
2452+
workflowTaskCompletedEventID := int64(4)
2453+
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
2454+
workflowTaskCompletedEventID,
2455+
&commandpb.ScheduleActivityTaskCommandAttributes{
2456+
ActivityId: "6",
2457+
ActivityType: &commonpb.ActivityType{Name: "activity-type"},
2458+
TaskQueue: &taskqueuepb.TaskQueue{Name: "task-queue"},
2459+
RetryPolicy: &commonpb.RetryPolicy{
2460+
InitialInterval: timestamp.DurationFromSeconds(1),
2461+
},
2462+
},
2463+
false,
2464+
)
2465+
s.NoError(err)
2466+
2467+
_, err = s.mutableState.AddActivityTaskStartedEvent(
2468+
activityInfo,
2469+
activityInfo.ScheduledEventId,
2470+
uuid.New(),
2471+
"worker-identity",
2472+
nil,
2473+
nil,
2474+
nil,
2475+
)
2476+
s.NoError(err)
2477+
2478+
activityInfo.Paused = true
2479+
prevStamp := activityInfo.Stamp
2480+
2481+
retryState, err := s.mutableState.RetryActivity(activityInfo, &failurepb.Failure{Message: "activity failure"})
2482+
s.NoError(err)
2483+
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)
2484+
2485+
updatedActivityInfo, ok := s.mutableState.GetActivityInfo(activityInfo.ScheduledEventId)
2486+
s.True(ok)
2487+
s.Greater(updatedActivityInfo.Stamp, prevStamp)
2488+
s.Equal(int32(2), updatedActivityInfo.Attempt)
2489+
}
2490+
24452491
func (s *mutableStateSuite) TestupdateBuildIdsAndDeploymentSearchAttributes() {
24462492
versioned := func(buildId string) *commonpb.WorkerVersionStamp {
24472493
return &commonpb.WorkerVersionStamp{BuildId: buildId, UseVersioning: true}

service/history/workflow/task_generator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,7 @@ func (r *TaskGeneratorImpl) GenerateActivityTasks(
552552
TaskQueue: activityInfo.TaskQueue,
553553
ScheduledEventID: activityInfo.ScheduledEventId,
554554
Version: activityInfo.Version,
555+
Stamp: activityInfo.Stamp,
555556
})
556557

557558
return nil

service/matching/task_validation.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (v *taskValidatorImpl) isTaskValid(
174174
},
175175
Clock: task.Data.Clock,
176176
ScheduledEventId: task.Data.ScheduledEventId,
177+
Stamp: task.Data.GetStamp(),
177178
})
178179
switch err.(type) {
179180
case nil:

0 commit comments

Comments
 (0)