Skip to content

Commit 6f4ed06

Browse files
authored
Revert "Remove timeouts during activity retry (#8509)" (#8526)
This reverts commit 051fb08. ## What changed? Revert activity timeout change ## Why? Reverting activity timeout timer improvment as it might have caused lost heartbeat timer. ## How did you test it? - [x] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s)
1 parent 6b1c5f7 commit 6f4ed06

File tree

2 files changed

+32
-125
lines changed

2 files changed

+32
-125
lines changed

service/history/workflow/mutable_state_impl.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5816,11 +5816,6 @@ func (ms *MutableStateImpl) RetryActivity(
58165816
ai.Attempt+1,
58175817
activityFailure)
58185818

5819-
// Delete old per-attempt timeout tasks since they're invalidated by the retry.
5820-
ms.deleteActivityTimeoutTask(ai.ScheduledEventId, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, ai)
5821-
ms.deleteActivityTimeoutTask(ai.ScheduledEventId, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, ai)
5822-
ms.deleteActivityTimeoutTask(ai.ScheduledEventId, enumspb.TIMEOUT_TYPE_HEARTBEAT, ai)
5823-
58245819
if err := ms.taskGenerator.GenerateActivityRetryTasks(ai); err != nil {
58255820
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err
58265821
}
@@ -6261,12 +6256,6 @@ func (ms *MutableStateImpl) deleteActivityTimeoutTask(
62616256
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
62626257
task = timeoutTasks.ScheduleToStartTimeoutTask
62636258
timeoutTasks.ScheduleToStartTimeoutTask = nil
6264-
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
6265-
task = timeoutTasks.ScheduleToCloseTimeoutTask
6266-
timeoutTasks.ScheduleToCloseTimeoutTask = nil
6267-
case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
6268-
task = timeoutTasks.StartToCloseTimeoutTask
6269-
timeoutTasks.StartToCloseTimeoutTask = nil
62706259
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
62716260
task = timeoutTasks.HeartbeatTimeoutTask
62726261
timeoutTasks.HeartbeatTimeoutTask = nil
@@ -6295,11 +6284,39 @@ func (ms *MutableStateImpl) recordActivityTimeoutTasksForDeletion(
62956284
return
62966285
}
62976286

6287+
timeoutTasks, exists := ms.activityTimeoutTasks[scheduledEventID]
6288+
if !exists {
6289+
return
6290+
}
6291+
62986292
// Delete all timeout tasks for this activity
6299-
ms.deleteActivityTimeoutTask(scheduledEventID, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, activityInfo)
6300-
ms.deleteActivityTimeoutTask(scheduledEventID, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, activityInfo)
6301-
ms.deleteActivityTimeoutTask(scheduledEventID, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, activityInfo)
6302-
ms.deleteActivityTimeoutTask(scheduledEventID, enumspb.TIMEOUT_TYPE_HEARTBEAT, activityInfo)
6293+
if task := timeoutTasks.ScheduleToStartTimeoutTask; task != nil {
6294+
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6295+
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6296+
task.GetKey(),
6297+
)
6298+
}
6299+
6300+
if task := timeoutTasks.ScheduleToCloseTimeoutTask; task != nil {
6301+
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6302+
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6303+
task.GetKey(),
6304+
)
6305+
}
6306+
6307+
if task := timeoutTasks.StartToCloseTimeoutTask; task != nil {
6308+
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6309+
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6310+
task.GetKey(),
6311+
)
6312+
}
6313+
6314+
if task := timeoutTasks.HeartbeatTimeoutTask; task != nil {
6315+
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6316+
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6317+
task.GetKey(),
6318+
)
6319+
}
63036320

63046321
delete(ms.activityTimeoutTasks, scheduledEventID)
63056322
}

service/history/workflow/mutable_state_impl_test.go

Lines changed: 0 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,116 +1159,6 @@ func (s *mutableStateSuite) TestActivityTimeoutDeletion_AllTimersDeletedOnComple
11591159
s.True(foundHeartbeat, "Heartbeat should be deleted on completion (if still exists)")
11601160
}
11611161

