Skip to content

Commit 9f1991a

Browse files
committed
after review
1 parent 5ea633f commit 9f1991a

File tree

2 files changed

+16
-17
lines changed

2 files changed

+16
-17
lines changed

service/history/queuev2/virtual_queue_manager.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,14 +227,13 @@ func (m *virtualQueueManagerImpl) InsertSingleTask(t task.Task) bool {
227227
m.Lock()
228228
defer m.Unlock()
229229

230-
inserted := false
231230
for _, vq := range m.virtualQueues {
232231
if vq.InsertSingleTask(t) {
233-
inserted = true
232+
return true
234233
}
235234
}
236235

237-
return inserted
236+
return false
238237
}
239238

240239
func (m *virtualQueueManagerImpl) ResetProgress(key persistence.HistoryTaskKey) {

service/history/queuev2/virtual_slice.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,12 @@ func (s *virtualSliceImpl) Clear() {
118118
}
119119

120120
func (s *virtualSliceImpl) InsertTask(task task.Task) bool {
121-
r := s.state.Range
122-
key := task.GetTaskKey()
123-
124-
if key.Compare(r.InclusiveMinTaskKey) < 0 || key.Compare(r.ExclusiveMaxTaskKey) >= 0 {
125-
return false
126-
}
127-
128-
if !s.state.Predicate.Check(task) {
129-
return false
121+
if s.state.Contains(task) {
122+
s.pendingTaskTracker.AddTask(task)
123+
return true
130124
}
131125

132-
s.pendingTaskTracker.AddTask(task)
133-
return true
126+
return false
134127
}
135128

136129
func (s *virtualSliceImpl) GetTasks(ctx context.Context, pageSize int) ([]task.Task, error) {
@@ -213,6 +206,12 @@ func (s *virtualSliceImpl) UpdateAndGetState() VirtualSliceState {
213206
// we want to cancel all the tasks after the given key and reset the progress to the given key,
214207
// so that in the next poll, we will read the tasks from the DB starting from the given key.
215208
func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) {
209+
210+
// the given key is after the current slice, no need to reset
211+
if key.Compare(s.state.Range.ExclusiveMaxTaskKey) >= 0 {
212+
return
213+
}
214+
216215
taskMap := s.pendingTaskTracker.GetTasks()
217216
for _, task := range taskMap {
218217
if task.GetTaskKey().Compare(key) >= 0 {
@@ -237,17 +236,18 @@ func (s *virtualSliceImpl) ResetProgress(key persistence.HistoryTaskKey) {
237236
for i, progress := range s.progress {
238237
// progress contains sorted non-overlapping ranges. If we found a range that contains the given key, we can reset
239238
// this range's progress to the given key and merge the remaining ranges into it.
240-
if progress.NextTaskKey.Compare(key) > 0 {
239+
if progress.ExclusiveMaxTaskKey.Compare(key) > 0 {
241240
maxTaskKey := s.progress[len(s.progress)-1].Range.ExclusiveMaxTaskKey
242241
s.progress[i] = &GetTaskProgress{
243242
Range: Range{
244-
InclusiveMinTaskKey: key,
243+
InclusiveMinTaskKey: persistence.MinHistoryTaskKey(key, progress.InclusiveMinTaskKey),
245244
ExclusiveMaxTaskKey: maxTaskKey,
246245
},
247246
NextPageToken: nil,
248-
NextTaskKey: key,
247+
NextTaskKey: persistence.MinHistoryTaskKey(key, progress.NextTaskKey),
249248
}
250249

250+
s.state.Range.InclusiveMinTaskKey = persistence.MinHistoryTaskKey(key, s.state.Range.InclusiveMinTaskKey)
251251
s.progress = s.progress[:i+1]
252252
break
253253
}

0 commit comments

Comments
 (0)