Skip to content

Commit bc0fa89

Browse files
authored
Set a limit on page size for history queue v2 (#7179)
1 parent 01bc7bc commit bc0fa89

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

service/history/queuev2/virtual_queue.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
340340

341341
pendingTaskCount := q.monitor.GetTotalPendingTaskCount()
342342
maxTaskCount := q.options.MaxPendingTasksCount()
343-
if pendingTaskCount > maxTaskCount {
343+
if pendingTaskCount >= maxTaskCount {
344344
q.logger.Warn("Too many pending tasks, pause loading tasks for a while", tag.PendingTaskCount(pendingTaskCount), tag.MaxTaskCount(maxTaskCount))
345345
q.pauseController.Pause(q.options.PollBackoffInterval())
346346
}
@@ -355,7 +355,17 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
355355
// emit a metric indicating that the virtual queue is alive
356356
q.metricsScope.UpdateGauge(metrics.VirtualQueueRunningGauge, 1.0)
357357
sliceToRead := q.sliceToRead.Value.(VirtualSlice)
358-
tasks, err := sliceToRead.GetTasks(q.ctx, q.options.PageSize())
358+
359+
// This logic is to avoid the loop of loading tasks from max virtual queue -> pending task count exceeds critical task count -> unload tasks from max virtual queue
360+
// for non-root virtual queue, we know that maxTaskCout < ciriticalTaskCount
361+
remainingSize := maxTaskCount - pendingTaskCount
362+
if remainingSize <= 0 {
363+
remainingSize = 1
364+
q.logger.Error("unexpected error, virtual queue is not paused when pending task count exceeds max task cout limit", tag.PendingTaskCount(pendingTaskCount), tag.MaxTaskCount(maxTaskCount))
365+
}
366+
pageSize := min(q.options.PageSize(), remainingSize)
367+
q.logger.Debug("get tasks from virtual queue", tag.PendingTaskCount(pendingTaskCount), tag.MaxTaskCount(maxTaskCount), tag.Counter(pageSize))
368+
tasks, err := sliceToRead.GetTasks(q.ctx, pageSize)
359369
if err != nil {
360370
q.logger.Error("Virtual queue failed to get tasks", tag.Error(err))
361371
return

0 commit comments

Comments
 (0)