Skip to content

Commit 49ba20d

Browse files
committed
kvserver: rm enqueue timestamp from raftScheduleState
Epic: none Release note: none
1 parent d700ec8 commit 49ba20d

File tree

2 files changed

+22
-48
lines changed

2 files changed

+22
-48
lines changed

pkg/kv/kvserver/scheduler.go

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
const rangeIDChunkSize = 1000
2525

2626
type testProcessorI interface {
27-
processTestEvent(roachpb.RangeID, *raftSchedulerShard, raftScheduleState)
27+
processTestEvent(queuedRangeID, *raftSchedulerShard, raftScheduleState)
2828
}
2929

3030
type rangeIDChunk[T any] struct {
@@ -146,10 +146,6 @@ const (
146146

147147
type raftScheduleState struct {
148148
flags raftScheduleFlags
149-
// When this event was queued. This is set if and only if the item is present
150-
// in the raft scheduler shard's queue.
151-
queued crtime.Mono
152-
153149
// The number of ticks queued. Usually it's 0 or 1, but may go above if the
154150
// scheduling or processing is slow. It is limited by raftScheduler.maxTicks,
155151
// so that the cost of processing all the ticks doesn't grow uncontrollably.
@@ -361,14 +357,12 @@ func (s *raftScheduler) PriorityIDs() []roachpb.RangeID {
361357
func (ss *raftSchedulerShard) worker(
362358
ctx context.Context, processor raftProcessor, metrics *StoreMetrics,
363359
) {
364-
365360
// We use a sync.Cond for worker notification instead of a buffered
366361
// channel. Buffered channels have internal overhead for maintaining the
367362
// buffer even when the elements are empty. And the buffer isn't necessary as
368363
// the raftScheduler work is already buffered on the internal queue. Lastly,
369364
// signaling a sync.Cond is significantly faster than selecting and sending
370365
// on a buffered channel.
371-
372366
ss.Lock()
373367
for {
374368
var q queuedRangeID
@@ -391,13 +385,8 @@ func (ss *raftSchedulerShard) worker(
391385
ss.state[q.rangeID] = raftScheduleState{flags: stateQueued}
392386
ss.Unlock()
393387

394-
if util.RaceEnabled && state.queued == 0 {
395-
// See state.queued for the invariant being checked here.
396-
log.Fatalf(ctx, "raftSchedulerShard.worker called with zero queued: %+v", state)
397-
}
398388
// Record the scheduling latency for the range.
399-
lat := state.queued.Elapsed()
400-
metrics.RaftSchedulerLatency.RecordValue(int64(lat))
389+
metrics.RaftSchedulerLatency.RecordValue(int64(q.queued.Elapsed()))
401390

402391
// Process requests first. This avoids a scenario where a tick and a
403392
// "quiesce" message are processed in the same iteration and intervening
@@ -434,7 +423,7 @@ func (ss *raftSchedulerShard) worker(
434423
processor.processRACv2RangeController(ctx, q.rangeID)
435424
}
436425
if buildutil.CrdbTestBuild && state.flags&stateTestIntercept != 0 {
437-
processor.(testProcessorI).processTestEvent(q.rangeID, ss, state)
426+
processor.(testProcessorI).processTestEvent(q, ss, state)
438427
}
439428

440429
ss.Lock()
@@ -466,13 +455,10 @@ func (ss *raftSchedulerShard) worker(
466455
// iteration and the next iteration, so no change to num_signals
467456
// is needed.
468457
//
469-
// NB: we overwrite state.begin unconditionally since the next processing
470-
// can not possibly happen before the current processing is done (i.e.
471-
// now). We do not want the scheduler latency to pick up the time spent
472-
// handling this replica.
473-
state.queued = crtime.NowMono()
474-
ss.state[q.rangeID] = state
475-
ss.queue.Push(queuedRangeID{rangeID: q.rangeID, queued: state.queued})
458+
// NB: this is a new insertion into the queue, so we set a new timestamp.
459+
// We do not want the scheduler latency to pick up the time spent handling
460+
// this replica.
461+
ss.queue.Push(queuedRangeID{rangeID: q.rangeID, queued: crtime.NowMono()})
476462
}
477463
}
478464
}
@@ -503,11 +489,6 @@ func (ss *raftSchedulerShard) enqueue1Locked(
503489
if newState.flags&stateQueued == 0 {
504490
newState.flags |= stateQueued
505491
queued++
506-
if util.RaceEnabled && newState.queued != 0 {
507-
// See newState.queued for the invariant being checked here.
508-
log.Fatalf(context.Background(), "raftSchedulerShard.enqueue1Locked called with non-zero queued: %+v", newState)
509-
}
510-
newState.queued = now
511492
ss.queue.Push(queuedRangeID{rangeID: id, queued: now})
512493
}
513494
ss.state[id] = newState

pkg/kv/kvserver/scheduler_test.go

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ type testProcessor struct {
136136
rac2RangeController map[roachpb.RangeID]int
137137
ready func(roachpb.RangeID)
138138
}
139-
testEventCh chan func(roachpb.RangeID, *raftSchedulerShard, raftScheduleState)
139+
testEventCh chan func(queuedRangeID, *raftSchedulerShard, raftScheduleState)
140140
}
141141

142142
var _ testProcessorI = (*testProcessor)(nil)
@@ -148,7 +148,7 @@ func newTestProcessor() *testProcessor {
148148
p.mu.raftTick = make(map[roachpb.RangeID]int)
149149
p.mu.rac2PiggybackedAdmitted = make(map[roachpb.RangeID]int)
150150
p.mu.rac2RangeController = make(map[roachpb.RangeID]int)
151-
p.testEventCh = make(chan func(roachpb.RangeID, *raftSchedulerShard, raftScheduleState), 10)
151+
p.testEventCh = make(chan func(queuedRangeID, *raftSchedulerShard, raftScheduleState), 10)
152152
return p
153153
}
154154

@@ -198,11 +198,11 @@ func (p *testProcessor) processRACv2RangeController(_ context.Context, rangeID r
198198
}
199199

200200
func (p *testProcessor) processTestEvent(
201-
id roachpb.RangeID, ss *raftSchedulerShard, ev raftScheduleState,
201+
q queuedRangeID, ss *raftSchedulerShard, ev raftScheduleState,
202202
) {
203203
select {
204204
case fn := <-p.testEventCh:
205-
fn(id, ss, ev)
205+
fn(q, ss, ev)
206206
default:
207207
}
208208
}
@@ -387,45 +387,38 @@ func TestSchedulerEnqueueWhileProcessing(t *testing.T) {
387387

388388
// Inject code into the "middle" of event processing - after having consumed
389389
// from the queue, but before re-checking of overlapping enqueue calls.
390-
p.testEventCh <- func(id roachpb.RangeID, ss *raftSchedulerShard, ev raftScheduleState) {
391-
// First call into this method.
392-
//
393-
// The event calling into us must have `ev.queued` set; it was set when
394-
// enqueuing.
395-
assert.NotZero(t, ev.queued)
390+
p.testEventCh <- func(q queuedRangeID, ss *raftSchedulerShard, ev raftScheduleState) {
391+
// First call into this method. The `queued` timestamp must be set.
392+
assert.NotZero(t, q.queued)
396393

397394
// Even though our event is currently being processed, there is a queued
398395
// and otherwise blank event in the scheduler state (which is how we have
399396
// concurrent enqueue calls coalesce onto the still pending processing of
400397
// the current event).
401398
ss.Lock()
402-
statePre := ss.state[id]
399+
statePre := ss.state[q.rangeID]
403400
ss.Unlock()
404-
405-
assert.Zero(t, statePre.queued)
406401
assert.Equal(t, stateQueued, statePre.flags)
407402

408403
// Simulate a concurrent actor that enqueues the same range again.
409404
// This will not trigger the interceptor again, since the done channel
410405
// is closed by that time.
411406
s.enqueue1(stateTestIntercept, 1)
412407

413-
// Seeing that there is an existing "queued" event, the enqueue call below
414-
// should not populate `queued`. Instead, this will be the job of our
415-
// caller when it *actually* pushes into the queue again after fully
416-
// having handled `ev`.
408+
// Seeing that there is an existing "queued" event, the enqueue call does
409+
// not enqueue the rangeID again. It will be done after having handled `ev`.
417410
ss.Lock()
418-
statePost := ss.state[id]
411+
statePost := ss.state[q.rangeID]
419412
ss.Unlock()
420413

421-
assert.Zero(t, statePost.queued)
422414
assert.Equal(t, stateQueued|stateTestIntercept, statePost.flags)
423415
close(done)
424416
}
425-
p.testEventCh <- func(id roachpb.RangeID, shard *raftSchedulerShard, ev raftScheduleState) {
417+
p.testEventCh <- func(q queuedRangeID, shard *raftSchedulerShard, ev raftScheduleState) {
426418
// Second call into this method, i.e. the overlappingly-enqueued event is
427-
// being processed. Check that `queued` is now set.
428-
assert.NotZero(t, ev.queued)
419+
// being processed. Check that the `queued` timestamp is set.
420+
assert.NotZero(t, q.queued)
421+
assert.Equal(t, stateQueued|stateTestIntercept, ev.flags)
429422
}
430423
s.enqueue1(stateTestIntercept, 1) // will become 'ev' in the intercept
431424
select {

0 commit comments

Comments
 (0)