Skip to content

Commit a07ce67

Browse files
authored
Improve logging and metric for history queue v2 (#7221)
1 parent 71832d6 commit a07ce67

12 files changed

+63
-12
lines changed

common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2420,6 +2420,7 @@ const (
24202420
TaskRequestsNewScheduler
24212421
PendingTaskGauge
24222422
ReschedulerTaskCountGauge
2423+
NewHistoryTaskCounter
24232424

24242425
TaskRequestsPerDomain
24252426
TaskLatencyPerDomain
@@ -3196,6 +3197,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
31963197
TaskRequestsNewScheduler: {metricName: "task_requests_new_scheduler", metricType: Counter},
31973198
PendingTaskGauge: {metricName: "pending_task_gauge", metricType: Gauge},
31983199
ReschedulerTaskCountGauge: {metricName: "rescheduler_task_count", metricType: Gauge},
3200+
NewHistoryTaskCounter: {metricName: "new_history_task_counter", metricType: Counter},
31993201

32003202
// per domain task metrics
32013203

service/history/queuev2/pending_task_tracker.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type (
3535
// AddTask adds a task to the pending task tracker.
3636
AddTask(task.Task)
3737
// PruneAckedTasks prunes the acked tasks from the pending task tracker.
38-
PruneAckedTasks()
38+
PruneAckedTasks() int
3939
// GetMinimumTaskKey returns the minimum task key in the pending task tracker, if there are no pending tasks, it returns MaximumHistoryTaskKey.
4040
GetMinimumTaskKey() (persistence.HistoryTaskKey, bool)
4141
// GetTasks returns all the tasks in the pending task tracker, the result should be read-only.
@@ -85,12 +85,14 @@ func (t *pendingTaskTrackerImpl) GetTasks() map[persistence.HistoryTaskKey]task.
8585
return t.taskMap
8686
}
8787

88-
func (t *pendingTaskTrackerImpl) PruneAckedTasks() {
88+
func (t *pendingTaskTrackerImpl) PruneAckedTasks() int {
89+
prunedCount := 0
8990
minTaskKey := persistence.MaximumHistoryTaskKey
9091
for key, task := range t.taskMap {
9192
if task.State() == ctask.TaskStateAcked {
9293
delete(t.taskMap, key)
9394
t.taskCountPerDomain[task.GetDomainID()]--
95+
prunedCount++
9496
continue
9597
}
9698

@@ -99,6 +101,7 @@ func (t *pendingTaskTrackerImpl) PruneAckedTasks() {
99101
}
100102
}
101103
t.minTaskKey = minTaskKey
104+
return prunedCount
102105
}
103106

104107
func (t *pendingTaskTrackerImpl) GetPendingTaskCount() int {

service/history/queuev2/pending_task_tracker_mock.go

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queuev2/pending_task_tracker_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func TestPendingTaskTracker(t *testing.T) {
1818
name string
1919
setupTasks func(ctrl *gomock.Controller) []*task.MockTask
2020
pruneAcked bool
21+
pruneAckedCount int
2122
clear bool
2223
wantMinKey persistence.HistoryTaskKey
2324
wantHasMinKey bool
@@ -92,6 +93,7 @@ func TestPendingTaskTracker(t *testing.T) {
9293
return []*task.MockTask{task1, task2, task3}
9394
},
9495
pruneAcked: true,
96+
pruneAckedCount: 2,
9597
wantMinKey: persistence.NewHistoryTaskKey(testTime, 2),
9698
wantHasMinKey: true,
9799
wantTaskCount: 1,
@@ -113,6 +115,7 @@ func TestPendingTaskTracker(t *testing.T) {
113115
return []*task.MockTask{task1, task2}
114116
},
115117
pruneAcked: true,
118+
pruneAckedCount: 2,
116119
wantMinKey: persistence.MaximumHistoryTaskKey,
117120
wantHasMinKey: false,
118121
wantTaskCount: 0,
@@ -162,7 +165,8 @@ func TestPendingTaskTracker(t *testing.T) {
162165

163166
// Prune acked tasks if needed
164167
if tt.pruneAcked {
165-
tracker.PruneAckedTasks()
168+
prunedCount := tracker.PruneAckedTasks()
169+
assert.Equal(t, tt.pruneAckedCount, prunedCount)
166170
}
167171

168172
// Clear all tasks if needed

service/history/queuev2/queue_base.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func newQueueBase(
111111
if err != nil {
112112
logger.Fatal("Failed to get queue state, probably task category is not supported", tag.Error(err), tag.Dynamic("category", category))
113113
}
114+
logger.Info("loading queue state", tag.Dynamic("queue-state", persistenceQueueState))
114115
queueState := FromPersistenceQueueState(persistenceQueueState)
115116
exclusiveAckLevel, _ := getExclusiveAckLevelAndMaxQueueIDFromQueueState(queueState)
116117

@@ -266,7 +267,7 @@ func (q *queueBase) processNewTasks() bool {
266267
}
267268
q.newVirtualSliceState = remainingVirtualSliceState
268269

269-
newVirtualSlice := NewVirtualSlice(newVirtualSliceState, q.taskInitializer, q.queueReader, NewPendingTaskTracker())
270+
newVirtualSlice := NewVirtualSlice(newVirtualSliceState, q.taskInitializer, q.queueReader, NewPendingTaskTracker(), q.logger)
270271

271272
q.logger.Debug("processing new tasks", tag.Dynamic("inclusiveMinTaskKey", newVirtualSliceState.Range.InclusiveMinTaskKey), tag.Dynamic("exclusiveMaxTaskKey", newVirtualSliceState.Range.ExclusiveMaxTaskKey))
272273
q.virtualQueueManager.AddNewVirtualSliceToRootQueue(newVirtualSlice)
@@ -326,7 +327,7 @@ func (q *queueBase) updateQueueState(ctx context.Context) {
326327

327328
// even though the ack level is not updated, we still need to update the queue state
328329
persistenceQueueState := ToPersistenceQueueState(queueState)
329-
q.logger.Debug("store queue state", tag.Dynamic("queue-state", persistenceQueueState))
330+
q.logger.Debug("store queue state", tag.Dynamic("queue-state", persistenceQueueState), tag.PendingTaskCount(pendingTaskCount))
330331
err := q.shard.UpdateQueueState(q.category, persistenceQueueState)
331332
if err != nil {
332333
q.logger.Error("Failed to update queue state", tag.Error(err))

service/history/queuev2/queue_immediate.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,13 @@ func (q *immediateQueue) LockTaskProcessing() {
134134
}
135135

136136
func (q *immediateQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
137-
if len(info.Tasks) == 0 {
137+
numTasks := len(info.Tasks)
138+
if numTasks == 0 {
138139
return
139140
}
140141

141142
q.notify()
143+
q.base.metricsScope.AddCounter(metrics.NewHistoryTaskCounter, int64(numTasks))
142144
}
143145

144146
func (q *immediateQueue) notify() {

service/history/queuev2/queue_scheduled.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,19 +138,21 @@ func (q *scheduledQueue) UnlockTaskProcessing() {
138138
}
139139

140140
func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
141-
if len(info.Tasks) == 0 {
141+
numTasks := len(info.Tasks)
142+
if numTasks == 0 {
142143
return
143144
}
144145

145146
nextTime := info.Tasks[0].GetVisibilityTimestamp()
146-
for i := 1; i < len(info.Tasks); i++ {
147+
for i := 1; i < numTasks; i++ {
147148
ts := info.Tasks[i].GetVisibilityTimestamp()
148149
if ts.Before(nextTime) {
149150
nextTime = ts
150151
}
151152
}
152153

153154
q.notify(nextTime)
155+
q.base.metricsScope.AddCounter(metrics.NewHistoryTaskCounter, int64(numTasks))
154156
}
155157

156158
func (q *scheduledQueue) notify(t time.Time) {

service/history/queuev2/virtual_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
344344

345345
pendingTaskCount := q.monitor.GetTotalPendingTaskCount()
346346
maxTaskCount := q.queueOptions.MaxPendingTasksCount()
347+
// TODO: review the metrics and remove this comment or change the metric from gauge to histogram
348+
q.metricsScope.UpdateGauge(metrics.PendingTaskGauge, float64(pendingTaskCount))
347349
if pendingTaskCount >= maxTaskCount {
348350
q.logger.Warn("Too many pending tasks, pause loading tasks for a while", tag.PendingTaskCount(pendingTaskCount), tag.MaxTaskCount(maxTaskCount))
349351
q.pauseController.Pause(q.queueOptions.PollBackoffInterval())

service/history/queuev2/virtual_queue_manager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func NewVirtualQueueManager(
9696
for queueID, states := range virtualQueueStates {
9797
virtualSlices := make([]VirtualSlice, len(states))
9898
for i, state := range states {
99-
virtualSlices[i] = NewVirtualSlice(state, taskInitializer, queueReader, NewPendingTaskTracker())
99+
virtualSlices[i] = NewVirtualSlice(state, taskInitializer, queueReader, NewPendingTaskTracker(), logger)
100100
}
101101
var opts *VirtualQueueOptions
102102
if queueID == rootQueueID {
@@ -220,10 +220,13 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice)
220220

221221
func (m *virtualQueueManagerImpl) appendOrMergeSlice(vq VirtualQueue, s VirtualSlice) {
222222
now := m.timeSource.Now()
223+
newVirtualSliceState := s.GetState()
223224
if now.After(m.nextForceNewSliceTime) {
225+
m.logger.Debug("append new slice to virtual queue", tag.Dynamic("currentTime", now), tag.Dynamic("nextForceNewSliceTime", m.nextForceNewSliceTime), tag.Dynamic("inclusiveMinTaskKey", newVirtualSliceState.Range.InclusiveMinTaskKey), tag.Dynamic("exclusiveMaxTaskKey", newVirtualSliceState.Range.ExclusiveMaxTaskKey))
224226
vq.AppendSlices(s)
225227
m.nextForceNewSliceTime = now.Add(m.queueManagerOptions.VirtualSliceForceAppendInterval())
226228
return
227229
}
230+
m.logger.Debug("merge slice to virtual queue", tag.Dynamic("currentTime", now), tag.Dynamic("nextForceNewSliceTime", m.nextForceNewSliceTime), tag.Dynamic("inclusiveMinTaskKey", newVirtualSliceState.Range.InclusiveMinTaskKey), tag.Dynamic("exclusiveMaxTaskKey", newVirtualSliceState.Range.ExclusiveMaxTaskKey))
228231
vq.MergeSlices(s)
229232
}

service/history/queuev2/virtual_queue_manager_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,12 @@ func TestVirtualQueueManager_AddNewVirtualSlice(t *testing.T) {
492492
newSlice: nil, // Will be replaced with mock
493493
setupMockQueues: func(mocks map[int64]*MockVirtualQueue, slice *MockVirtualSlice) {
494494
mocks[rootQueueID].EXPECT().MergeSlices(slice)
495+
slice.EXPECT().GetState().Return(VirtualSliceState{
496+
Range: Range{
497+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
498+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
499+
},
500+
})
495501
},
496502
verifyQueues: func(t *testing.T, queues map[int64]VirtualQueue) {
497503
assert.Contains(t, queues, int64(rootQueueID))
@@ -506,6 +512,12 @@ func TestVirtualQueueManager_AddNewVirtualSlice(t *testing.T) {
506512
newSlice: nil, // Will be replaced with mock
507513
setupMockQueues: func(mocks map[int64]*MockVirtualQueue, slice *MockVirtualSlice) {
508514
mocks[rootQueueID].EXPECT().AppendSlices(slice)
515+
slice.EXPECT().GetState().Return(VirtualSliceState{
516+
Range: Range{
517+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(1),
518+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(10),
519+
},
520+
})
509521
},
510522
verifyQueues: func(t *testing.T, queues map[int64]VirtualQueue) {
511523
assert.Contains(t, queues, int64(rootQueueID))

0 commit comments

Comments
 (0)