Skip to content

Commit 01bc7bc

Browse files
authored
Fix bugs for history queue v2 (#7178)
1 parent 9e83480 commit 01bc7bc

File tree

7 files changed

+75
-18
lines changed

7 files changed

+75
-18
lines changed

common/log/tag/tags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,11 @@ func PendingTaskCount(count int) Tag {
12071207
return newInt("pending-task-count", count)
12081208
}
12091209

1210+
// MaxTaskCount returns a tag for max task count
1211+
func MaxTaskCount(count int) Tag {
1212+
return newInt("max-task-count", count)
1213+
}
1214+
12101215
// VirtualQueueID returns a tag for virtual queue id
12111216
func VirtualQueueID(id int64) Tag {
12121217
return newInt64("virtual-queue-id", id)

common/metrics/defs.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2697,6 +2697,9 @@ const (
26972697
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer
26982698
WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer
26992699
WorkflowIDCacheRequestsInternalRatelimitedCounter
2700+
VirtualQueueCountGauge
2701+
VirtualQueuePausedGauge
2702+
VirtualQueueRunningGauge
27002703
NumHistoryMetrics
27012704
)
27022705

@@ -3465,6 +3468,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
34653468
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Timer},
34663469
WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_internal_requests_max_requests_per_seconds", metricType: Timer},
34673470
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
3471+
VirtualQueueCountGauge: {metricName: "virtual_queue_count", metricType: Gauge},
3472+
VirtualQueuePausedGauge: {metricName: "virtual_queue_paused", metricType: Gauge},
3473+
VirtualQueueRunningGauge: {metricName: "virtual_queue_running", metricType: Gauge},
34683474
},
34693475
Matching: {
34703476
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},

service/history/queuev2/mitigator.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/uber/cadence/common/log"
1212
"github.com/uber/cadence/common/log/tag"
1313
"github.com/uber/cadence/common/metrics"
14+
"github.com/uber/cadence/common/types"
1415
)
1516

