Skip to content

Commit 11e54a1

Browse files
authored
Revert "Optimize activity task timeouts (#8470)" (#8530)
This reverts commit c88fecc. ## What changed? Revert activity timeout optimization ## Why? to fix hb timeouts ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s)
1 parent 5e4f8f3 commit 11e54a1

File tree

7 files changed

+2
-997
lines changed

7 files changed

+2
-997
lines changed

service/history/interfaces/mutable_state.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,6 @@ type (
298298
GetWorkflowTaskScheduleToStartTimeoutTask() *tasks.WorkflowTaskTimeoutTask
299299
GetWorkflowTaskStartToCloseTimeoutTask() *tasks.WorkflowTaskTimeoutTask
300300

301-
StoreActivityTimeoutTask(task *tasks.ActivityTimeoutTask)
302-
303301
IsDirty() bool
304302
IsTransitionHistoryEnabled() bool
305303
// StartTransaction sets up the mutable state for transacting.

service/history/interfaces/mutable_state_mock.go

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/mutable_state_impl.go

Lines changed: 0 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,6 @@ var (
120120
var emptyTasks = []tasks.Task{}
121121

122122
type (
123-
// activityTimeoutTasks holds references to all timeout tasks for a single activity.
124-
activityTimeoutTasks struct {
125-
ScheduleToStartTimeoutTask *tasks.ActivityTimeoutTask
126-
ScheduleToCloseTimeoutTask *tasks.ActivityTimeoutTask
127-
StartToCloseTimeoutTask *tasks.ActivityTimeoutTask
128-
HeartbeatTimeoutTask *tasks.ActivityTimeoutTask
129-
}
130-
131123
MutableStateImpl struct {
132124
pendingActivityTimerHeartbeats map[int64]time.Time // Scheduled Event ID -> LastHeartbeatTimeoutVisibilityInSeconds.
133125
pendingActivityInfoIDs map[int64]*persistencespb.ActivityInfo // Scheduled Event ID -> Activity Info.
@@ -235,10 +227,6 @@ type (
235227
wftScheduleToStartTimeoutTask *tasks.WorkflowTaskTimeoutTask
236228
wftStartToCloseTimeoutTask *tasks.WorkflowTaskTimeoutTask
237229

238-
// In-memory storage for activity timeout tasks. These are set when timeout tasks are generated and used to
239-
// delete them when the activity completes/fails. Map key is the activity's scheduled event ID.
240-
activityTimeoutTasks map[int64]*activityTimeoutTasks
241-
242230
// Do not rely on this, this is only updated on
243231
// Load() and closeTransactionXXX methods. So when
244232
// a transaction is in progress, this value will be
@@ -288,7 +276,6 @@ func NewMutableState(
288276
pendingActivityIDToEventID: make(map[string]int64),
289277
deleteActivityInfos: make(map[int64]struct{}),
290278
syncActivityTasks: make(map[int64]struct{}),
291-
activityTimeoutTasks: make(map[int64]*activityTimeoutTasks),
292279

293280
pendingTimerInfoIDs: make(map[string]*persistencespb.TimerInfo),
294281
pendingTimerEventIDToID: make(map[int64]string),
@@ -1864,9 +1851,6 @@ func (ms *MutableStateImpl) UpdateActivityProgress(
18641851
metrics.OperationTag(metrics.HistoryRecordActivityTaskHeartbeatScope),
18651852
metrics.NamespaceTag(ms.namespaceEntry.Name().String()))
18661853
}
1867-
1868-
// Delete Heartbeat timeout task since heartbeat has been received
1869-
ms.deleteActivityTimeoutTask(ai.ScheduledEventId, enumspb.TIMEOUT_TYPE_HEARTBEAT, ai)
18701854
}
18711855

18721856
// UpdateActivityInfo applies the necessary activity information
@@ -1973,7 +1957,6 @@ func (ms *MutableStateImpl) DeleteActivity(
19731957
// log data inconsistency instead of returning an error
19741958
ms.logDataInconsistency()
19751959
}
1976-
ms.recordActivityTimeoutTasksForDeletion(scheduledEventID, activityInfo)
19771960
} else {
19781961
ms.logError(
19791962
fmt.Sprintf("unable to find activity event id: %v in mutable state", scheduledEventID),
@@ -3652,8 +3635,6 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
36523635
if err := ms.ApplyActivityTaskStartedEvent(event); err != nil {
36533636
return nil, err
36543637
}
3655-
// Delete ScheduleToStart timeout task since activity has now started
3656-
ms.deleteActivityTimeoutTask(scheduledEventID, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, ai)
36573638
return event, nil
36583639
}
36593640

@@ -3677,8 +3658,6 @@ func (ms *MutableStateImpl) AddActivityTaskStartedEvent(
36773658
return nil, err
36783659
}
36793660
ms.syncActivityTasks[ai.ScheduledEventId] = struct{}{}
3680-
// Delete ScheduleToStart timeout task since activity has now started
3681-
ms.deleteActivityTimeoutTask(scheduledEventID, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, ai)
36823661
return nil, nil
36833662
}
36843663

@@ -6209,118 +6188,6 @@ func (ms *MutableStateImpl) GetWorkflowTaskStartToCloseTimeoutTask() *tasks.Work
62096188
return ms.wftStartToCloseTimeoutTask
62106189
}
62116190

6212-
// StoreActivityTimeoutTask stores an activity timeout task reference in memory.
6213-
func (ms *MutableStateImpl) StoreActivityTimeoutTask(
6214-
task *tasks.ActivityTimeoutTask,
6215-
) {
6216-
timeoutTasks, exists := ms.activityTimeoutTasks[task.EventID]
6217-
if !exists {
6218-
timeoutTasks = &activityTimeoutTasks{}
6219-
ms.activityTimeoutTasks[task.EventID] = timeoutTasks
6220-
}
6221-
6222-
switch task.TimeoutType {
6223-
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
6224-
timeoutTasks.ScheduleToStartTimeoutTask = task
6225-
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
6226-
timeoutTasks.ScheduleToCloseTimeoutTask = task
6227-
case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
6228-
timeoutTasks.StartToCloseTimeoutTask = task
6229-
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
6230-
timeoutTasks.HeartbeatTimeoutTask = task
6231-
default:
6232-
// No-op for unhandled timeout types
6233-
}
6234-
}
6235-
6236-
// deleteActivityTimeoutTask marks a specific activity timeout task to be deleted.
6237-
// This is used to delete timeout tasks at specific lifecycle points (e.g., when activity starts, when heartbeat is received).
6238-
// The task will be regenerated with updated deadline on the next transaction close if still needed.
6239-
func (ms *MutableStateImpl) deleteActivityTimeoutTask(
6240-
scheduledEventID int64,
6241-
timeoutType enumspb.TimeoutType,
6242-
activityInfo *persistencespb.ActivityInfo,
6243-
) {
6244-
if activityInfo == nil {
6245-
return
6246-
}
6247-
6248-
timeoutTasks, exists := ms.activityTimeoutTasks[scheduledEventID]
6249-
if !exists {
6250-
return
6251-
}
6252-
6253-
var task *tasks.ActivityTimeoutTask
6254-
6255-
switch timeoutType {
6256-
case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START:
6257-
task = timeoutTasks.ScheduleToStartTimeoutTask
6258-
timeoutTasks.ScheduleToStartTimeoutTask = nil
6259-
case enumspb.TIMEOUT_TYPE_HEARTBEAT:
6260-
task = timeoutTasks.HeartbeatTimeoutTask
6261-
timeoutTasks.HeartbeatTimeoutTask = nil
6262-
default:
6263-
return
6264-
}
6265-
6266-
if task == nil {
6267-
return
6268-
}
6269-
6270-
key := task.GetKey()
6271-
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6272-
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6273-
key,
6274-
)
6275-
}
6276-
6277-
// recordActivityTimeoutTasksForDeletion records activity timeout tasks for deletion. This is called when an activity
6278-
// completes or fails. It deletes all remaining timeout tasks for the activity.
6279-
func (ms *MutableStateImpl) recordActivityTimeoutTasksForDeletion(
6280-
scheduledEventID int64,
6281-
activityInfo *persistencespb.ActivityInfo,
6282-
) {
6283-
if activityInfo == nil {
6284-
return
6285-
}
6286-
6287-
timeoutTasks, exists := ms.activityTimeoutTasks[scheduledEventID]
6288-
if !exists {
6289-
return
6290-
}
6291-
6292-
// Delete all timeout tasks for this activity
6293-
if task := timeoutTasks.ScheduleToStartTimeoutTask; task != nil {
6294-
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6295-
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6296-
task.GetKey(),
6297-
)
6298-
}
6299-
6300-
if task := timeoutTasks.ScheduleToCloseTimeoutTask; task != nil {
6301-
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6302-
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6303-
task.GetKey(),
6304-
)
6305-
}
6306-
6307-
if task := timeoutTasks.StartToCloseTimeoutTask; task != nil {
6308-
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6309-
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6310-
task.GetKey(),
6311-
)
6312-
}
6313-
6314-
if task := timeoutTasks.HeartbeatTimeoutTask; task != nil {
6315-
ms.BestEffortDeleteTasks[tasks.CategoryTimer] = append(
6316-
ms.BestEffortDeleteTasks[tasks.CategoryTimer],
6317-
task.GetKey(),
6318-
)
6319-
}
6320-
6321-
delete(ms.activityTimeoutTasks, scheduledEventID)
6322-
}
6323-
63246191
func (ms *MutableStateImpl) GetWorkflowStateStatus() (enumsspb.WorkflowExecutionState, enumspb.WorkflowExecutionStatus) {
63256192
return ms.executionState.State, ms.executionState.Status
63266193
}

0 commit comments

Comments
 (0)