Skip to content

Commit 18b8e6b

Browse files
committed
kvserver: plumb priority at enqueue for purgatory queue
Previously, replica items in the purgatory queue did not retain their enqueue time priority. This commit ensures that the priority is preserved so it can be passed to baseQueue.processReplica when processing items from purgatory.
1 parent bc56b14 commit 18b8e6b

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,7 @@ func (bq *baseQueue) finishProcessingReplica(
11551155
processing := item.processing
11561156
callbacks := item.callbacks
11571157
requeue := item.requeue
1158+
priority := item.priority
11581159
item.callbacks = nil
11591160
bq.removeFromReplicaSetLocked(repl.GetRangeID())
11601161
item = nil // prevent accidental use below
@@ -1185,7 +1186,7 @@ func (bq *baseQueue) finishProcessingReplica(
11851186
// purgatory.
11861187
if purgErr, ok := IsPurgatoryError(err); ok {
11871188
bq.mu.Lock()
1188-
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
1189+
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr, priority /*priorityAtEnqueue*/)
11891190
bq.mu.Unlock()
11901191
return
11911192
}
@@ -1205,7 +1206,11 @@ func (bq *baseQueue) finishProcessingReplica(
12051206
// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
12061207
// holds replicas which have failed processing.
12071208
func (bq *baseQueue) addToPurgatoryLocked(
1208-
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
1209+
ctx context.Context,
1210+
stopper *stop.Stopper,
1211+
repl replicaInQueue,
1212+
purgErr PurgatoryError,
1213+
priorityAtEnqueue float64,
12091214
) {
12101215
bq.mu.AssertHeld()
12111216

@@ -1229,7 +1234,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
12291234
return
12301235
}
12311236

1232-
item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1}
1237+
item := &replicaItem{rangeID: repl.GetRangeID(), replicaID: repl.ReplicaID(), index: -1, priority: priorityAtEnqueue}
12331238
bq.mu.replicas[repl.GetRangeID()] = item
12341239

12351240
defer func() {
@@ -1318,7 +1323,7 @@ func (bq *baseQueue) processReplicasInPurgatory(
13181323
if _, err := bq.replicaCanBeProcessed(ctx, repl, false); err != nil {
13191324
bq.finishProcessingReplica(ctx, stopper, repl, err)
13201325
} else {
1321-
err = bq.processReplica(ctx, repl, -1 /*priorityAtEnqueue*/)
1326+
err = bq.processReplica(ctx, repl, item.priority /*priorityAtEnqueue*/)
13221327
bq.finishProcessingReplica(ctx, stopper, repl, err)
13231328
}
13241329
},

0 commit comments

Comments
 (0)