Skip to content

Commit 5ea633f

Browse files
committed
After review #2
Signed-off-by: Ignat Tubylov <[email protected]>
1 parent d26d21d commit 5ea633f

File tree

7 files changed

+64
-40
lines changed

7 files changed

+64
-40
lines changed

service/history/queuev2/queue_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func (q *queueBase) processNewTasks() bool {
275275
}
276276

277277
func (q *queueBase) insertSingleTask(task task.Task) bool {
278-
return q.virtualQueueManager.InsertSingleTaskToRootQueue(task)
278+
return q.virtualQueueManager.InsertSingleTask(task)
279279
}
280280

281281
func (q *queueBase) resetProgress(key persistence.HistoryTaskKey) {

service/history/queuev2/virtual_queue.go

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type (
6767
SplitSlices(func(VirtualSlice) (remaining []VirtualSlice, split bool))
6868
// Pause pauses the virtual queue for a while
6969
Pause(time.Duration)
70-
// InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice..
70+
// InsertSingleTask inserts a single task to the virtual queue. Return false if the task's timestamp is out of range of the current queue slice or the task does not satisfy the predicate
7171
InsertSingleTask(task task.Task) bool
7272
// ResetProgress removes the scheduled tasks after the given time
7373
ResetProgress(persistence.HistoryTaskKey)
@@ -398,7 +398,7 @@ func (q *virtualQueueImpl) loadAndSubmitTasks() {
398398

399399
now := q.timeSource.Now()
400400
for _, task := range tasks {
401-
if err := q.submitTask(now, task); err != nil {
401+
if err := q.submitOrRescheduleTask(now, task); err != nil {
402402
select {
403403
case <-q.ctx.Done():
404404
return
@@ -423,32 +423,21 @@ func (q *virtualQueueImpl) InsertSingleTask(task task.Task) bool {
423423
q.Lock()
424424
defer q.Unlock()
425425

426-
taskKey := task.GetTaskKey()
427-
var slice VirtualSlice
428-
429426
for e := q.virtualSlices.Front(); e != nil; e = e.Next() {
430-
s := e.Value.(VirtualSlice)
431-
r := s.GetState().Range
432-
if taskKey.Compare(r.InclusiveMinTaskKey) >= 0 && taskKey.Compare(r.ExclusiveMaxTaskKey) < 0 {
433-
slice = s
434-
break
435-
}
436-
}
437-
438-
if slice == nil {
439-
// the new task is outside of the current range, it will be read from the DB on the next poll
440-
return false
441-
}
442-
443-
slice.InsertTask(task)
444-
q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
427+
slice := e.Value.(VirtualSlice)
428+
inserted := slice.InsertTask(task)
429+
if inserted {
430+
q.monitor.SetSlicePendingTaskCount(slice, slice.GetPendingTaskCount())
431+
now := q.timeSource.Now()
432+
if err := q.submitOrRescheduleTask(now, task); err != nil {
433+
q.logger.Error("Error submitting task to virtual queue", tag.Error(err))
434+
}
445435

446-
now := q.timeSource.Now()
447-
if err := q.submitTask(now, task); err != nil {
448-
q.logger.Error("Error submitting task to virtual queue", tag.Error(err))
436+
return true
437+
}
449438
}
450439

451-
return true
440+
return false
452441
}
453442

454443
func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) {
@@ -464,9 +453,11 @@ func (q *virtualQueueImpl) ResetProgress(key persistence.HistoryTaskKey) {
464453
break
465454
}
466455
}
456+
457+
q.resetNextReadSliceLocked()
467458
}
468459

469-
func (q *virtualQueueImpl) submitTask(now time.Time, task task.Task) error {
460+
func (q *virtualQueueImpl) submitOrRescheduleTask(now time.Time, task task.Task) error {
470461
if persistence.IsTaskCorrupted(task) {
471462
q.logger.Error("Virtual queue encountered a corrupted task", tag.Dynamic("task", task))
472463
q.metricsScope.IncCounter(metrics.CorruptedHistoryTaskCounter)

service/history/queuev2/virtual_queue_manager.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ type (
5858
// By default, all new tasks belong to the root queue, so we need to add a new virtual slice to the root queue.
5959
AddNewVirtualSliceToRootQueue(VirtualSlice)
6060
// Insert a single task to the current slice. Return false if the task's timestamp is out of range of the current slice.
61-
InsertSingleTaskToRootQueue(task.Task) bool
61+
InsertSingleTask(task.Task) bool
62+
// ResetProgress resets the progress of the virtual queue to the given key. Pending tasks after the given key are cancelled.
6263
ResetProgress(persistence.HistoryTaskKey)
6364
}
6465

@@ -222,15 +223,18 @@ func (m *virtualQueueManagerImpl) AddNewVirtualSliceToRootQueue(s VirtualSlice)
222223
m.virtualQueues[rootQueueID].Start()
223224
}
224225

225-
func (m *virtualQueueManagerImpl) InsertSingleTaskToRootQueue(t task.Task) bool {
226+
func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool {
226227
m.Lock()
227228
defer m.Unlock()
228-
if vq, ok := m.virtualQueues[rootQueueID]; ok {
229-
return vq.InsertSingleTask(t)
229+
230+
inserted := false
231+
for _, vq := range m.virtualQueues {
232+
if vq.InsertSingleTask(t) {
233+
inserted = true
234+
}
230235
}
231236

232-
// if a root queue is not created yet, no need to schedule an incoming task, it will be read from the slice
233-
return false
237+
return inserted
234238
}
235239

236240
func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) {

service/history/queuev2/virtual_queue_manager_mock.go

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queuev2/virtual_slice.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type (
3737
GetState() VirtualSliceState
3838
IsEmpty() bool
3939
GetTasks(context.Context, int) ([]task.Task, error)
40-
InsertTask(task.Task)
40+
InsertTask(task.Task) bool
4141
HasMoreTasks() bool
4242
UpdateAndGetState() VirtualSliceState
4343
GetPendingTaskCount() int
@@ -117,8 +117,20 @@ func (s *virtualSliceImpl) Clear() {
117117
}
118118
}
119119

120-
func (s *virtualSliceImpl) InsertTask(task task.Task) {
120+
func (s *virtualSliceImpl) InsertTask(task task.Task) bool {
121+
r := s.state.Range
122+
key := task.GetTaskKey()
123+
124+
if key.Compare(r.InclusiveMinTaskKey) < 0 || key.Compare(r.ExclusiveMaxTaskKey) >= 0 {
125+
return false
126+
}
127+
128+
if !s.state.Predicate.Check(task) {
129+
return false
130+
}
131+
121132
s.pendingTaskTracker.AddTask(task)
133+
return true
122134
}
123135

124136
func (s *virtualSliceImpl) GetTasks(ctx context.Context, pageSize int) ([]task.Task, error) {
@@ -223,15 +235,21 @@ func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) {
223235
}
224236

225237
for i, progress := range s.progress {
238+
// progress contains sorted non-overlapping ranges. If we found a range that contains the given key, we can reset
239+
// this range's progress to the given key and merge the remaining ranges into it.
226240
if progress.NextTaskKey.Compare(key) > 0 {
241+
maxTaskKey := s.progress[len(s.progress)-1].Range.ExclusiveMaxTaskKey
227242
s.progress[i] = &GetTaskProgress{
228243
Range: Range{
229244
InclusiveMinTaskKey: key,
230-
ExclusiveMaxTaskKey: progress.Range.ExclusiveMaxTaskKey,
245+
ExclusiveMaxTaskKey: maxTaskKey,
231246
},
232247
NextPageToken: nil,
233248
NextTaskKey: key,
234249
}
250+
251+
s.progress = s.progress[:i+1]
252+
break
235253
}
236254
}
237255
}

service/history/queuev2/virtual_slice_mock.go

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/shard/context.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,6 +1363,15 @@ func (s *contextImpl) allocateTimerIDsLocked(
13631363
tag.ValueShardAllocateTimerBeforeRead)
13641364
ts = now.Add(persistence.DBTimestampMinPrecision)
13651365
}
1366+
1367+
if ts.Before(s.shardInfo.TimerAckLevel) {
1368+
s.logger.Warn("New timer generated is less than ack level",
1369+
tag.WorkflowDomainID(domainEntry.GetInfo().ID),
1370+
tag.WorkflowID(workflowID),
1371+
tag.Timestamp(ts))
1372+
ts = s.shardInfo.TimerAckLevel.Add(persistence.DBTimestampMinPrecision)
1373+
}
1374+
13661375
task.SetVisibilityTimestamp(ts)
13671376

13681377
seqNum, err := s.generateTaskIDLocked()

0 commit comments

Comments
 (0)