Skip to content

Commit 2201c4f

Browse files
committed
allocatorimpl: add invariants on priority to base queue tests
This commit adds additional invariants to verify the correctness of priority plumbing for range items in base queue tests.
1 parent b744cfc commit 2201c4f

File tree

4 files changed

+61
-5
lines changed

4 files changed

+61
-5
lines changed

pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ go_library(
2828
"//pkg/settings",
2929
"//pkg/settings/cluster",
3030
"//pkg/util/admission/admissionpb",
31+
"//pkg/util/buildutil",
3132
"//pkg/util/log",
3233
"//pkg/util/metric",
3334
"//pkg/util/stop",

pkg/kv/kvserver/queue.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,19 +1092,24 @@ func IsPurgatoryError(err error) (PurgatoryError, bool) {
10921092
}
10931093

10941094
// assertInvariants codifies the guarantees upheld by the data structures in the
1095-
// base queue. In summary, a replica is one of:
1095+
// base queue.
1096+
// 1. In summary, a replica is one of:
10961097
// - "queued" and in mu.replicas and mu.priorityQ
10971098
// - "processing" and only in mu.replicas
10981099
// - "purgatory" and in mu.replicas and mu.purgatory
1100+
// 2. For every item in bq.mu.priorityQ.sl, bq.mu.purgatory, and bq.mu.replicas,
1101+
// assertOnReplicaItem callback is called with the item. Note that we expect
1102+
// items in priorityQ and purgatory to be in replicas.
10991103
//
11001104
// Note that in particular, nothing is ever in both mu.priorityQ and
11011105
// mu.purgatory.
1102-
func (bq *baseQueue) assertInvariants() {
1106+
func (bq *baseQueue) assertInvariants(assertOnReplicaItem func(item *replicaItem)) {
11031107
bq.mu.Lock()
11041108
defer bq.mu.Unlock()
11051109

11061110
ctx := bq.AnnotateCtx(context.Background())
11071111
for _, item := range bq.mu.priorityQ.sl {
1112+
assertOnReplicaItem(item)
11081113
if item.processing {
11091114
log.Fatalf(ctx, "processing item found in prioQ: %v", item)
11101115
}
@@ -1117,6 +1122,7 @@ func (bq *baseQueue) assertInvariants() {
11171122
}
11181123
for rangeID := range bq.mu.purgatory {
11191124
item, inReplicas := bq.mu.replicas[rangeID]
1125+
assertOnReplicaItem(item)
11201126
if !inReplicas {
11211127
log.Fatalf(ctx, "item found in purg but not in mu.replicas: %v", item)
11221128
}
@@ -1131,6 +1137,7 @@ func (bq *baseQueue) assertInvariants() {
11311137
// that there aren't any non-processing replicas *only* in bq.mu.replicas.
11321138
var nNotProcessing int
11331139
for _, item := range bq.mu.replicas {
1140+
assertOnReplicaItem(item)
11341141
if !item.processing {
11351142
nNotProcessing++
11361143
}

pkg/kv/kvserver/queue_concurrency_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
112112
})
113113
}
114114
g.Go(func() error {
115-
bq.assertInvariants()
115+
bq.assertInvariants(func(item *replicaItem) {})
116116
return nil
117117
})
118118
}

pkg/kv/kvserver/queue_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,11 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
222222
} else {
223223
bq.finishProcessingReplica(ctx, stopper, r2, nil)
224224
}
225+
bq.assertInvariants(func(item *replicaItem) {
226+
replica, err := bq.getReplica(item.rangeID)
227+
require.NoError(t, err)
228+
require.Equal(t, priorityMap[replica.(*Replica)], item.priority)
229+
})
225230
if v := bq.pending.Value(); v != 1 {
226231
t.Errorf("expected 1 pending replicas; got %d", v)
227232
}
@@ -294,6 +299,11 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
294299
if r, _ := bq.pop(); r != nil {
295300
t.Errorf("expected empty queue; got %v", r)
296301
}
302+
bq.assertInvariants(func(item *replicaItem) {
303+
replica, err := bq.getReplica(item.rangeID)
304+
require.NoError(t, err)
305+
require.Equal(t, priorityMap[replica.(*Replica)], item.priority)
306+
})
297307

298308
// Try removing a replica.
299309
bq.maybeAdd(ctx, r1, hlc.ClockTimestamp{})
@@ -313,6 +323,11 @@ func TestBaseQueueAddUpdateAndRemove(t *testing.T) {
313323
if v := bq.pending.Value(); v != 0 {
314324
t.Errorf("expected 0 pending replicas; got %d", v)
315325
}
326+
bq.assertInvariants(func(item *replicaItem) {
327+
replica, err := bq.getReplica(item.rangeID)
328+
require.NoError(t, err)
329+
require.Equal(t, priorityMap[replica.(*Replica)], item.priority)
330+
})
316331
}
317332

318333
// TestBaseQueueSamePriorityFIFO verifies that if multiple items are queued at
@@ -538,19 +553,27 @@ func TestBaseQueueAddRemove(t *testing.T) {
538553
t.Fatal(err)
539554
}
540555

556+
const testPriority = 1.0
541557
testQueue := &testQueueImpl{
542558
blocker: make(chan struct{}, 1),
543559
shouldQueueFn: func(now hlc.ClockTimestamp, r *Replica) (shouldQueue bool, priority float64) {
544560
shouldQueue = true
545-
priority = 1.0
561+
priority = testPriority
546562
return
547563
},
548564
}
549565
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: 2})
550566
bq.Start(stopper)
551567

