Skip to content

Commit d700ec8

Browse files
committed
kvserver: record timestamp in the rangeID queue
Epic: none Release note: none
1 parent a369dc3 commit d700ec8

File tree

2 files changed

+23
-17
lines changed

2 files changed

+23
-17
lines changed

pkg/kv/kvserver/scheduler.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,16 @@ type raftScheduler struct {
234234
done sync.WaitGroup
235235
}
236236

237+
type queuedRangeID struct {
238+
rangeID roachpb.RangeID
239+
// queued is the moment in time when the rangeID was added to the queue.
240+
queued crtime.Mono
241+
}
242+
237243
type raftSchedulerShard struct {
238244
syncutil.Mutex
239245
cond *sync.Cond
240-
queue rangeIDQueue[roachpb.RangeID]
246+
queue rangeIDQueue[queuedRangeID]
241247
state map[roachpb.RangeID]raftScheduleState
242248
numWorkers int
243249
maxTicks int64
@@ -365,14 +371,14 @@ func (ss *raftSchedulerShard) worker(
365371

366372
ss.Lock()
367373
for {
368-
var id roachpb.RangeID
374+
var q queuedRangeID
369375
for {
370376
if ss.stopped {
371377
ss.Unlock()
372378
return
373379
}
374380
var ok bool
375-
if id, ok = ss.queue.PopFront(); ok {
381+
if q, ok = ss.queue.PopFront(); ok {
376382
break
377383
}
378384
ss.cond.Wait()
@@ -381,8 +387,8 @@ func (ss *raftSchedulerShard) worker(
381387
// Grab and clear the existing state for the range ID. Note that we leave
382388
// the range ID marked as "queued" so that a concurrent Enqueue* will not
383389
// queue the range ID again.
384-
state := ss.state[id]
385-
ss.state[id] = raftScheduleState{flags: stateQueued}
390+
state := ss.state[q.rangeID]
391+
ss.state[q.rangeID] = raftScheduleState{flags: stateQueued}
386392
ss.Unlock()
387393

388394
if util.RaceEnabled && state.queued == 0 {
@@ -400,7 +406,7 @@ func (ss *raftSchedulerShard) worker(
400406
if state.flags&stateRaftRequest != 0 {
401407
// processRequestQueue returns true if the range should perform ready
402408
// processing. Do not reorder this below the call to processReady.
403-
if processor.processRequestQueue(ctx, id) {
409+
if processor.processRequestQueue(ctx, q.rangeID) {
404410
state.flags |= stateRaftReady
405411
}
406412
}
@@ -413,30 +419,30 @@ func (ss *raftSchedulerShard) worker(
413419
for t := state.ticks; t > 0; t-- {
414420
// processRaftTick returns true if the range should perform ready
415421
// processing. Do not reorder this below the call to processReady.
416-
if processor.processTick(ctx, id) {
422+
if processor.processTick(ctx, q.rangeID) {
417423
state.flags |= stateRaftReady
418424
}
419425
}
420426
}
421427
if state.flags&stateRACv2PiggybackedAdmitted != 0 {
422-
processor.processRACv2PiggybackedAdmitted(ctx, id)
428+
processor.processRACv2PiggybackedAdmitted(ctx, q.rangeID)
423429
}
424430
if state.flags&stateRaftReady != 0 {
425-
processor.processReady(id)
431+
processor.processReady(q.rangeID)
426432
}
427433
if state.flags&stateRACv2RangeController != 0 {
428-
processor.processRACv2RangeController(ctx, id)
434+
processor.processRACv2RangeController(ctx, q.rangeID)
429435
}
430436
if buildutil.CrdbTestBuild && state.flags&stateTestIntercept != 0 {
431-
processor.(testProcessorI).processTestEvent(id, ss, state)
437+
processor.(testProcessorI).processTestEvent(q.rangeID, ss, state)
432438
}
433439

434440
ss.Lock()
435-
state = ss.state[id]
441+
state = ss.state[q.rangeID]
436442
if state.flags == stateQueued {
437443
// No further processing required by the range ID, clear it from the
438444
// state map.
439-
delete(ss.state, id)
445+
delete(ss.state, q.rangeID)
440446
} else {
441447
// There was a concurrent call to one of the Enqueue* methods. Queue
442448
// the range ID for further processing.
@@ -465,8 +471,8 @@ func (ss *raftSchedulerShard) worker(
465471
// now). We do not want the scheduler latency to pick up the time spent
466472
// handling this replica.
467473
state.queued = crtime.NowMono()
468-
ss.state[id] = state
469-
ss.queue.Push(id)
474+
ss.state[q.rangeID] = state
475+
ss.queue.Push(queuedRangeID{rangeID: q.rangeID, queued: state.queued})
470476
}
471477
}
472478
}
@@ -502,7 +508,7 @@ func (ss *raftSchedulerShard) enqueue1Locked(
502508
log.Fatalf(context.Background(), "raftSchedulerShard.enqueue1Locked called with non-zero queued: %+v", newState)
503509
}
504510
newState.queued = now
505-
ss.queue.Push(id)
511+
ss.queue.Push(queuedRangeID{rangeID: id, queued: now})
506512
}
507513
ss.state[id] = newState
508514
return queued

pkg/kv/kvserver/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ func runSchedulerEnqueueRaftTicks(
687687
// Flush the queue. We haven't started any workers that pull from it, so we
688688
// just clear it out.
689689
for _, shard := range s.shards {
690-
shard.queue = rangeIDQueue[roachpb.RangeID]{}
690+
shard.queue = rangeIDQueue[queuedRangeID]{}
691691
}
692692
}
693693
ids.Close()

0 commit comments

Comments
 (0)