Skip to content

Commit e774d17

Browse files
authored
Don't attempt to dispatch expired activities (#7204)
We check whether an activity is expired before putting it into the buffer, but once it's there the only way we will find out that it has expired is from the Record(Decision/Activity)TaskStarted call returning an error. This is particularly problematic for TaskLists that have an Activity Rate Limit, since they'll only be able to remove tasks from the buffer according to that Rate Limit. They can enter a steady state where new non-expired tasks are added to the end of the buffer but they'll always expire by the time they actually are attempted. Eagerly removing expired tasks ensures we can promptly dispatch relevant ones.
1 parent d744c76 commit e774d17

File tree

2 files changed

+46
-24
lines changed

2 files changed

+46
-24
lines changed

service/matching/tasklist/task_reader.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,19 @@ func (tr *taskReader) dispatchSingleTaskFromBufferWithRetries(taskInfo *persiste
427427
}
428428

429429
func (tr *taskReader) dispatchSingleTaskFromBuffer(taskInfo *persistence.TaskInfo) (breakDispatchLoop bool, breakRetries bool) {
430+
e := event.E{
431+
TaskListName: tr.taskListID.GetName(),
432+
TaskListType: tr.taskListID.GetType(),
433+
TaskListKind: &tr.tlMgr.taskListKind,
434+
TaskInfo: *taskInfo,
435+
}
436+
if tr.isTaskExpired(taskInfo) {
437+
e.EventName = "Task Expired"
438+
event.Log(e)
439+
tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
440+
tr.taskAckManager.AckItem(taskInfo.TaskID)
441+
return false, true
442+
}
430443
isolationGroup, isolationDuration := tr.getIsolationGroupForTask(tr.cancelCtx, taskInfo)
431444
_, isolationGroupIsKnown := tr.taskBuffers[isolationGroup]
432445
if !isolationGroupIsKnown {
@@ -440,13 +453,6 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(taskInfo *persistence.TaskInf
440453
timerScope.Stop()
441454
cancel()
442455

443-
e := event.E{
444-
TaskListName: tr.taskListID.GetName(),
445-
TaskListType: tr.taskListID.GetType(),
446-
TaskListKind: &tr.tlMgr.taskListKind,
447-
TaskInfo: *taskInfo,
448-
}
449-
450456
if err == nil {
451457
e.EventName = "Dispatched Buffered Task"
452458
event.Log(e)
@@ -488,9 +494,9 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(taskInfo *persistence.TaskInf
488494
}
489495

490496
if errors.Is(err, errTaskNotStarted) {
491-
e.EventName = "Dispatch failed on completing task on the passive side because task not started. Will retry dispatch if task is not expired"
497+
e.EventName = "Dispatch failed on completing task on the passive side because task not started."
492498
event.Log(e)
493-
return false, tr.isTaskExpired(taskInfo)
499+
return false, false
494500
}
495501

496502
if errors.Is(err, errWaitTimeNotReachedForEntityNotExists) {

service/matching/tasklist/task_reader_test.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ var defaultIsolationGroups = []string{
5454
func TestDispatchSingleTaskFromBuffer(t *testing.T) {
5555
testCases := []struct {
5656
name string
57-
allowances func(t *testing.T, reader *taskReader)
57+
allowances func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource)
5858
ttl int
5959
breakDispatch bool
6060
breakRetries bool
6161
}{
6262
{
6363
name: "success - no isolation",
64-
allowances: func(t *testing.T, reader *taskReader) {
64+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
6565
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
6666
return "", -1
6767
}
@@ -77,7 +77,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
7777
},
7878
{
7979
name: "success - isolation",
80-
allowances: func(t *testing.T, reader *taskReader) {
80+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
8181
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
8282
return defaultIsolationGroup, -1
8383
}
@@ -93,7 +93,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
9393
},
9494
{
9595
name: "success - unknown isolation group",
96-
allowances: func(t *testing.T, reader *taskReader) {
96+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
9797
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
9898
return "mystery group", -1
9999
}
@@ -108,9 +108,24 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
108108
breakDispatch: false,
109109
breakRetries: true,
110110
},
111+
{
112+
name: "success - skip expired tasks",
113+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
114+
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
115+
return defaultIsolationGroup, -1
116+
}
117+
reader.dispatchTask = func(ctx context.Context, task *InternalTask) error {
118+
t.Fatal("task must not be dispatched")
119+
return nil
120+
}
121+
},
122+
ttl: -2,
123+
breakDispatch: false,
124+
breakRetries: true,
125+
},
111126
{
112127
name: "Error - context cancelled, should stop",
113-
allowances: func(t *testing.T, reader *taskReader) {
128+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
114129
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
115130
return defaultIsolationGroup, -1
116131
}
@@ -124,7 +139,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
124139
},
125140
{
126141
name: "Error - Deadline Exceeded, should retry",
127-
allowances: func(t *testing.T, reader *taskReader) {
142+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
128143
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
129144
return defaultIsolationGroup, -1
130145
}
@@ -138,7 +153,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
138153
},
139154
{
140155
name: "Error - throttled, should retry",
141-
allowances: func(t *testing.T, reader *taskReader) {
156+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
142157
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
143158
return defaultIsolationGroup, -1
144159
}
@@ -151,7 +166,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
151166
},
152167
{
153168
name: "Error - unknown, should retry",
154-
allowances: func(t *testing.T, reader *taskReader) {
169+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
155170
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
156171
return defaultIsolationGroup, -1
157172
}
@@ -164,7 +179,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
164179
},
165180
{
166181
name: "Error - task not started and not expired, should retry",
167-
allowances: func(t *testing.T, reader *taskReader) {
182+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
168183
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
169184
return defaultIsolationGroup, -1
170185
}
@@ -177,22 +192,23 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
177192
breakRetries: false,
178193
},
179194
{
180-
name: "Error - task not started and expired, should not retry",
181-
allowances: func(t *testing.T, reader *taskReader) {
195+
name: "Error - task not started and expired, should retry",
196+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
182197
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
183198
return defaultIsolationGroup, -1
184199
}
185200
reader.dispatchTask = func(ctx context.Context, task *InternalTask) error {
201+
mockTime.Advance(time.Hour)
186202
return errTaskNotStarted
187203
}
188204
},
189-
ttl: -2,
205+
ttl: 1,
190206
breakDispatch: false,
191-
breakRetries: true,
207+
breakRetries: false,
192208
},
193209
{
194210
name: "Error - time not reached to complete task without workflow execution, should retry",
195-
allowances: func(t *testing.T, reader *taskReader) {
211+
allowances: func(t *testing.T, reader *taskReader, mockTime clock.MockedTimeSource) {
196212
reader.getIsolationGroupForTask = func(ctx context.Context, info *persistence.TaskInfo) (string, time.Duration) {
197213
return defaultIsolationGroup, -1
198214
}
@@ -211,7 +227,7 @@ func TestDispatchSingleTaskFromBuffer(t *testing.T) {
211227
c := defaultConfig()
212228
tlm := createTestTaskListManagerWithConfig(t, testlogger.New(t), controller, c, timeSource)
213229
reader := tlm.taskReader
214-
tc.allowances(t, reader)
230+
tc.allowances(t, reader, timeSource)
215231
taskInfo := newTask(timeSource)
216232
taskInfo.Expiry = timeSource.Now().Add(time.Duration(tc.ttl) * time.Second)
217233

0 commit comments

Comments
 (0)