@@ -26,6 +26,7 @@ import (
26
26
"github.com/cockroachdb/cockroach/pkg/util/stop"
27
27
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
28
28
"github.com/cockroachdb/errors"
29
+ "github.com/stretchr/testify/assert"
29
30
"github.com/stretchr/testify/require"
30
31
)
31
32
@@ -135,15 +136,19 @@ type testProcessor struct {
135
136
rac2RangeController map [roachpb.RangeID ]int
136
137
ready func (roachpb.RangeID )
137
138
}
139
+ testEventCh chan func (roachpb.RangeID , * raftSchedulerShard , raftScheduleState )
138
140
}
139
141
142
+ var _ testProcessorI = (* testProcessor )(nil )
143
+
140
144
func newTestProcessor () * testProcessor {
141
145
p := & testProcessor {}
142
146
p .mu .raftReady = make (map [roachpb.RangeID ]int )
143
147
p .mu .raftRequest = make (map [roachpb.RangeID ]int )
144
148
p .mu .raftTick = make (map [roachpb.RangeID ]int )
145
149
p .mu .rac2PiggybackedAdmitted = make (map [roachpb.RangeID ]int )
146
150
p .mu .rac2RangeController = make (map [roachpb.RangeID ]int )
151
+ p .testEventCh = make (chan func (roachpb.RangeID , * raftSchedulerShard , raftScheduleState ), 10 )
147
152
return p
148
153
}
149
154
@@ -192,6 +197,16 @@ func (p *testProcessor) processRACv2RangeController(_ context.Context, rangeID r
192
197
p .mu .Unlock ()
193
198
}
194
199
200
+ func (p * testProcessor ) processTestEvent (
201
+ id roachpb.RangeID , ss * raftSchedulerShard , ev raftScheduleState ,
202
+ ) {
203
+ select {
204
+ case fn := <- p .testEventCh :
205
+ fn (id , ss , ev )
206
+ default :
207
+ }
208
+ }
209
+
195
210
func (p * testProcessor ) readyCount (rangeID roachpb.RangeID ) int {
196
211
p .mu .Lock ()
197
212
defer p .mu .Unlock ()
@@ -352,6 +367,74 @@ func TestSchedulerBuffering(t *testing.T) {
352
367
}
353
368
}
354
369
370
+ func TestSchedulerEnqueueWhileProcessing (t * testing.T ) {
371
+ defer leaktest .AfterTest (t )()
372
+ defer log .Scope (t ).Close (t )
373
+ skip .UnderNonTestBuild (t ) // stateTestIntercept needs CrdbTestBuild
374
+
375
+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
376
+ defer cancel ()
377
+
378
+ stopper := stop .NewStopper ()
379
+ defer stopper .Stop (ctx )
380
+
381
+ m := newStoreMetrics (metric .TestSampleInterval )
382
+ p := newTestProcessor ()
383
+ s := newRaftScheduler (log .MakeTestingAmbientContext (stopper .Tracer ()), m , p , 1 , 1 , 1 , 5 )
384
+ s .Start (stopper )
385
+
386
+ done := make (chan struct {})
387
+
388
+ // Inject code into the "middle" of event processing - after having consumed
389
+ // from the queue, but before re-checking of overlapping enqueue calls.
390
+ p .testEventCh <- func (id roachpb.RangeID , ss * raftSchedulerShard , ev raftScheduleState ) {
391
+ // First call into this method.
392
+ //
393
+ // The event calling into us must have `ev.queued` set; it was set when
394
+ // enqueuing.
395
+ assert .NotZero (t , ev .queued )
396
+
397
+ // Even though our event is currently being processed, there is a queued
398
+ // and otherwise blank event in the scheduler state (which is how we have
399
+ // concurrent enqueue calls coalesce onto the still pending processing of
400
+ // the current event).
401
+ ss .Lock ()
402
+ statePre := ss .state [id ]
403
+ ss .Unlock ()
404
+
405
+ assert .Zero (t , statePre .queued )
406
+ assert .Equal (t , stateQueued , statePre .flags )
407
+
408
+ // Simulate a concurrent actor that enqueues the same range again.
409
+ // This will not trigger the interceptor again, since the done channel
410
+ // is closed by that time.
411
+ s .enqueue1 (stateTestIntercept , 1 )
412
+
413
+ // Seeing that there is an existing "queued" event, the enqueue call below
414
+ // should not populate `queued`. Instead, this will be the job of our
415
+ // caller when it *actually* pushes into the queue again after fully
416
+ // having handled `ev`.
417
+ ss .Lock ()
418
+ statePost := ss .state [id ]
419
+ ss .Unlock ()
420
+
421
+ assert .Zero (t , statePost .queued )
422
+ assert .Equal (t , stateQueued | stateTestIntercept , statePost .flags )
423
+ close (done )
424
+ }
425
+ p .testEventCh <- func (id roachpb.RangeID , shard * raftSchedulerShard , ev raftScheduleState ) {
426
+ // Second call into this method, i.e. the overlappingly-enqeued event is
427
+ // being processed. Check that `queued` is now set.
428
+ assert .NotZero (t , ev .queued )
429
+ }
430
+ s .enqueue1 (stateTestIntercept , 1 ) // will become 'ev' in the intercept
431
+ select {
432
+ case <- done :
433
+ case <- ctx .Done ():
434
+ t .Fatal (ctx .Err ())
435
+ }
436
+ }
437
+
355
438
func TestNewSchedulerShards (t * testing.T ) {
356
439
defer leaktest .AfterTest (t )()
357
440
defer log .Scope (t ).Close (t )
0 commit comments