1162-
func (s *mutableStateSuite) TestActivityTimeoutDeletion_PerAttemptTimersDeletedOnRetry() {
1163-
// Test that when activity is retried, per-attempt timeout tasks (ScheduleToStart, StartToClose, Heartbeat)
1164-
// are deleted but ScheduleToClose is preserved since it spans all retry attempts.
1165-
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
1166-
1167-
// Schedule and start an activity with retry policy
1168-
_, activityInfo, err := s.mutableState.AddActivityTaskScheduledEvent(
1169-
int64(2), // workflowTaskCompletedEventID
1170-
&commandpb.ScheduleActivityTaskCommandAttributes{
1171-
ActivityId: "activity",
1172-
ActivityType: &commonpb.ActivityType{Name: "type"},
1173-
TaskQueue: &taskqueuepb.TaskQueue{Name: "tq"},
1174-
RetryPolicy: &commonpb.RetryPolicy{InitialInterval: durationpb.New(time.Second)},
1175-
},
1176-
false,
1177-
)
1178-
s.NoError(err)
1179-
1180-
_, err = s.mutableState.AddActivityTaskStartedEvent(activityInfo, activityInfo.ScheduledEventId, "", "", nil, nil, nil)
1181-
s.NoError(err)
1182-
1183-
// Create mock timeout tasks for all types
1184-
workflowKey := s.mutableState.GetWorkflowKey()
1185-
scheduledEventID := activityInfo.ScheduledEventId
1186-
1187-
mockScheduleToCloseTask := &tasks.ActivityTimeoutTask{
1188-
WorkflowKey: workflowKey,
1189-
TaskID: 200,
1190-
TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
1191-
EventID: scheduledEventID,
1192-
Stamp: activityInfo.Stamp,
1193-
}
1194-
mockScheduleToStartTask := &tasks.ActivityTimeoutTask{
1195-
WorkflowKey: workflowKey,
1196-
TaskID: 201,
1197-
TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START,
1198-
EventID: scheduledEventID,
1199-
Stamp: activityInfo.Stamp,
1200-
}
1201-
mockStartToCloseTask := &tasks.ActivityTimeoutTask{
1202-
WorkflowKey: workflowKey,
1203-
TaskID: 202,
1204-
TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE,
1205-
EventID: scheduledEventID,
1206-
Stamp: activityInfo.Stamp,
1207-
}
1208-
mockHeartbeatTask := &tasks.ActivityTimeoutTask{
1209-
WorkflowKey: workflowKey,
1210-
TaskID: 203,
1211-
TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT,
1212-
EventID: scheduledEventID,
1213-
Stamp: activityInfo.Stamp,
1214-
}
1215-
1216-
// Store all tasks
1217-
if s.mutableState.activityTimeoutTasks == nil {
1218-
s.mutableState.activityTimeoutTasks = make(map[int64]*activityTimeoutTasks)
1219-
}
1220-
s.mutableState.activityTimeoutTasks[scheduledEventID] = &activityTimeoutTasks{
1221-
ScheduleToStartTimeoutTask: mockScheduleToStartTask,
1222-
ScheduleToCloseTimeoutTask: mockScheduleToCloseTask,
1223-
StartToCloseTimeoutTask: mockStartToCloseTask,
1224-
HeartbeatTimeoutTask: mockHeartbeatTask,
1225-
}
1226-
1227-
// Retry the activity
1228-
retryState, err := s.mutableState.RetryActivity(activityInfo, &failurepb.Failure{})
1229-
s.NoError(err)
1230-
s.Equal(enumspb.RETRY_STATE_IN_PROGRESS, retryState)
1231-
1232-
// Verify that per-attempt timeout tasks are in the deletion list
1233-
del := s.mutableState.BestEffortDeleteTasks
1234-
s.Contains(del, tasks.CategoryTimer)
1235-
1236-
foundScheduleToClose := false
1237-
foundStartToClose := false
1238-
foundScheduleToStart := false
1239-
foundHeartbeat := false
1240-
for _, key := range del[tasks.CategoryTimer] {
1241-
if key == mockScheduleToCloseTask.GetKey() {
1242-
foundScheduleToClose = true
1243-
}
1244-
if key == mockStartToCloseTask.GetKey() {
1245-
foundStartToClose = true
1246-
}
1247-
if key == mockScheduleToStartTask.GetKey() {
1248-
foundScheduleToStart = true
1249-
}
1250-
if key == mockHeartbeatTask.GetKey() {
1251-
foundHeartbeat = true
1252-
}
1253-
}
1254-
1255-
// Per-attempt timers should be deleted
1256-
s.True(foundStartToClose, "StartToClose should be deleted on retry")
1257-
s.True(foundScheduleToStart, "ScheduleToStart should be deleted on retry")
1258-
s.True(foundHeartbeat, "Heartbeat should be deleted on retry")
1259-
1260-
// ScheduleToClose should NOT be deleted (it spans all retry attempts)
1261-
s.False(foundScheduleToClose, "ScheduleToClose should NOT be deleted on retry (spans all attempts)")
1262-
1263-
// Verify the in-memory storage reflects the deletions
1264-
timeoutTasks := s.mutableState.activityTimeoutTasks[scheduledEventID]
1265-
s.NotNil(timeoutTasks, "Activity timeout tasks map entry should still exist after retry")
1266-
s.Nil(timeoutTasks.ScheduleToStartTimeoutTask, "ScheduleToStart task reference should be nil")
1267-
s.Nil(timeoutTasks.StartToCloseTimeoutTask, "StartToClose task reference should be nil")
1268-
s.Nil(timeoutTasks.HeartbeatTimeoutTask, "Heartbeat task reference should be nil")
1269-
s.NotNil(timeoutTasks.ScheduleToCloseTimeoutTask, "ScheduleToClose task reference should be preserved")
1270-
}
1271-
12721162
func (s *mutableStateSuite) TestDeleteScheduleToStartTimeout_BothPathsWhenActivityStarts() {
12731163
// Test that ScheduleToStart is deleted for both transient and non-transient activity starts
12741164
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()

0 commit comments

Comments
 (0)