1617
const (
@@ -53,10 +54,11 @@ func NewMitigator(
5354
options *MitigatorOptions,
5455
) Mitigator {
5556
m := &mitigatorImpl{
56-
monitor: monitor,
57-
logger: logger,
58-
metricsScope: metricsScope,
59-
options: options,
57+
virtualQueueManager: virtualQueueManager,
58+
monitor: monitor,
59+
logger: logger,
60+
metricsScope: metricsScope,
61+
options: options,
6062
}
6163
m.handlers = map[AlertType]func(Alert){
6264
AlertTypeQueuePendingTaskCount: m.handleQueuePendingTaskCount,
@@ -83,17 +85,49 @@ func (m *mitigatorImpl) handleQueuePendingTaskCount(alert Alert) {
8385
virtualQueue.UpdateAndGetState()
8486
}
8587
if m.monitor.GetTotalPendingTaskCount() <= alert.AlertAttributesQueuePendingTaskCount.CriticalPendingTaskCount {
88+
m.logger.Debug("mitigating queue alert, skip mitigation because the alert is no longer valid")
8689
return
8790
}
8891
// Second, getting the stats of pending tasks. We need:
8992
stats := m.collectPendingTaskStats()
9093

9194
// Third, find virtual slices to split given the target pending task count and the stats of pending tasks
9295
targetPendingTaskCount := int(float64(alert.AlertAttributesQueuePendingTaskCount.CriticalPendingTaskCount) * targetLoadFactor)
96+
if m.logger.DebugOn() {
97+
sliceStatesPerDomain := make(map[string][]*types.VirtualSliceState)
98+
for domain, slices := range stats.slicesPerDomain {
99+
for _, s := range slices {
100+
sliceStatesPerDomain[domain] = append(sliceStatesPerDomain[domain], ToPersistenceVirtualSliceState(s.GetState()))
101+
}
102+
}
103+
for s, domainStats := range stats.pendingTaskCountPerDomainPerSlice {
104+
m.logger.Debug("mitigating queue alert, get task stats per slice", tag.Dynamic("slice", ToPersistenceVirtualSliceState(s.GetState())), tag.Dynamic("domain-stats", domainStats))
105+
}
106+
m.logger.Debug("mitigating queue alert, get task stats",
107+
tag.AlertType(int(alert.AlertType)),
108+
tag.Dynamic("pending-task-count-per-domain", stats.pendingTaskCountPerDomain),
109+
tag.Dynamic("slices-per-domain", sliceStatesPerDomain),
110+
tag.Dynamic("pending-task-count", stats.totalPendingTaskCount),
111+
tag.Dynamic("target-task-count", targetPendingTaskCount),
112+
)
113+
}
93114
domainsToClearPerSlice := m.findDomainsToClear(stats, targetPendingTaskCount)
115+
if m.logger.DebugOn() {
116+
for s, domains := range domainsToClearPerSlice {
117+
m.logger.Debug("mitigating queue alert, get domains to clear", tag.Dynamic("slice", ToPersistenceVirtualSliceState(s.GetState())), tag.WorkflowDomainIDs(domains))
118+
}
119+
}
94120

95121
// Finally, split and clear the slices
96122
m.processQueueSplitsAndClear(virtualQueues, domainsToClearPerSlice)
123+
if m.logger.DebugOn() {
124+
virtualQueues := m.virtualQueueManager.VirtualQueues()
125+
state := make(map[int64]*types.VirtualQueueState)
126+
for queueID, vq := range virtualQueues {
127+
state[queueID] = ToPersistenceVirtualQueueState(vq.GetState())
128+
}
129+
m.logger.Debug("mitigating queue alert, get queue state after mitigation", tag.Dynamic("queue-state", state))
130+
}
97131
}
98132

99133
// The stats of pending tasks are used to calculate the domains to clear. We need:

service/history/queuev2/queue_base.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func newQueueBase(
111111
logger.Fatal("Failed to get queue state, probably task category is not supported", tag.Error(err), tag.Dynamic("category", category))
112112
}
113113
queueState := FromPersistenceQueueState(persistenceQueueState)
114-
exclusiveAckLevel := getExclusiveAckLevelFromQueueState(queueState)
114+
exclusiveAckLevel, _ := getExclusiveAckLevelAndMaxQueueIDFromQueueState(queueState)
115115

116116
redispatcher := task.NewRedispatcher(
117117
taskProcessor,
@@ -277,7 +277,8 @@ func (q *queueBase) updateQueueState(ctx context.Context) {
277277
VirtualQueueStates: q.virtualQueueManager.UpdateAndGetState(),
278278
ExclusiveMaxReadLevel: q.newVirtualSliceState.Range.InclusiveMinTaskKey,
279279
}
280-
newExclusiveAckLevel := getExclusiveAckLevelFromQueueState(queueState)
280+
newExclusiveAckLevel, maxQueueID := getExclusiveAckLevelAndMaxQueueIDFromQueueState(queueState)
281+
q.metricsScope.UpdateGauge(metrics.VirtualQueueCountGauge, float64(maxQueueID+1))
281282

282283
// for backward compatibility, we record the timer metrics in shard info scope
283284
pendingTaskCount := q.monitor.GetTotalPendingTaskCount()
@@ -318,7 +319,9 @@ func (q *queueBase) updateQueueState(ctx context.Context) {
318319
}
319320

320321
// even though the ack level is not updated, we still need to update the queue state
321-
err := q.shard.UpdateQueueState(q.category, ToPersistenceQueueState(queueState))
322+
persistenceQueueState := ToPersistenceQueueState(queueState)
323+
q.logger.Debug("store queue state", tag.Dynamic("queue-state", persistenceQueueState))
324+
err := q.shard.UpdateQueueState(q.category, persistenceQueueState)
322325
if err != nil {
323326
q.logger.Error("Failed to update queue state", tag.Error(err))
324327
q.metricsScope.IncCounter(metrics.AckLevelUpdateFailedCounter)
@@ -339,12 +342,14 @@ func (q *queueBase) handleAlert(ctx context.Context, alert *Alert) {
339342
q.updateQueueStateFn(ctx)
340343
}
341344

342-
func getExclusiveAckLevelFromQueueState(state *QueueState) persistence.HistoryTaskKey {
345+
func getExclusiveAckLevelAndMaxQueueIDFromQueueState(state *QueueState) (persistence.HistoryTaskKey, int64) {
346+
maxQueueID := int64(0)
343347
newExclusiveAckLevel := state.ExclusiveMaxReadLevel
344-
for _, virtualQueueState := range state.VirtualQueueStates {
348+
for queueID, virtualQueueState := range state.VirtualQueueStates {
345349
if len(virtualQueueState) != 0 {
346350
newExclusiveAckLevel = persistence.MinHistoryTaskKey(newExclusiveAckLevel, virtualQueueState[0].Range.InclusiveMinTaskKey)
347351
}
352+
maxQueueID = max(maxQueueID, queueID)
348353
}
349-
return newExclusiveAckLevel
354+
return newExclusiveAckLevel, maxQueueID
350355
}

service/history/queuev2/virtual_queue.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,15 +339,21 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
339339
}
340340

341341
pendingTaskCount := q.monitor.GetTotalPendingTaskCount()
342-
if pendingTaskCount > q.options.MaxPendingTasksCount() {
343-
q.logger.Warn("Too many pending tasks, pause loading tasks for a while", tag.PendingTaskCount(pendingTaskCount))
342+
maxTaskCount := q.options.MaxPendingTasksCount()
343+
if pendingTaskCount > maxTaskCount {
344+
q.logger.Warn("Too many pending tasks, pause loading tasks for a while", tag.PendingTaskCount(pendingTaskCount), tag.MaxTaskCount(maxTaskCount))
344345
q.pauseController.Pause(q.options.PollBackoffInterval())
345346
}
346347

347348
if q.pauseController.IsPaused() {
349+
// emit a metric indicating that the virtual queue is paused
350+
q.metricsScope.UpdateGauge(metrics.VirtualQueuePausedGauge, 1.0)
351+
q.logger.Debug("virtual queue is paused")
348352
return
349353
}
350354

355+
// emit a metric indicating that the virtual queue is alive
356+
q.metricsScope.UpdateGauge(metrics.VirtualQueueRunningGauge, 1.0)
351357
sliceToRead := q.sliceToRead.Value.(VirtualSlice)
352358
tasks, err := sliceToRead.GetTasks(q.ctx, q.options.PageSize())
353359
if err != nil {

service/history/queuev2/virtual_queue_manager.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type (
7979
sync.RWMutex
8080
status int32
8181
virtualQueues map[int64]VirtualQueue
82-
createVirtualQueueFn func(VirtualSlice, int64) VirtualQueue
82+
createVirtualQueueFn func(int64, ...VirtualSlice) VirtualQueue
8383

8484
nextForceNewSliceTime time.Time
8585
}
@@ -127,14 +127,14 @@ func NewVirtualQueueManager(
127127
nonRootQueueOptions: nonRootQueueOptions,
128128
status: common.DaemonStatusInitialized,
129129
virtualQueues: virtualQueues,
130-
createVirtualQueueFn: func(s VirtualSlice, queueID int64) VirtualQueue {
130+
createVirtualQueueFn: func(queueID int64, s ...VirtualSlice) VirtualQueue {
131131
var options *VirtualQueueOptions
132132
if queueID == rootQueueID {
133133
options = rootQueueOptions
134134
} else {
135135
options = nonRootQueueOptions
136136
}
137-
return NewVirtualQueue(processor, redispatcher, logger.WithTags(tag.VirtualQueueID(queueID)), metricsScope, timeSource, taskLoadRateLimiter, monitor, []VirtualSlice{s}, options)
137+
return NewVirtualQueue(processor, redispatcher, logger.WithTags(tag.VirtualQueueID(queueID)), metricsScope, timeSource, taskLoadRateLimiter, monitor, s, options)
138138
},
139139
}
140140
}
@@ -184,7 +184,8 @@ func (m *virtualQueueManagerImpl) GetOrCreateVirtualQueue(queueID int64) Virtual
184184
if vq, ok := m.virtualQueues[queueID]; ok {
185185
return vq
186186
}
187-
m.virtualQueues[queueID] = m.createVirtualQueueFn(nil, queueID)
187+
m.virtualQueues[queueID] = m.createVirtualQueueFn(queueID)
188+
m.virtualQueues[queueID].Start()
188189
return m.virtualQueues[queueID]
189190
}
190191

@@ -221,7 +222,7 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice)
221222
return
222223
}
223224

224-
m.virtualQueues[rootQueueID] = m.createVirtualQueueFn(s, rootQueueID)
225+
m.virtualQueues[rootQueueID] = m.createVirtualQueueFn(rootQueueID, s)
225226
m.virtualQueues[rootQueueID].Start()
226227
}
227228

service/history/queuev2/virtual_queue_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ func TestVirtualQueueManager_AddNewVirtualSlice(t *testing.T) {
571571
},
572572
status: common.DaemonStatusInitialized,
573573
virtualQueues: virtualQueues,
574-
createVirtualQueueFn: func(s VirtualSlice, queueID int64) VirtualQueue {
574+
createVirtualQueueFn: func(queueID int64, s ...VirtualSlice) VirtualQueue {
575575
vq := NewMockVirtualQueue(ctrl)
576576
vq.EXPECT().Start()
577577
return vq

0 commit comments

Comments
 (0)