Skip to content

Commit 8430e34

Browse files
authored
Retry standby tasks immediately on failover (#7199)
1 parent a0a122e commit 8430e34

14 files changed

+63
-42
lines changed

common/task/fifo_task_scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (f *fifoTaskSchedulerImpl) dispatcher() {
151151
case task := <-f.taskCh:
152152
if err := f.processor.Submit(task); err != nil {
153153
f.logger.Error("failed to submit task to processor", tag.Error(err))
154-
task.Nack()
154+
task.Nack(err)
155155
}
156156
case <-f.shutdownCh:
157157
return
@@ -167,7 +167,7 @@ func (f *fifoTaskSchedulerImpl) drainAndNackTasks() {
167167
for {
168168
select {
169169
case task := <-f.taskCh:
170-
task.Nack()
170+
task.Nack(nil)
171171
default:
172172
return
173173
}

common/task/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type (
5757
// Ack marks the task as successful completed
5858
Ack()
5959
// Nack marks the task as unsuccessful completed
60-
Nack()
60+
Nack(err error)
6161
// Cancel marks the task as canceled
6262
Cancel()
6363
// State returns the current task state

common/task/interface_mock.go

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/task/parallel_task_processor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struc
158158
if r := recover(); r != nil {
159159
p.logger.Error("recovered panic in task execution", tag.Dynamic("recovered-panic", r))
160160
task.HandleErr(fmt.Errorf("recovered panic: %v", r))
161-
task.Nack()
161+
task.Nack(nil)
162162
}
163163
}()
164164

@@ -185,7 +185,7 @@ func (p *parallelTaskProcessorImpl) executeTask(task Task, shutdownCh chan struc
185185

186186
if err := throttleRetry.Do(context.Background(), op); err != nil {
187187
// non-retryable error or exhausted all retries or worker shutdown
188-
task.Nack()
188+
task.Nack(err)
189189
return
190190
}
191191

@@ -245,7 +245,7 @@ func (p *parallelTaskProcessorImpl) drainAndNackTasks() {
245245
for {
246246
select {
247247
case task := <-p.tasksCh:
248-
task.Nack()
248+
task.Nack(nil)
249249
default:
250250
return
251251
}

common/task/parallel_task_processor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func (s *parallelTaskProcessorSuite) TestExecuteTask_NonRetryableError() {
146146
mockTask.EXPECT().Execute().Return(errNonRetryable),
147147
mockTask.EXPECT().HandleErr(errNonRetryable).Return(errNonRetryable),
148148
mockTask.EXPECT().RetryErr(errNonRetryable).Return(false).AnyTimes(),
149-
mockTask.EXPECT().Nack(),
149+
mockTask.EXPECT().Nack(gomock.Any()),
150150
)
151151

152152
s.processor.executeTask(mockTask, make(chan struct{}))
@@ -157,7 +157,7 @@ func (s *parallelTaskProcessorSuite) TestExecuteTask_WorkerStopped() {
157157
mockTask.EXPECT().Execute().Return(errRetryable).AnyTimes()
158158
mockTask.EXPECT().HandleErr(errRetryable).Return(errRetryable).AnyTimes()
159159
mockTask.EXPECT().RetryErr(errRetryable).Return(true).AnyTimes()
160-
mockTask.EXPECT().Nack().Times(1)
160+
mockTask.EXPECT().Nack(gomock.Any()).Times(1)
161161

162162
done := make(chan struct{})
163163
workerShutdownCh := make(chan struct{})
@@ -265,7 +265,7 @@ func (s *parallelTaskProcessorSuite) TestProcessorContract() {
265265
taskStatus[mockTask] = TaskStateAcked
266266
taskWG.Done()
267267
}).MaxTimes(1)
268-
mockTask.EXPECT().Nack().Do(func() {
268+
mockTask.EXPECT().Nack(gomock.Any()).Do(func(err error) {
269269
taskStatusLock.Lock()
270270
defer taskStatusLock.Unlock()
271271

@@ -315,7 +315,7 @@ func (s *parallelTaskProcessorSuite) TestExecuteTask_PanicHandling() {
315315
panic("A panic occurred")
316316
})
317317
mockTask.EXPECT().HandleErr(gomock.Any()).Return(errRetryable).AnyTimes()
318-
mockTask.EXPECT().Nack().Times(1)
318+
mockTask.EXPECT().Nack(gomock.Any()).Times(1)
319319
done := make(chan struct{})
320320
workerShutdownCh := make(chan struct{})
321321
go func() {

common/task/sequential_task_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func (t *sequentialTaskProcessorImpl) processTaskOnce(taskqueue SequentialTaskQu
185185
taskqueue.Add(task)
186186
} else {
187187
t.logger.Error("Unable to process task", tag.Error(err))
188-
task.Nack()
188+
task.Nack(err)
189189
}
190190
} else {
191191
task.Ack()

common/task/sequential_task_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (t *testSequentialTaskImpl) NumAcked() int {
276276
return t.acked
277277
}
278278

279-
func (t *testSequentialTaskImpl) Nack() {
279+
func (t *testSequentialTaskImpl) Nack(err error) {
280280
t.lock.Lock()
281281
defer t.lock.Unlock()
282282

common/task/weighted_round_robin_task_scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (w *weightedRoundRobinTaskSchedulerImpl[K]) dispatchTasks() {
191191
hasTask = true
192192
if err := w.processor.Submit(task); err != nil {
193193
w.logger.Error("fail to submit task to processor", tag.Error(err))
194-
task.Nack()
194+
task.Nack(err)
195195
}
196196
case <-w.shutdownCh:
197197
return
@@ -218,7 +218,7 @@ func drainAndNackPriorityTask(taskCh <-chan PriorityTask) {
218218
for {
219219
select {
220220
case task := <-taskCh:
221-
task.Nack()
221+
task.Nack(nil)
222222
default:
223223
return
224224
}

common/task/weighted_round_robin_task_scheduler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ func (s *weightedRoundRobinTaskSchedulerSuite) TestDispatcher_SubmitWithNoError(
227227
func (s *weightedRoundRobinTaskSchedulerSuite) TestDispatcher_FailToSubmit() {
228228
mockTask := NewMockPriorityTask(s.controller)
229229
mockTask.EXPECT().Priority().Return(0)
230-
mockTask.EXPECT().Nack()
230+
mockTask.EXPECT().Nack(gomock.Any())
231231

232232
var taskWG sync.WaitGroup
233233
s.scheduler.Submit(mockTask)
@@ -328,7 +328,7 @@ func testSchedulerContract(
328328
taskStatus[mockTask] = TaskStateAcked
329329
taskWG.Done()
330330
}).MaxTimes(1)
331-
mockTask.EXPECT().Nack().Do(func() {
331+
mockTask.EXPECT().Nack(gomock.Any()).Do(func(err error) {
332332
taskStatusLock.Lock()
333333
defer taskStatusLock.Unlock()
334334

service/history/task/interface_mock.go

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)