Skip to content

Commit 544894d

Browse files
authored
Fix task schedule latency metric (#7055)
1 parent bfdbbe3 commit 544894d

File tree

3 files changed

+13
-9
lines changed

3 files changed

+13
-9
lines changed

service/history/queuev2/queue_base.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -204,21 +204,22 @@ func (q *queueBase) LockTaskProcessing() {}
204204

205205
func (q *queueBase) UnlockTaskProcessing() {}
206206

207-
func (q *queueBase) processNewTasks() {
207+
func (q *queueBase) processNewTasks() bool {
208208
newExclusiveMaxTaskKey := q.shard.UpdateIfNeededAndGetQueueMaxReadLevel(q.category, q.shard.GetClusterMetadata().GetCurrentClusterName())
209209
if q.category.Type() == persistence.HistoryTaskCategoryTypeImmediate {
210210
newExclusiveMaxTaskKey = persistence.NewImmediateTaskKey(newExclusiveMaxTaskKey.GetTaskID() + 1)
211211
}
212212

213213
newVirtualSliceState, remainingVirtualSliceState, ok := q.newVirtualSliceState.TrySplitByTaskKey(newExclusiveMaxTaskKey)
214214
if !ok {
215-
return
215+
return false
216216
}
217217
q.newVirtualSliceState = remainingVirtualSliceState
218218

219219
newVirtualSlice := NewVirtualSlice(newVirtualSliceState, q.taskInitializer, q.queueReader, NewPendingTaskTracker())
220220

221221
q.virtualQueueManager.AddNewVirtualSliceToRootQueue(newVirtualSlice)
222+
return true
222223
}
223224

224225
func (q *queueBase) updateQueueState(ctx context.Context) {

service/history/queuev2/queue_immediate.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -153,8 +153,11 @@ func (q *immediateQueue) processPollTimer() {
153153
// thus this periodic poll is not needed, but we keep it for now to provide a fallback mechanism in case
154154
// there is a bug in the notification mechanism
155155
if q.lastPollTime.Add(q.base.options.PollBackoffInterval()).Before(q.base.timeSource.Now()) {
156-
q.base.logger.Info("processing new tasks because poll timer fired")
157-
q.base.processNewTasks()
156+
newTasks := q.base.processNewTasks()
157+
if newTasks {
158+
// TODO: consider changing it to warn level
159+
q.base.logger.Info("processing new tasks because poll timer fired")
160+
}
158161
q.lastPollTime = q.base.timeSource.Now()
159162
}
160163

service/history/task/task.go

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -134,11 +134,7 @@ func (t *taskImpl) Execute() error {
134134
if t.State() != ctask.TaskStatePending {
135135
return nil
136136
}
137-
if t.GetAttempt() == 0 {
138-
// domain level metrics for the duration between task being submitted to task scheduler and being executed
139-
t.scope.RecordHistogramDuration(metrics.TaskScheduleLatencyPerDomain, time.Since(t.initialSubmitTime))
140-
}
141-
137+
scheduleLatency := t.timeSource.Now().Sub(t.initialSubmitTime)
142138
var err error
143139
t.shouldProcessTask, err = t.taskFilter(t.Task)
144140
if err != nil {
@@ -158,6 +154,10 @@ func (t *taskImpl) Execute() error {
158154
}()
159155
executeResponse, err := t.taskExecutor.Execute(t)
160156
t.scope = executeResponse.Scope
157+
if t.GetAttempt() == 0 {
158+
// domain level metrics for the duration between task being submitted to task scheduler and being executed
159+
t.scope.RecordHistogramDuration(metrics.TaskScheduleLatencyPerDomain, scheduleLatency)
160+
}
161161
if t.isPreviousExecutorActive != executeResponse.IsActiveTask {
162162
t.resetAttempt()
163163
}

0 commit comments

Comments
 (0)