Skip to content

Commit f441e4b

Browse files
committed
kvserver: plumb enqueue time priority
This commit plumbs the enqueue time priority into baseQueue.processReplica, enabling comparison between the priority at enqueue time and at processing time. For now, we pass -1 in all cases except when processing replicas directly from the base queue, where -1 signals that priority verification should be skipped. No logic change has been made yet to check for priority inversion; future commits will extend processReplica to validate that processing priority has not differed significantly from the enqueue time priority.
1 parent 9bee340 commit f441e4b

18 files changed

+53
-44
lines changed

pkg/kv/kvserver/consistency_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ func consistencyQueueShouldQueueImpl(
164164

165165
// process() is called on every range for which this node is a lease holder.
166166
func (q *consistencyQueue) process(
167-
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
167+
ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
168168
) (bool, error) {
169169
if q.interval() <= 0 {
170170
return false, nil

pkg/kv/kvserver/helpers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func manualQueue(s *Store, q queueImpl, repl *Replica) error {
212212
return fmt.Errorf("%s: system config not yet available", s)
213213
}
214214
ctx := repl.AnnotateCtx(context.Background())
215-
_, err := q.process(ctx, repl, cfg)
215+
_, err := q.process(ctx, repl, cfg, -1 /*priorityAtEnqueue*/)
216216
return err
217217
}
218218

pkg/kv/kvserver/lease_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (lq *leaseQueue) shouldQueue(
115115
}
116116

117117
func (lq *leaseQueue) process(
118-
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
118+
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
119119
) (processed bool, err error) {
120120
if tokenErr := repl.allocatorToken.TryAcquire(ctx, lq.name); tokenErr != nil {
121121
return false, tokenErr

pkg/kv/kvserver/merge_queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (mq *mergeQueue) requestRangeStats(
238238
}
239239

240240
func (mq *mergeQueue) process(
241-
ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader,
241+
ctx context.Context, lhsRepl *Replica, confReader spanconfig.StoreReader, _ float64,
242242
) (processed bool, err error) {
243243

244244
lhsDesc := lhsRepl.Desc()
@@ -419,7 +419,7 @@ func (mq *mergeQueue) process(
419419
return false, rangeMergePurgatoryError{err}
420420
}
421421
if testingAggressiveConsistencyChecks {
422-
if _, err := mq.store.consistencyQueue.process(ctx, lhsRepl, confReader); err != nil {
422+
if _, err := mq.store.consistencyQueue.process(ctx, lhsRepl, confReader, -1 /*priorityAtEnqueue*/); err != nil {
423423
log.Dev.Warningf(ctx, "%v", err)
424424
}
425425
}

pkg/kv/kvserver/mvcc_gc_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ func (r *replicaGCer) GC(
660660
// 7. push these transactions (again, recreating txn entries).
661661
// 8. send a GCRequest.
662662
func (mgcq *mvccGCQueue) process(
663-
ctx context.Context, repl *Replica, _ spanconfig.StoreReader,
663+
ctx context.Context, repl *Replica, _ spanconfig.StoreReader, _ float64,
664664
) (processed bool, err error) {
665665
// Record the CPU time processing the request for this replica. This is
666666
// recorded regardless of errors that are encountered.

pkg/kv/kvserver/mvcc_gc_queue_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ func testMVCCGCQueueProcessImpl(t *testing.T, snapshotBounds bool) {
911911

912912
// Process through a scan queue.
913913
mgcq := newMVCCGCQueue(tc.store)
914-
processed, err := mgcq.process(ctx, tc.repl, cfg)
914+
processed, err := mgcq.process(ctx, tc.repl, cfg, -1 /* priorityAtEnqueue */)
915915
if err != nil {
916916
t.Fatal(err)
917917
}
@@ -1162,7 +1162,7 @@ func TestMVCCGCQueueTransactionTable(t *testing.T) {
11621162
t.Fatal(err)
11631163
}
11641164

1165-
processed, err := mgcq.process(ctx, tc.repl, cfg)
1165+
processed, err := mgcq.process(ctx, tc.repl, cfg, -1 /* priorityAtEnqueue */)
11661166
if err != nil {
11671167
t.Fatal(err)
11681168
}
@@ -1179,7 +1179,7 @@ func TestMVCCGCQueueTransactionTable(t *testing.T) {
11791179
if err != nil {
11801180
return err
11811181
}
1182-
if expGC := (sp.newStatus == -1); expGC {
1182+
if expGC := (sp.newStatus == -1 /* priorityAtEnqueue */); expGC {
11831183
if expGC != !ok {
11841184
return fmt.Errorf("%s: expected gc: %t, but found %s\n%s", strKey, expGC, txn, roachpb.Key(strKey))
11851185
}
@@ -1296,7 +1296,7 @@ func TestMVCCGCQueueIntentResolution(t *testing.T) {
12961296
t.Fatal(err)
12971297
}
12981298
mgcq := newMVCCGCQueue(tc.store)
1299-
processed, err := mgcq.process(ctx, tc.repl, confReader)
1299+
processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
13001300
if err != nil {
13011301
t.Fatal(err)
13021302
}
@@ -1361,7 +1361,7 @@ func TestMVCCGCQueueLastProcessedTimestamps(t *testing.T) {
13611361

13621362
// Process through a scan queue.
13631363
mgcq := newMVCCGCQueue(tc.store)
1364-
processed, err := mgcq.process(ctx, tc.repl, confReader)
1364+
processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
13651365
if err != nil {
13661366
t.Fatal(err)
13671367
}
@@ -1472,7 +1472,7 @@ func TestMVCCGCQueueChunkRequests(t *testing.T) {
14721472
}
14731473
tc.manualClock.Advance(conf.TTL() + 1)
14741474
mgcq := newMVCCGCQueue(tc.store)
1475-
processed, err := mgcq.process(ctx, tc.repl, confReader)
1475+
processed, err := mgcq.process(ctx, tc.repl, confReader, -1 /* priorityAtEnqueue */)
14761476
if err != nil {
14771477
t.Fatal(err)
14781478
}

pkg/kv/kvserver/queue.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ type replicaItem struct {
123123
replicaID roachpb.ReplicaID
124124
seq int // enforce FIFO order for equal priorities
125125

126-
// fields used when a replicaItem is enqueued in a priority queue.
126+
// fields used when a replicaItem is enqueued in a priority queue. This field
127+
// is preserved for purgatory queue as well since baseQueue.processReplica
128+
// requies it.
127129
priority float64
128130
index int // The index of the item in the heap, maintained by the heap.Interface methods
129131

@@ -260,7 +262,7 @@ type queueImpl interface {
260262
// queue-specific work on it. The Replica is guaranteed to be initialized.
261263
// We return a boolean to indicate if the Replica was processed successfully
262264
// (vs. it being a no-op or an error).
263-
process(context.Context, *Replica, spanconfig.StoreReader) (processed bool, err error)
265+
process(context.Context, *Replica, spanconfig.StoreReader, float64) (processed bool, err error)
264266

265267
// processScheduled is called after async task was created to run process.
266268
// This function is called by the process loop synchronously. This method is
@@ -870,10 +872,10 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
870872
// Acquire from the process semaphore.
871873
bq.processSem <- struct{}{}
872874

873-
repl, priority := bq.pop()
875+
repl, priorityAtEnqueue := bq.pop()
874876
if repl != nil {
875-
bq.processOneAsyncAndReleaseSem(ctx, repl, stopper)
876-
bq.impl.postProcessScheduled(ctx, repl, priority)
877+
bq.processOneAsyncAndReleaseSem(ctx, repl, stopper, priorityAtEnqueue)
878+
bq.impl.postProcessScheduled(ctx, repl, priorityAtEnqueue)
877879
} else {
878880
// Release semaphore if no replicas were available.
879881
<-bq.processSem
@@ -901,7 +903,7 @@ func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
901903
// processOneAsyncAndReleaseSem processes a replica if possible and releases the
902904
// processSem when the processing is complete.
903905
func (bq *baseQueue) processOneAsyncAndReleaseSem(
904-
ctx context.Context, repl replicaInQueue, stopper *stop.Stopper,
906+
ctx context.Context, repl replicaInQueue, stopper *stop.Stopper, priorityAtEnqueue float64,
905907
) {
906908
ctx = repl.AnnotateCtx(ctx)
907909
taskName := bq.processOpName() + " [outer]"
@@ -917,7 +919,7 @@ func (bq *baseQueue) processOneAsyncAndReleaseSem(
917919
// Release semaphore when finished processing.
918920
defer func() { <-bq.processSem }()
919921
start := timeutil.Now()
920-
err := bq.processReplica(ctx, repl)
922+
err := bq.processReplica(ctx, repl, priorityAtEnqueue)
921923
bq.recordProcessDuration(ctx, timeutil.Since(start))
922924
bq.finishProcessingReplica(ctx, stopper, repl, err)
923925
}); err != nil {
@@ -950,7 +952,9 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
950952
//
951953
// ctx should already be annotated by both bq.AnnotateCtx() and
952954
// repl.AnnotateCtx().
953-
func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
955+
func (bq *baseQueue) processReplica(
956+
ctx context.Context, repl replicaInQueue, priorityAtEnqueue float64,
957+
) error {
954958

955959
ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
956960
defer span.Finish()
@@ -974,7 +978,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
974978
// it may not be and shouldQueue will be passed a nil realRepl. These tests
975979
// know what they're getting into so that's fine.
976980
realRepl, _ := repl.(*Replica)
977-
processed, err := bq.impl.process(ctx, realRepl, conf)
981+
processed, err := bq.impl.process(ctx, realRepl, conf, priorityAtEnqueue)
978982
if err != nil {
979983
return err
980984
}
@@ -1315,7 +1319,7 @@ func (bq *baseQueue) processReplicasInPurgatory(
13151319
if _, err := bq.replicaCanBeProcessed(ctx, repl, false); err != nil {
13161320
bq.finishProcessingReplica(ctx, stopper, repl, err)
13171321
} else {
1318-
err = bq.processReplica(ctx, repl)
1322+
err = bq.processReplica(ctx, repl, -1 /*priorityAtEnqueue*/)
13191323
bq.finishProcessingReplica(ctx, stopper, repl, err)
13201324
}
13211325
},
@@ -1442,7 +1446,7 @@ func (bq *baseQueue) DrainQueue(ctx context.Context, stopper *stop.Stopper) {
14421446
if _, err := bq.replicaCanBeProcessed(annotatedCtx, repl, false); err != nil {
14431447
bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
14441448
} else {
1445-
err = bq.processReplica(annotatedCtx, repl)
1449+
err = bq.processReplica(annotatedCtx, repl, -1 /*priorityAtEnqueue*/)
14461450
bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
14471451
}
14481452
}

pkg/kv/kvserver/queue_concurrency_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (fakeQueueImpl) shouldQueue(
140140
}
141141

142142
func (fq fakeQueueImpl) process(
143-
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
143+
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, _ float64,
144144
) (bool, error) {
145145
return fq.pr(ctx, repl, confReader)
146146
}

pkg/kv/kvserver/queue_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (tq *testQueueImpl) shouldQueue(
5858
}
5959

6060
func (tq *testQueueImpl) process(
61-
_ context.Context, _ *Replica, _ spanconfig.StoreReader,
61+
_ context.Context, _ *Replica, _ spanconfig.StoreReader, _ float64,
6262
) (bool, error) {
6363
defer atomic.AddInt32(&tq.processed, 1)
6464
if tq.err != nil {
@@ -984,7 +984,7 @@ type processTimeoutQueueImpl struct {
984984
var _ queueImpl = &processTimeoutQueueImpl{}
985985

986986
func (pq *processTimeoutQueueImpl) process(
987-
ctx context.Context, r *Replica, _ spanconfig.StoreReader,
987+
ctx context.Context, r *Replica, _ spanconfig.StoreReader, _ float64,
988988
) (processed bool, err error) {
989989
<-ctx.Done()
990990
atomic.AddInt32(&pq.processed, 1)
@@ -1114,7 +1114,7 @@ type processTimeQueueImpl struct {
11141114
var _ queueImpl = &processTimeQueueImpl{}
11151115

11161116
func (pq *processTimeQueueImpl) process(
1117-
_ context.Context, _ *Replica, _ spanconfig.StoreReader,
1117+
_ context.Context, _ *Replica, _ spanconfig.StoreReader, _ float64,
11181118
) (processed bool, err error) {
11191119
time.Sleep(5 * time.Millisecond)
11201120
return true, nil
@@ -1338,13 +1338,13 @@ type parallelQueueImpl struct {
13381338
var _ queueImpl = &parallelQueueImpl{}
13391339

13401340
func (pq *parallelQueueImpl) process(
1341-
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader,
1341+
ctx context.Context, repl *Replica, confReader spanconfig.StoreReader, priority float64,
13421342
) (processed bool, err error) {
13431343
atomic.AddInt32(&pq.processing, 1)
13441344
if pq.processBlocker != nil {
13451345
<-pq.processBlocker
13461346
}
1347-
processed, err = pq.testQueueImpl.process(ctx, repl, confReader)
1347+
processed, err = pq.testQueueImpl.process(ctx, repl, confReader, priority)
13481348
atomic.AddInt32(&pq.processing, -1)
13491349
return processed, err
13501350
}

pkg/kv/kvserver/raft_log_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ func (rlq *raftLogQueue) shouldQueueImpl(
659659
// leader and if the total number of the range's raft log's stale entries
660660
// exceeds RaftLogQueueStaleThreshold.
661661
func (rlq *raftLogQueue) process(
662-
ctx context.Context, r *Replica, _ spanconfig.StoreReader,
662+
ctx context.Context, r *Replica, _ spanconfig.StoreReader, _ float64,
663663
) (processed bool, err error) {
664664
decision, err := newTruncateDecision(ctx, r)
665665
if err != nil {

0 commit comments

Comments
 (0)