Skip to content

Commit d26d21d

Browse files
committed
after review
Signed-off-by: Ignat Tubylov <[email protected]>
1 parent 06e9553 commit d26d21d

File tree

10 files changed

+49
-44
lines changed

10 files changed

+49
-44
lines changed

config/development.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,4 @@ shardDistribution:
172172
prefix: "store"
173173
process:
174174
period: 1s
175-
heartbeatTTL: 2s
175+
heartbeatTTL: 2s

service/history/common/type.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ type (
3838
PersistenceError bool
3939

4040
// if true, the task will be scheduled in memory for the current execution, otherwise
41-
// it will only be scheduled after the next DB scan
41+
// it will only be scheduled after the next DB scan. This notification is sometimes passed with a fake
42+
// timer with the sole purpose of resetting the next scheduled DB read, that's why sometimes we want to
43+
// avoid scheduling the task in memory.
4244
ScheduleInMemory bool
4345
}
4446
)

service/history/queuev2/queue_base.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ func (q *queueBase) insertSingleTask(task task.Task) bool {
278278
return q.virtualQueueManager.InsertSingleTaskToRootQueue(task)
279279
}
280280

281-
func (q *queueBase) removeScheduledTasksAfter(key persistence.HistoryTaskKey) {
282-
q.virtualQueueManager.RemoveScheduledTasksAfter(key)
281+
func (q *queueBase) resetProgress(key persistence.HistoryTaskKey) {
282+
q.virtualQueueManager.ResetProgress(key)
283283
}
284284

285285
func (q *queueBase) updateQueueState(ctx context.Context) {

service/history/queuev2/queue_scheduled.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (q *scheduledQueue) NotifyNewTask(clusterName string, info *hcommon.NotifyT
175175
}
176176

177177
if !nextReadTime.IsZero() {
178-
q.base.removeScheduledTasksAfter(persistence.NewHistoryTaskKey(nextReadTime, 0))
178+
q.base.resetProgress(persistence.NewHistoryTaskKey(nextReadTime, 0))
179179
q.notify(nextReadTime)
180180
}
181181

service/history/queuev2/virtual_queue.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ type (
6969
Pause(time.Duration)
7070
// InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice..
7171
InsertSingleTask(task task.Task) bool
72-
// RemoveScheduledTasksAfter removes the scheduled tasks after the given time
73-
RemoveScheduledTasksAfter(persistence.HistoryTaskKey)
72+
// ResetProgress removes the scheduled tasks after the given time
73+
ResetProgress(persistence.HistoryTaskKey)
7474
}
7575

7676
VirtualQueueOptions struct {
@@ -445,21 +445,24 @@ func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool {
445445

446446
now := q.timeSource.Now()
447447
if err := q.submitTask(now, task); err != nil {
448-
q.logger.Error("Virtual queue failed to submit task", tag.Error(err))
449-
return false
448+
q.logger.Error("Error submitting task to virtual queue", tag.Error(err))
450449
}
451450

452451
return true
453452
}
454453

455-
func (q *virtualQueueImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) {
454+
func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) {
456455
q.Lock()
457456
defer q.Unlock()
458457

459458
for e := q.virtualSlices.Front(); e != nil; e = e.Next() {
460459
slice := e.Value.(VirtualSlice)
461-
slice.CancelTasksAfter(key)
460+
slice.ResetProgress(key)
462461
q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
462+
463+
if e == q.sliceToRead {
464+
break
465+
}
463466
}
464467
}
465468

@@ -482,16 +485,13 @@ func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error {
482485
q.metricsScope.RecordHistogramDuration(metrics.TaskEnqueueToFetchLatency, now.Sub(task.GetVisibilityTimestamp()))
483486
task.SetInitialSubmitTime(now)
484487
submitted, err := q.processor.TrySubmit(task)
485-
if err != nil {
486-
return err
487-
}
488488

489489
if !submitted {
490490
q.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
491491
q.rescheduler.RescheduleTask(task, q.timeSource.Now().Add(taskSchedulerThrottleBackoffInterval))
492492
}
493493

494-
return nil
494+
return err
495495
}
496496

497497
func (q *virtualQueueImpl) resetNextReadSliceLocked() {

service/history/queuev2/virtual_queue_manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type (
5959
AddNewVirtualSliceToRootQueue(VirtualSlice)
6060
// Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice.
6161
InsertSingleTaskToRootQueue(task.Task) bool
62-
RemoveScheduledTasksAfter(persistence.HistoryTaskKey)
62+
ResetProgress(persistence.HistoryTaskKey)
6363
}
6464

6565
virtualQueueManagerImpl struct {
@@ -233,11 +233,11 @@ func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool
233233
return false
234234
}
235235

236-
func (m *virtualQueueManagerImpl) RemoveScheduledTasksAfter(key persistence.HistoryTaskKey) {
236+
func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) {
237237
m.Lock()
238238
defer m.Unlock()
239239
for _, vq := range m.virtualQueues {
240-
vq.RemoveScheduledTasksAfter(key)
240+
vq.ResetProgress(key)
241241
}
242242
}
243243

service/history/queuev2/virtual_queue_manager_mock.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queuev2/virtual_queue_mock.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queuev2/virtual_slice.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type (
4343
GetPendingTaskCount() int
4444
Clear()
4545
PendingTaskStats() PendingTaskStats
46-
CancelTasksAfter(key persistence.HistoryTaskKey)
46+
ResetProgress(key persistence.HistoryTaskKey)
4747

4848
TrySplitByTaskKey(persistence.HistoryTaskKey) (VirtualSlice, VirtualSlice, bool)
4949
TrySplitByPredicate(Predicate) (VirtualSlice, VirtualSlice, bool)
@@ -197,7 +197,10 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState {
197197
return s.state
198198
}
199199

200-
func (s *virtualSliceImpl) CancelTasksAfter(key persistence.HistoryTaskKey) {
200+
// this function is used when we are not sure if our in-memory state after the given key is correct,
201+
// we want to cancel all the tasks after the given key and reset the progress to the given key,
202+
// so that in the next poll, we will read the tasks from the DB starting from the given key.
203+
func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) {
201204
taskMap := s.pendingTaskTracker.GetTasks()
202205
for _, task := range taskMap {
203206
if task.GetTaskKey().Compare(key) >= 0 {

service/history/queuev2/virtual_slice_mock.go

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)