Commit 3642319

craig[bot] and wenyihu6 committed
Merge #152512
152512: kvserver: pass priority at enqueue time to baseQueue.process r=arulajmani a=wenyihu6

Epic: none
Release note: none

----

**kvserver: plumb enqueue time priority**

This commit plumbs the enqueue-time priority into baseQueue.processReplica, enabling comparison between the priority at enqueue time and at processing time. For now, we pass -1 in all cases except when processing replicas directly from the base queue; -1 signals that priority verification should be skipped. No logic change has been made yet to check for priority inversion; future commits will extend processReplica to validate that the processing-time priority has not diverged significantly from the enqueue-time priority.

----

**kvserver: remove priority reset during setProcessing**

Previously, a replicaItem's priority was cleared when the item was marked as processing, to indicate that it was no longer in the priority queue. This behavior made sense when the purgatory queue did not track priorities. However, we now need to preserve priorities for items in purgatory as well, since they will be calling into baseQueue.processReplica. This commit removes the priority reset in replicaItem.setProcessing(), ensuring that the enqueue-time priority is retained when replicas are popped from the heap and later handed to the purgatory queue. This commit should not change any behavior on its own.

----

**kvserver: plumb priority at enqueue for purgatory queue**

Previously, replica items in the purgatory queue did not retain their enqueue-time priority. This commit ensures that the priority is preserved so it can be passed to baseQueue.processReplica when processing items from purgatory.

----

**allocatorimpl: add a priority assertion to computeAction**

This commit adds an assertion to Allocator.ComputeAction to ensure that the returned priority is never -1 in cases where it shouldn't be. Normally, ComputeAction returns action.Priority(), but we sometimes adjust the priority for specific actions like AllocatorAddVoter, AllocatorRemoveDeadVoter, and AllocatorRemoveVoter. A priority of -1 is a special case reserved for callers that expect the processing logic to run even if there's a priority inversion. If the priority is not -1, the range may be re-queued to be processed with the correct priority.

----

**allocatorimpl: add invariants on priority to base queue tests**

This commit adds additional invariants to base queue tests to verify the correctness of priority plumbing for range items.

Co-authored-by: wenyihu6 <[email protected]>
2 parents a6e4ba9 + 2201c4f · commit 3642319

20 files changed: +137, −54 lines
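The merge message leaves the actual inversion check to future commits: processReplica now receives the enqueue-time priority, with -1 serving as a skip sentinel, but does not yet act on it. As a standalone sketch of that intended contract (hypothetical code, not part of this commit; the function name and the tolerance policy are invented for illustration):

```go
package main

import (
	"fmt"
	"math"
)

// skipPriorityCheck mirrors the -1 sentinel plumbed through baseQueue.process:
// callers that pass it expect processing to run even under priority inversion.
const skipPriorityCheck = -1

// priorityInverted reports whether the priority at processing time has drifted
// far enough from the enqueue-time priority that re-queueing would be
// warranted. The absolute-difference tolerance is an invented placeholder; the
// commit explicitly defers the real validation policy to future work.
func priorityInverted(atEnqueue, atProcessing, tolerance float64) bool {
	if atEnqueue == skipPriorityCheck {
		return false // skip verification, e.g. drain or manual test paths
	}
	return math.Abs(atEnqueue-atProcessing) > tolerance
}

func main() {
	fmt.Println(priorityInverted(skipPriorityCheck, 900, 100)) // false: sentinel skips the check
	fmt.Println(priorityInverted(1000, 50, 100))               // true: large drift since enqueue
	fmt.Println(priorityInverted(1000, 950, 100))              // false: within tolerance
}
```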

pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel

Lines changed: 1 addition & 0 deletions

```diff
@@ -28,6 +28,7 @@ go_library(
         "//pkg/settings",
         "//pkg/settings/cluster",
         "//pkg/util/admission/admissionpb",
+        "//pkg/util/buildutil",
         "//pkg/util/log",
         "//pkg/util/metric",
         "//pkg/util/stop",
```

pkg/kv/kvserver/allocator/allocatorimpl/allocator.go

Lines changed: 15 additions & 1 deletion

```diff
@@ -27,6 +27,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+    "github.com/cockroachdb/cockroach/pkg/util/buildutil"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/metric"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -955,8 +956,21 @@ func (a *Allocator) ComputeAction(
         return action, action.Priority()
     }
 
-    return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
+    action, priority = a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
         desc.Replicas().NonVoterDescriptors())
+    // Ensure that priority is never -1. Typically, computeAction returns
+    // action.Priority(), but we sometimes modify the priority for specific
+    // actions like AllocatorAddVoter, AllocatorRemoveDeadVoter, and
+    // AllocatorRemoveVoter. A priority of -1 is a special case, indicating that
+    // the caller expects the processing logic to be invoked even if there's a
+    // priority inversion. If the priority is not -1, the range might be re-queued
+    // to be processed with the correct priority.
+    if priority == -1 && buildutil.CrdbTestBuild {
+        log.Fatalf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
+    } else if priority == -1 {
+        log.Warningf(ctx, "allocator returned -1 priority for range %s: %v", desc, action)
+    }
+    return action, priority
 }
 
 func (a *Allocator) computeAction(
```
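A note on the gating in the hunk above: buildutil.CrdbTestBuild is true only in CockroachDB test builds, so the same invariant violation crashes tests early while merely warning in production. A minimal standalone sketch of that pattern (the local constant stands in for the build-tag-controlled value):

```go
package main

import "log"

// crdbTestBuild stands in for buildutil.CrdbTestBuild, which is compiled to
// true only under the crdb_test build tag.
const crdbTestBuild = false

// assertValidPriority mirrors the shape of the ComputeAction assertion:
// invariant violations are fatal in test builds and a warning otherwise.
func assertValidPriority(priority float64) {
	if priority == -1 && crdbTestBuild {
		log.Fatalf("allocator returned -1 priority")
	} else if priority == -1 {
		log.Printf("warning: allocator returned -1 priority")
	}
}

func main() {
	assertValidPriority(42) // valid: no output
	assertValidPriority(-1) // warns here; would crash under a test build
}
```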

pkg/kv/kvserver/consistency_queue.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -164,7 +164,7 @@ func consistencyQueueShouldQueueImpl(
 
 // process() is called on every range for which this node is a lease holder.
 func (q *consistencyQueue) process(
-    ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+    ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
 ) (bool, error) {
     if q.interval() <= 0 {
         return false, nil
```
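Queues like this one do not consult the enqueue-time priority yet, so they accept the new parameter as a blank identifier while still satisfying the widened queueImpl interface. A small self-contained illustration of that pattern (mock names, not the kvserver types):

```go
package main

import "fmt"

// processor mirrors the widened queueImpl.process shape: every implementation
// must accept the enqueue-time priority, even if it ignores it.
type processor interface {
	process(name string, priorityAtEnqueue float64) bool
}

type consistencyLike struct{}

// The blank parameter keeps the signature uniform across implementations
// without forcing this queue to use the priority.
func (consistencyLike) process(name string, _ float64) bool {
	fmt.Println("processing", name, "without consulting priority")
	return true
}

func main() {
	var p processor = consistencyLike{}
	p.process("r42", -1) // -1: caller asks for priority verification to be skipped
}
```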

pkg/kv/kvserver/helpers_test.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -212,7 +212,7 @@ func manualQueue(s *Store, q queueImpl, repl *Replica) error {
         return fmt.Errorf("%s: system config not yet available", s)
     }
     ctx := repl.AnnotateCtx(context.Background())
-    _, err := q.process(ctx, repl, cfg)
+    _, err := q.process(ctx, repl, cfg, -1 /* priorityAtEnqueue */)
     return err
 }
```

pkg/kv/kvserver/lease_queue.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -115,7 +115,7 @@ func (lq *leaseQueue) shouldQueue(
 }
 
 func (lq *leaseQueue) process(
-    ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
+    ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
 ) (processed bool, err error) {
     if tokenErr := repl.allocatorToken.TryAcquire(ctx, lq.name); tokenErr != nil {
         return false, tokenErr
```

pkg/kv/kvserver/merge_queue.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -238,7 +238,7 @@ func (mq *mergeQueue) requestRangeStats(
 }
 
 func (mq *mergeQueue) process(
-    ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader,
+    ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader, _ float64,
 ) (processed bool, err error) {
 
     lhsDesc := lhsRepl.Desc()
@@ -419,7 +419,7 @@ func (mq *mergeQueue) process(
         return false, rangeMergePurgatoryError{err}
     }
     if testingAggressiveConsistencyChecks {
-        if _, err := mq.store.consistencyQueue.process(ctx, lhsRepl, confReader); err != nil {
+        if _, err := mq.store.consistencyQueue.process(ctx, lhsRepl, confReader, -1 /* priorityAtEnqueue */); err != nil {
             log.Dev.Warningf(ctx, "%v", err)
         }
     }
```

pkg/kv/kvserver/mvcc_gc_queue.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -660,7 +660,7 @@ func (r *replicaGCer) GC(
 // 7. push these transactions (again, recreating txn entries).
 // 8. send a GCRequest.
 func (mgcq *mvccGCQueue) process(
-    ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
+    ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
 ) (processed bool, err error) {
     // Record the CPU time processing the request for this replica. This is
     // recorded regardless of errors that are encountered.
```

pkg/kv/kvserver/mvcc_gc_queue_test.go

Lines changed: 6 additions & 6 deletions

```diff
@@ -911,7 +911,7 @@ func testMVCCGCQueueProcessImpl(t *testing.T, snapshotBounds bool) {
 
     // Process through a scan queue.
     mgcq := newMVCCGCQueue(tc.store)
-    processed, err := mgcq.process(ctx, tc.repl, cfg)
+    processed, err := mgcq.process(ctx, tc.repl, cfg, -1 /* priorityAtEnqueue */)
     if err != nil {
         t.Fatal(err)
     }
@@ -1162,7 +1162,7 @@ func TestMVCCGCQueueTransactionTable(t *testing.T) {
         t.Fatal(err)
     }
 
-    processed, err := mgcq.process(ctx, tc.repl, cfg)
+    processed, err := mgcq.process(ctx, tc.repl, cfg, -1 /* priorityAtEnqueue */)
     if err != nil {
         t.Fatal(err)
     }
@@ -1179,7 +1179,7 @@
     if err != nil {
         return err
     }
-    if expGC := (sp.newStatus == -1); expGC {
+    if expGC := (sp.newStatus == -1 /* priorityAtEnqueue */); expGC {
         if expGC != !ok {
             return fmt.Errorf("%s: expected gc: %t, but found %s\n%s", strKey, expGC, txn, roachpb.Key(strKey))
         }
@@ -1296,7 +1296,7 @@ func TestMVCCGCQueueIntentResolution(t *testing.T) {
         t.Fatal(err)
     }
     mgcq := newMVCCGCQueue(tc.store)
-    processed, err := mgcq.process(ctx, tc.repl, confReader)
+    processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
     if err != nil {
         t.Fatal(err)
     }
@@ -1361,7 +1361,7 @@ func TestMVCCGCQueueLastProcessedTimestamps(t *testing.T) {
 
     // Process through a scan queue.
    mgcq := newMVCCGCQueue(tc.store)
-    processed, err := mgcq.process(ctx, tc.repl, confReader)
+    processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
     if err != nil {
         t.Fatal(err)
     }
@@ -1472,7 +1472,7 @@ func TestMVCCGCQueueChunkRequests(t *testing.T) {
     }
     tc.manualClock.Advance(conf.TTL() + 1)
     mgcq := newMVCCGCQueue(tc.store)
-    processed, err := mgcq.process(ctx, tc.repl, confReader)
+    processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
     if err != nil {
         t.Fatal(err)
     }
```

pkg/kv/kvserver/queue.go

Lines changed: 32 additions & 17 deletions

```diff
@@ -123,7 +123,9 @@ type replicaItem struct {
     replicaID roachpb.ReplicaID
     seq       int // enforce FIFO order for equal priorities
 
-    // fields used when a replicaItem is enqueued in a priority queue.
+    // fields used when a replicaItem is enqueued in a priority queue. This field
+    // is preserved for the purgatory queue as well, since baseQueue.processReplica
+    // requires it.
     priority float64
     index    int // The index of the item in the heap, maintained by the heap.Interface methods
 
@@ -135,7 +137,6 @@ type replicaItem struct {
 
 // setProcessing moves the item from an enqueued state to a processing state.
 func (i *replicaItem) setProcessing() {
-    i.priority = 0
     if i.index >= 0 {
         log.Dev.Fatalf(context.Background(),
             "r%d marked as processing but appears in prioQ", i.rangeID,
@@ -260,7 +261,7 @@ type queueImpl interface {
     // queue-specific work on it. The Replica is guaranteed to be initialized.
     // We return a boolean to indicate if the Replica was processed successfully
     // (vs. it being a no-op or an error).
-    process(context.Context, *Replica, spanconfig.StoreReader) (processed bool, err error)
+    process(context.Context, *Replica, spanconfig.StoreReader, float64) (processed bool, err error)
 
     // processScheduled is called after async task was created to run process.
     // This function is called by the process loop synchronously. This method is
@@ -876,10 +877,10 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
             // Acquire from the process semaphore.
             bq.processSem <- struct{}{}
 
-            repl, priority := bq.pop()
+            repl, priorityAtEnqueue := bq.pop()
             if repl != nil {
-                bq.processOneAsyncAndReleaseSem(ctx, repl, stopper)
-                bq.impl.postProcessScheduled(ctx, repl, priority)
+                bq.processOneAsyncAndReleaseSem(ctx, repl, stopper, priorityAtEnqueue)
+                bq.impl.postProcessScheduled(ctx, repl, priorityAtEnqueue)
             } else {
                 // Release semaphore if no replicas were available.
                 <-bq.processSem
@@ -907,7 +908,7 @@
 // processOneAsyncAndReleaseSem processes a replica if possible and releases the
 // processSem when the processing is complete.
 func (bq *baseQueue) processOneAsyncAndReleaseSem(
-    ctx context.Context, repl replicaInQueue, stopper *stop.Stopper,
+    ctx context.Context, repl replicaInQueue, stopper *stop.Stopper, priorityAtEnqueue float64,
 ) {
     ctx = repl.AnnotateCtx(ctx)
     taskName := bq.processOpName() + " [outer]"
@@ -923,7 +924,7 @@
         // Release semaphore when finished processing.
         defer func() { <-bq.processSem }()
         start := timeutil.Now()
-        err := bq.processReplica(ctx, repl)
+        err := bq.processReplica(ctx, repl, priorityAtEnqueue)
         bq.recordProcessDuration(ctx, timeutil.Since(start))
         bq.finishProcessingReplica(ctx, stopper, repl, err)
     }); err != nil {
@@ -956,7 +957,9 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duration) {
 //
 // ctx should already be annotated by both bq.AnnotateCtx() and
 // repl.AnnotateCtx().
-func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
+func (bq *baseQueue) processReplica(
+    ctx context.Context, repl replicaInQueue, priorityAtEnqueue float64,
+) error {
 
     ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
     defer span.Finish()
@@ -980,7 +983,7 @@
     // it may not be and shouldQueue will be passed a nil realRepl. These tests
     // know what they're getting into so that's fine.
     realRepl, _ := repl.(*Replica)
-    processed, err := bq.impl.process(ctx, realRepl, conf)
+    processed, err := bq.impl.process(ctx, realRepl, conf, priorityAtEnqueue)
     if err != nil {
         return err
     }
@@ -1095,19 +1098,24 @@ func IsPurgatoryError(err error) (PurgatoryError, bool) {
 }
 
 // assertInvariants codifies the guarantees upheld by the data structures in the
-// base queue. In summary, a replica is one of:
+// base queue.
+// 1. In summary, a replica is one of:
 //   - "queued" and in mu.replicas and mu.priorityQ
 //   - "processing" and only in mu.replicas
 //   - "purgatory" and in mu.replicas and mu.purgatory
+// 2. For every item in bq.mu.priorityQ.sl, bq.mu.purgatory, and bq.mu.replicas,
+//    the assertOnReplicaItem callback is called with the item. Note that we
+//    expect items in priorityQ and purgatory to be in replicas.
 //
 // Note that in particular, nothing is ever in both mu.priorityQ and
 // mu.purgatory.
-func (bq *baseQueue) assertInvariants() {
+func (bq *baseQueue) assertInvariants(assertOnReplicaItem func(item *replicaItem)) {
     bq.mu.Lock()
     defer bq.mu.Unlock()
 
     ctx := bq.AnnotateCtx(context.Background())
     for _, item := range bq.mu.priorityQ.sl {
+        assertOnReplicaItem(item)
         if item.processing {
             log.Dev.Fatalf(ctx, "processing item found in prioQ: %v", item)
         }
@@ -1120,6 +1128,7 @@
     }
     for rangeID := range bq.mu.purgatory {
         item, inReplicas := bq.mu.replicas[rangeID]
+        assertOnReplicaItem(item)
         if !inReplicas {
             log.Dev.Fatalf(ctx, "item found in purg but not in mu.replicas: %v", item)
         }
@@ -1134,6 +1143,7 @@
     // that there aren't any non-processing replicas *only* in bq.mu.replicas.
     var nNotProcessing int
     for _, item := range bq.mu.replicas {
+        assertOnReplicaItem(item)
         if !item.processing {
             nNotProcessing++
         }
@@ -1158,6 +1168,7 @@ func (bq *baseQueue) finishProcessingReplica(
     processing := item.processing
     callbacks := item.callbacks
     requeue := item.requeue
+    priority := item.priority
     item.callbacks = nil
     bq.removeFromReplicaSetLocked(repl.GetRangeID())
     item = nil // prevent accidental use below
@@ -1188,7 +1199,7 @@
     // purgatory.
     if purgErr, ok := IsPurgatoryError(err); ok {
         bq.mu.Lock()
-        bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
+        bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /* priorityAtEnqueue */)
         bq.mu.Unlock()
         return
     }
@@ -1208,7 +1219,11 @@
 // addToPurgatoryLocked adds the specified replica to the purgatory queue, which
 // holds replicas which have failed processing.
 func (bq *baseQueue) addToPurgatoryLocked(
-    ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
+    ctx context.Context,
+    stopper *stop.Stopper,
+    repl replicaInQueue,
+    purgErr PurgatoryError,
+    priorityAtEnqueue float64,
 ) {
     bq.mu.AssertHeld()
 
@@ -1232,7 +1247,7 @@
         return
     }
 
-    item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1}
+    item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1, priority: priorityAtEnqueue}
     bq.mu.replicas[repl.GetRangeID()] = item
 
     defer func() {
@@ -1321,7 +1336,7 @@ func (bq *baseQueue) processReplicasInPurgatory(
             if _, err := bq.replicaCanBeProcessed(ctx, repl, false); err != nil {
                 bq.finishProcessingReplica(ctx, stopper, repl, err)
             } else {
-                err = bq.processReplica(ctx, repl)
+                err = bq.processReplica(ctx, repl, item.priority /* priorityAtEnqueue */)
                 bq.finishProcessingReplica(ctx, stopper, repl, err)
             }
         },
@@ -1448,7 +1463,7 @@ func (bq *baseQueue) DrainQueue(ctx context.Context, stopper *stop.Stopper) {
         if _, err := bq.replicaCanBeProcessed(annotatedCtx, repl, false); err != nil {
             bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
         } else {
-            err = bq.processReplica(annotatedCtx, repl)
+            err = bq.processReplica(annotatedCtx, repl, -1 /* priorityAtEnqueue */)
             bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
         }
     }
```
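Taken together, the queue.go hunks establish a simple convention: processLoop passes the priority popped from the heap, the purgatory path passes the priority preserved on the item, and DrainQueue (like the tests) passes -1 to skip verification. The sketch below, with mocked-up types standing in for the kvserver ones, shows why dropping the `i.priority = 0` reset matters for the purgatory hand-off:

```go
package main

import "fmt"

// item mimics replicaItem's priority bookkeeping after this commit (mock
// types; the real fields live in pkg/kv/kvserver/queue.go).
type item struct {
	priority   float64
	processing bool
}

// setProcessing marks the item as processing. Before this commit it also did
// i.priority = 0, which would have erased the enqueue-time priority that
// addToPurgatoryLocked now copies into the purgatory item.
func (i *item) setProcessing() {
	i.processing = true
}

func main() {
	it := &item{priority: 1000} // priority assigned at enqueue time
	it.setProcessing()
	// The enqueue-time priority survives, so a purgatory hand-off can still
	// pass it to processReplica later.
	fmt.Println(it.priority) // 1000
}
```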

pkg/kv/kvserver/queue_concurrency_test.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -112,7 +112,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
             })
         }
         g.Go(func() error {
-            bq.assertInvariants()
+            bq.assertInvariants(func(item *replicaItem) {})
             return nil
         })
     }
@@ -140,7 +140,7 @@ func (fakeQueueImpl) shouldQueue(
 }
 
 func (fq fakeQueueImpl) process(
-    ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
+    ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
 ) (bool, error) {
     return fq.pr(ctx, repl, confReader)
 }
```
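The concurrency test above only needs a no-op callback, but per the commit message, other base queue tests use assertOnReplicaItem to verify priority plumbing. A self-contained sketch of that usage pattern (the types below are mocks, not the kvserver ones):

```go
package main

import "fmt"

// replicaItem and baseQueue are mocks; only the fields a priority-checking
// callback would touch are included.
type replicaItem struct {
	rangeID  int
	priority float64
}

type baseQueue struct{ items []*replicaItem }

// assertInvariants mirrors the new signature: the callback is invoked on every
// item the queue tracks, alongside the queue's own structural checks.
func (bq *baseQueue) assertInvariants(assertOnReplicaItem func(item *replicaItem)) {
	for _, it := range bq.items {
		assertOnReplicaItem(it)
	}
}

func main() {
	wantPriority := map[int]float64{1: 1000, 2: 50}
	bq := &baseQueue{items: []*replicaItem{{rangeID: 1, priority: 1000}, {rangeID: 2, priority: 50}}}
	// A test can assert that every item still carries its enqueue-time priority.
	bq.assertInvariants(func(it *replicaItem) {
		if want, ok := wantPriority[it.rangeID]; !ok || it.priority != want {
			panic(fmt.Sprintf("r%d: priority %v, want %v", it.rangeID, it.priority, want))
		}
	})
	fmt.Println("priority invariants hold")
}
```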