552568
bq.maybeAdd(ctx, r, hlc.ClockTimestamp{})
569+
bq.assertInvariants(func(item *replicaItem) {
570+
require.Equal(t, testPriority, item.priority)
571+
})
572+
553573
bq.MaybeRemove(r.RangeID)
574+
bq.assertInvariants(func(item *replicaItem) {
575+
require.Equal(t, testPriority, item.priority)
576+
})
554577

555578
// Wake the queue
556579
close(testQueue.blocker)
@@ -837,10 +860,19 @@ func TestBaseQueuePurgatory(t *testing.T) {
837860
bq.maybeAdd(context.Background(), r, hlc.ClockTimestamp{})
838861
}
839862

863+
// Make sure priority is preserved during processing.
864+
bq.assertInvariants(func(item *replicaItem) {
865+
require.Equal(t, float64(item.rangeID), item.priority)
866+
})
867+
840868
testutils.SucceedsSoon(t, func() error {
841869
if pc := testQueue.getProcessed(); pc != replicaCount {
842870
return errors.Errorf("expected %d processed replicas; got %d", replicaCount, pc)
843871
}
872+
// Make sure priorities are preserved with the purgatory queue.
873+
bq.assertInvariants(func(item *replicaItem) {
874+
require.Equal(t, float64(item.rangeID), item.priority)
875+
})
844876
// We have to loop checking the following conditions because the increment
845877
// of testQueue.processed does not happen atomically with the replica being
846878
// placed in purgatory.
@@ -852,6 +884,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
852884
if l := bq.Length(); l != 0 {
853885
return errors.Errorf("expected empty priorityQ; got %d", l)
854886
}
887+
bq.assertInvariants(func(item *replicaItem) {
888+
require.Equal(t, float64(item.rangeID), item.priority)
889+
})
855890
// Check metrics.
856891
if v := bq.successes.Count(); v != 0 {
857892
return errors.Errorf("expected 0 processed replicas; got %d", v)
@@ -886,6 +921,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
886921
if l := bq.Length(); l != 0 {
887922
return errors.Errorf("expected empty priorityQ; got %d", l)
888923
}
924+
bq.assertInvariants(func(item *replicaItem) {
925+
require.Equal(t, float64(item.rangeID), item.priority)
926+
})
889927
// Check metrics.
890928
if v := bq.successes.Count(); v != 0 {
891929
return errors.Errorf("expected 0 processed replicas; got %d", v)
@@ -918,6 +956,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
918956
if pc := testQueue.getProcessed(); pc != replicaCount*3-rmReplCount {
919957
return errors.Errorf("expected %d processed replicas; got %d", replicaCount*3-rmReplCount, pc)
920958
}
959+
bq.assertInvariants(func(item *replicaItem) {
960+
require.Equal(t, float64(item.rangeID), item.priority)
961+
})
921962
// Check metrics.
922963
if v := bq.successes.Count(); v != int64(replicaCount)-rmReplCount {
923964
return errors.Errorf("expected %d processed replicas; got %d", replicaCount-rmReplCount, v)
@@ -961,6 +1002,9 @@ func TestBaseQueuePurgatory(t *testing.T) {
9611002
if pc := testQueue.getProcessed(); pc != beforeProcessCount+1 {
9621003
return errors.Errorf("expected %d processed replicas; got %d", beforeProcessCount+1, pc)
9631004
}
1005+
bq.assertInvariants(func(item *replicaItem) {
1006+
require.Equal(t, float64(item.rangeID), item.priority)
1007+
})
9641008
if v := bq.successes.Count(); v != beforeSuccessCount+1 {
9651009
return errors.Errorf("expected %d processed replicas; got %d", beforeSuccessCount+1, v)
9661010
}
@@ -1365,11 +1409,12 @@ func TestBaseQueueProcessConcurrently(t *testing.T) {
13651409
repls := createReplicas(t, &tc, 3)
13661410
r1, r2, r3 := repls[0], repls[1], repls[2]
13671411

1412+
const testPriority = 1
13681413
pQueue := &parallelQueueImpl{
13691414
testQueueImpl: testQueueImpl{
13701415
blocker: make(chan struct{}, 1),
13711416
shouldQueueFn: func(now hlc.ClockTimestamp, r *Replica) (shouldQueue bool, priority float64) {
1372-
return true, 1
1417+
return true, testPriority
13731418
},
13741419
},
13751420
processBlocker: make(chan struct{}, 1),
@@ -1414,6 +1459,9 @@ func TestBaseQueueProcessConcurrently(t *testing.T) {
14141459

14151460
pQueue.processBlocker <- struct{}{}
14161461
assertProcessedAndProcessing(3, 0)
1462+
bq.assertInvariants(func(item *replicaItem) {
1463+
require.Equal(t, float64(testPriority), item.priority)
1464+
})
14171465
}
14181466

14191467
// TestBaseQueueReplicaChange ensures that if a replica is added to the queue

0 commit comments

Comments
 (0)