@@ -26,6 +26,7 @@ import (
26
26
"github.com/cockroachdb/cockroach/pkg/util/stop"
27
27
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
28
28
"github.com/cockroachdb/errors"
29
+ "github.com/stretchr/testify/assert"
29
30
"github.com/stretchr/testify/require"
30
31
)
31
32
@@ -135,15 +136,19 @@ type testProcessor struct {
135
136
rac2RangeController map [roachpb.RangeID ]int
136
137
ready func (roachpb.RangeID )
137
138
}
139
+ testEventCh chan func (roachpb.RangeID , * raftSchedulerShard , raftScheduleState )
138
140
}
139
141
142
+ var _ testProcessorI = (* testProcessor )(nil )
143
+
140
144
func newTestProcessor () * testProcessor {
141
145
p := & testProcessor {}
142
146
p .mu .raftReady = make (map [roachpb.RangeID ]int )
143
147
p .mu .raftRequest = make (map [roachpb.RangeID ]int )
144
148
p .mu .raftTick = make (map [roachpb.RangeID ]int )
145
149
p .mu .rac2PiggybackedAdmitted = make (map [roachpb.RangeID ]int )
146
150
p .mu .rac2RangeController = make (map [roachpb.RangeID ]int )
151
+ p .testEventCh = make (chan func (roachpb.RangeID , * raftSchedulerShard , raftScheduleState ), 10 )
147
152
return p
148
153
}
149
154
@@ -192,6 +197,16 @@ func (p *testProcessor) processRACv2RangeController(_ context.Context, rangeID r
192
197
p .mu .Unlock ()
193
198
}
194
199
200
+ func (p * testProcessor ) processTestEvent (
201
+ ctx context.Context , id roachpb.RangeID , ss * raftSchedulerShard , ev raftScheduleState ,
202
+ ) {
203
+ select {
204
+ case fn := <- p .testEventCh :
205
+ fn (id , ss , ev )
206
+ default :
207
+ }
208
+ }
209
+
195
210
func (p * testProcessor ) readyCount (rangeID roachpb.RangeID ) int {
196
211
p .mu .Lock ()
197
212
defer p .mu .Unlock ()
@@ -352,6 +367,75 @@ func TestSchedulerBuffering(t *testing.T) {
352
367
}
353
368
}
354
369
370
+ func TestSchedulerEnqueueWhileProcessing (t * testing.T ) {
371
+ defer leaktest .AfterTest (t )()
372
+ defer log .Scope (t ).Close (t )
373
+ skip .UnderNonTestBuild (t ) // stateTestIntercept needs crdb test build
374
+
375
+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
376
+ defer cancel ()
377
+
378
+ stopper := stop .NewStopper ()
379
+ defer stopper .Stop (ctx )
380
+
381
+ m := newStoreMetrics (metric .TestSampleInterval )
382
+ p := newTestProcessor ()
383
+ s := newRaftScheduler (log .MakeTestingAmbientContext (stopper .Tracer ()), m , p , 1 , 1 , 1 , 5 )
384
+ s .Start (stopper )
385
+
386
+ done := make (chan struct {})
387
+
388
+ // Inject code into the "middle" of event processing - after having consumed
389
+ // from the queue, but before re-checking of overlapping enqueue calls.
390
+ p .testEventCh <- func (id roachpb.RangeID , ss * raftSchedulerShard , ev raftScheduleState ) {
391
+ // First call into this method.
392
+ //
393
+ // The event calling into us must have `ev.begin` set; it was set when
394
+ // enqueuing.
395
+ assert .NotZero (t , ev .begin )
396
+
397
+ // Even though our event is currently being processed, there is a queued
398
+ // and otherwise blank event in the scheduler state (which is how we have
399
+ // concurrent enqueue calls coalesce onto the still pending processing of
400
+ // the current event).
401
+ ss .Lock ()
402
+ statePre := ss .state [id ]
403
+ ss .Unlock ()
404
+
405
+ assert .Zero (t , statePre .begin )
406
+ assert .Equal (t , stateQueued , statePre .flags )
407
+
408
+ // Simulate a concurrent actor that enqueues the same range again.
409
+ // This will not trigger the interceptor again, since the done channel
410
+ // is closed by that time.
411
+ s .enqueue1 (stateTestIntercept , 1 )
412
+
413
+ // Seeing that there is an existing "queued" event, the enqueue call below
414
+ // should not populate `begin`. Instead, this will be the job of our
415
+ // caller when it *actually* pushes into the queue again after fully
416
+ // having handled `ev`.
417
+ ss .Lock ()
418
+ statePost := ss .state [id ]
419
+ ss .Unlock ()
420
+
421
+ // TODO(tbg): enable in follow-up commit.
422
+ // assert.Zero(t, statePost.begin)
423
+ assert .Equal (t , stateQueued | stateTestIntercept , statePost .flags )
424
+ close (done )
425
+ }
426
+ p .testEventCh <- func (id roachpb.RangeID , shard * raftSchedulerShard , ev raftScheduleState ) {
427
+ // Second call into this method, i.e. the overlappingly-enqeued event is
428
+ // being processed. Check that `begin` is now set.
429
+ assert .NotZero (t , ev .begin )
430
+ }
431
+ s .enqueue1 (stateTestIntercept , 1 ) // will become 'ev' in the intercept
432
+ select {
433
+ case <- done :
434
+ case <- ctx .Done ():
435
+ t .Fatal (ctx .Err ())
436
+ }
437
+ }
438
+
355
439
func TestNewSchedulerShards (t * testing.T ) {
356
440
defer leaktest .AfterTest (t )()
357
441
defer log .Scope (t ).Close (t )
0 commit comments