Skip to content

Commit d11e146

Browse files
authored
Improve domain failover for history queue v2 (#7205)
* Improve domain failover for history queue v2 * Redefine interface
1 parent d0d7793 commit d11e146

File tree

11 files changed

+722
-53
lines changed

11 files changed

+722
-53
lines changed

common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2419,6 +2419,7 @@ const (
24192419
TaskRequestsOldScheduler
24202420
TaskRequestsNewScheduler
24212421
PendingTaskGauge
2422+
ReschedulerTaskCountGauge
24222423

24232424
TaskRequestsPerDomain
24242425
TaskLatencyPerDomain
@@ -3194,6 +3195,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
31943195
TaskRequestsOldScheduler: {metricName: "task_requests_old_scheduler", metricType: Counter},
31953196
TaskRequestsNewScheduler: {metricName: "task_requests_new_scheduler", metricType: Counter},
31963197
PendingTaskGauge: {metricName: "pending_task_gauge", metricType: Gauge},
3198+
ReschedulerTaskCountGauge: {metricName: "rescheduler_task_count", metricType: Gauge},
31973199

31983200
// per domain task metrics
31993201

service/history/queuev2/queue_base.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type (
8282
timeSource clock.TimeSource
8383
taskInitializer task.Initializer
8484

85-
redispatcher task.Redispatcher
85+
rescheduler task.Rescheduler
8686
queueReader QueueReader
8787
monitor Monitor
8888
mitigator Mitigator
@@ -114,12 +114,9 @@ func newQueueBase(
114114
queueState := FromPersistenceQueueState(persistenceQueueState)
115115
exclusiveAckLevel, _ := getExclusiveAckLevelAndMaxQueueIDFromQueueState(queueState)
116116

117-
redispatcher := task.NewRedispatcher(
117+
rescheduler := task.NewRescheduler(
118118
taskProcessor,
119119
timeSource,
120-
&task.RedispatcherOptions{
121-
TaskRedispatchInterval: options.RedispatchInterval,
122-
},
123120
logger,
124121
metricsScope,
125122
)
@@ -130,15 +127,14 @@ func newQueueBase(
130127
queueType = task.QueueTypeTimer
131128
}
132129
taskInitializer := func(t persistence.Task) task.Task {
133-
return task.NewHistoryTask(
130+
return task.NewHistoryTaskV2(
134131
shard,
135132
t,
136133
queueType,
137134
task.InitializeLoggerForTask(shard.GetShardID(), t, logger),
138-
func(task persistence.Task) (bool, error) { return true, nil },
139135
taskExecutor,
140136
taskProcessor,
141-
redispatcher,
137+
rescheduler,
142138
shard.GetConfig().TaskCriticalRetryCount,
143139
)
144140
}
@@ -155,7 +151,7 @@ func newQueueBase(
155151
)
156152
virtualQueueManager := NewVirtualQueueManager(
157153
taskProcessor,
158-
redispatcher,
154+
rescheduler,
159155
taskInitializer,
160156
queueReader,
161157
logger,
@@ -204,7 +200,7 @@ func newQueueBase(
204200
options: options,
205201
timeSource: timeSource,
206202
taskInitializer: taskInitializer,
207-
redispatcher: redispatcher,
203+
rescheduler: rescheduler,
208204
queueReader: queueReader,
209205
monitor: monitor,
210206
mitigator: mitigator,
@@ -224,7 +220,7 @@ func newQueueBase(
224220
}
225221

226222
func (q *queueBase) Start() {
227-
q.redispatcher.Start()
223+
q.rescheduler.Start()
228224
q.virtualQueueManager.Start()
229225

230226
q.updateQueueStateTimer = q.timeSource.NewTimer(backoff.JitDuration(
@@ -239,14 +235,16 @@ func (q *queueBase) Stop() {
239235
q.monitor.Unsubscribe()
240236
q.updateQueueStateTimer.Stop()
241237
q.virtualQueueManager.Stop()
242-
q.redispatcher.Stop()
238+
q.rescheduler.Stop()
243239
}
244240

245241
func (q *queueBase) Category() persistence.HistoryTaskCategory {
246242
return q.category
247243
}
248244

249-
func (q *queueBase) FailoverDomain(domainIDs map[string]struct{}) {}
245+
func (q *queueBase) FailoverDomain(domainIDs map[string]struct{}) {
246+
q.rescheduler.RescheduleDomains(domainIDs)
247+
}
250248

251249
func (q *queueBase) HandleAction(ctx context.Context, clusterName string, action *queue.Action) (*queue.ActionResult, error) {
252250
return nil, nil

service/history/queuev2/virtual_queue.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ import (
4141
"github.com/uber/cadence/service/history/task"
4242
)
4343

44+
var (
45+
taskSchedulerThrottleBackoffInterval = time.Second * 5
46+
)
47+
4448
type (
4549
VirtualQueue interface {
4650
common.Daemon
@@ -72,7 +76,7 @@ type (
7276
virtualQueueImpl struct {
7377
queueOptions *VirtualQueueOptions
7478
processor task.Processor
75-
redispatcher task.Redispatcher
79+
rescheduler task.Rescheduler
7680
logger log.Logger
7781
metricsScope metrics.Scope
7882
timeSource clock.TimeSource
@@ -93,7 +97,7 @@ type (
9397

9498
func NewVirtualQueue(
9599
processor task.Processor,
96-
redispatcher task.Redispatcher,
100+
rescheduler task.Rescheduler,
97101
logger log.Logger,
98102
metricsScope metrics.Scope,
99103
timeSource clock.TimeSource,
@@ -112,7 +116,7 @@ func NewVirtualQueue(
112116
return &virtualQueueImpl{
113117
queueOptions: queueOptions,
114118
processor: processor,
115-
redispatcher: redispatcher,
119+
rescheduler: rescheduler,
116120
logger: logger,
117121
metricsScope: metricsScope,
118122
timeSource: timeSource,
@@ -385,7 +389,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
385389
scheduledTime := task.GetTaskKey().GetScheduledTime()
386390
// if the scheduled time is in the future, we need to redispatch the task
387391
if now.Before(scheduledTime) {
388-
q.redispatcher.RedispatchTask(task, scheduledTime)
392+
q.rescheduler.RescheduleTask(task, scheduledTime)
389393
continue
390394
}
391395
// shard level metrics for the duration between a task being written to a queue and being fetched from it
@@ -402,7 +406,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
402406
}
403407
if !submitted {
404408
q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
405-
q.redispatcher.AddTask(task)
409+
q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval))
406410
}
407411
}
408412

service/history/queuev2/virtual_queue_manager.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type (
6161
virtualQueueManagerImpl struct {
6262
processor task.Processor
6363
taskInitializer task.Initializer
64-
redispatcher task.Redispatcher
64+
rescheduler task.Rescheduler
6565
queueReader QueueReader
6666
logger log.Logger
6767
metricsScope metrics.Scope
@@ -81,7 +81,7 @@ type (
8181

8282
func NewVirtualQueueManager(
8383
processor task.Processor,
84-
redispatcher task.Redispatcher,
84+
rescheduler task.Rescheduler,
8585
taskInitializer task.Initializer,
8686
queueReader QueueReader,
8787
logger log.Logger,
@@ -104,13 +104,13 @@ func NewVirtualQueueManager(
104104
} else {
105105
opts = queueManagerOptions.NonRootQueueOptions
106106
}
107-
virtualQueues[queueID] = NewVirtualQueue(processor, redispatcher, logger.WithTags(tag.VirtualQueueID(queueID)), metricsScope, timeSource, taskLoadRateLimiter, monitor, virtualSlices, opts)
107+
virtualQueues[queueID] = NewVirtualQueue(processor, rescheduler, logger.WithTags(tag.VirtualQueueID(queueID)), metricsScope, timeSource, taskLoadRateLimiter, monitor, virtualSlices, opts)
108108
}
109109
return &virtualQueueManagerImpl{
110110
processor: processor,
111111
taskInitializer: taskInitializer,
112112
queueReader: queueReader,
113-
redispatcher: redispatcher,
113+
rescheduler: rescheduler,
114114
logger: logger,
115115
metricsScope: metricsScope,
116116
timeSource: timeSource,
@@ -126,7 +126,7 @@ func NewVirtualQueueManager(
126126
} else {
127127
opts = queueManagerOptions.NonRootQueueOptions
128128
}
129-
return NewVirtualQueue(processor, redispatcher, logger.WithTags(tag.VirtualQueueID(queueID)), metricsScope, timeSource, taskLoadRateLimiter, monitor, s, opts)
129+
return NewVirtualQueue(processor, rescheduler, logger.WithTags(tag.VirtualQueueID(queueID)), metricsScope, timeSource, taskLoadRateLimiter, monitor, s, opts)
130130
},
131131
}
132132
}

service/history/queuev2/virtual_queue_manager_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func TestVirtualQueueManager_VirtualQueues(t *testing.T) {
172172

173173
// Create mock dependencies
174174
mockProcessor := task.NewMockProcessor(ctrl)
175-
mockRedispatcher := task.NewMockRedispatcher(ctrl)
175+
mockRescheduler := task.NewMockRescheduler(ctrl)
176176
mockTaskInitializer := func(t persistence.Task) task.Task {
177177
mockTask := task.NewMockTask(ctrl)
178178
mockTask.EXPECT().GetTaskID().Return(t.GetTaskID())
@@ -198,7 +198,7 @@ func TestVirtualQueueManager_VirtualQueues(t *testing.T) {
198198
manager := &virtualQueueManagerImpl{
199199
processor: mockProcessor,
200200
taskInitializer: mockTaskInitializer,
201-
redispatcher: mockRedispatcher,
201+
rescheduler: mockRescheduler,
202202
queueReader: mockQueueReader,
203203
logger: mockLogger,
204204
metricsScope: mockMetricsScope,
@@ -426,7 +426,7 @@ func TestVirtualQueueManager_UpdateAndGetState(t *testing.T) {
426426

427427
// Create mock dependencies
428428
mockProcessor := task.NewMockProcessor(ctrl)
429-
mockRedispatcher := task.NewMockRedispatcher(ctrl)
429+
mockRescheduler := task.NewMockRescheduler(ctrl)
430430
mockTaskInitializer := func(t persistence.Task) task.Task {
431431
mockTask := task.NewMockTask(ctrl)
432432
mockTask.EXPECT().GetTaskID().Return(t.GetTaskID())
@@ -452,7 +452,7 @@ func TestVirtualQueueManager_UpdateAndGetState(t *testing.T) {
452452
manager := &virtualQueueManagerImpl{
453453
processor: mockProcessor,
454454
taskInitializer: mockTaskInitializer,
455-
redispatcher: mockRedispatcher,
455+
rescheduler: mockRescheduler,
456456
queueReader: mockQueueReader,
457457
logger: mockLogger,
458458
metricsScope: mockMetricsScope,
@@ -533,7 +533,7 @@ func TestVirtualQueueManager_AddNewVirtualSlice(t *testing.T) {
533533

534534
// Create mock dependencies
535535
mockProcessor := task.NewMockProcessor(ctrl)
536-
mockRedispatcher := task.NewMockRedispatcher(ctrl)
536+
mockRescheduler := task.NewMockRescheduler(ctrl)
537537
mockTaskInitializer := func(t persistence.Task) task.Task {
538538
mockTask := task.NewMockTask(ctrl)
539539
mockTask.EXPECT().GetTaskID().Return(t.GetTaskID())
@@ -564,7 +564,7 @@ func TestVirtualQueueManager_AddNewVirtualSlice(t *testing.T) {
564564
manager := &virtualQueueManagerImpl{
565565
processor: mockProcessor,
566566
taskInitializer: mockTaskInitializer,
567-
redispatcher: mockRedispatcher,
567+
rescheduler: mockRescheduler,
568568
queueReader: mockQueueReader,
569569
logger: mockLogger,
570570
metricsScope: mockMetricsScope,

0 commit comments

Comments
 (0)