Skip to content

Commit d8dec00

Browse files
craig[bot]wenyihu6
andcommitted
Merge #152697
152697: kvserver: track priority inversion in replicate queue metrics r=tbg a=wenyihu6 Part of: #151847 Resolves: #152022 Release note: none ---- **kvserver: track priority inversion in replicate queue metrics** Previously, replicas could be enqueued at a high priority but end up processing a lower-priority actions, causing priority inversion and unfairness to other replicas behind them that needs a repair action. This commit adds metrics to track such cases. In addition, this commit also adds metrics to track when replicas are requeued in the replicate queue due to a priority inversion from a repair action to a rebalance action. --- **kvserver: add TestPriorityInversionRequeue** Previously, we added priority inversion requeuing mechanism. This commit adds a unit test that forces the race condition we suspected to be happening in escalations involving priority inversion and asserts that priority inversion occurs and that the replica is correctly requeued. Test set up: 1. range’s leaseholder replica is rebalanced from one store to another. 2. new leaseholder enqueues the replica for repair with high priority (e.g. to finalize the atomic replication change or remove a learner replica) 3. before processing, the old leaseholder completes the change (exits the joint config or removes the learner). 4. when the new leaseholder processes the replica, it computes a ConsiderRebalance action, resulting in a priority inversion and potentially blocking other high-priority work. --- **kvserver: delete per action priority inversion metrics** This commit removes per-action priority inversion metrics due to their high cardinality. We already have logging in place, which should provide sufficient observability. For now, we care about is priority inversion that leads to consider rebalance and requeuing the most. Co-authored-by: wenyihu6 <[email protected]>
2 parents f328e00 + a96cb74 commit d8dec00

File tree

5 files changed

+163
-0
lines changed

5 files changed

+163
-0
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13951,6 +13951,22 @@ layers:
1395113951
unit: COUNT
1395213952
aggregation: AVG
1395313953
derivative: NONE
13954+
- name: queue.replicate.priority_inversion.requeue
13955+
exported_name: queue_replicate_priority_inversion_requeue
13956+
description: Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time. When the priority has changed from a high priority repair action to rebalance, the change is requeued to avoid unfairness.
13957+
y_axis_label: Replicas
13958+
type: COUNTER
13959+
unit: COUNT
13960+
aggregation: AVG
13961+
derivative: NON_NEGATIVE_DERIVATIVE
13962+
- name: queue.replicate.priority_inversion.total
13963+
exported_name: queue_replicate_priority_inversion_total
13964+
description: Total number of priority inversions in the replicate queue. A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time
13965+
y_axis_label: Replicas
13966+
type: COUNTER
13967+
unit: COUNT
13968+
aggregation: AVG
13969+
derivative: NON_NEGATIVE_DERIVATIVE
1395413970
- name: queue.replicate.process.failure
1395513971
exported_name: queue_replicate_process_failure
1395613972
description: Number of replicas which failed processing in the replicate queue

pkg/kv/kvserver/queue.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,9 @@ func (bq *baseQueue) addInternal(
10011001
default:
10021002
// No need to signal again.
10031003
}
1004+
if postEnqueueInterceptor := bq.store.TestingKnobs().BaseQueuePostEnqueueInterceptor; postEnqueueInterceptor != nil {
1005+
postEnqueueInterceptor(bq.store.StoreID(), desc.RangeID)
1006+
}
10041007
// Note that we are bumping enqueueAdd here instead of during defer to avoid
10051008
// treating requeuing a processing replica as newly added. They will be
10061009
// re-added to the queue later which will double count them.

pkg/kv/kvserver/replicate_queue.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,22 @@ var (
322322
Measurement: "Replicas",
323323
Unit: metric.Unit_COUNT,
324324
}
325+
metaReplicateQueueRequeueDueToPriorityInversion = metric.Metadata{
326+
Name: "queue.replicate.priority_inversion.requeue",
327+
Help: "Number of priority inversions in the replicate queue that resulted in requeuing of the replicas. " +
328+
"A priority inversion occurs when the priority at processing time ends up being lower " +
329+
"than at enqueue time. When the priority has changed from a high priority repair action to rebalance, " +
330+
"the change is requeued to avoid unfairness.",
331+
Measurement: "Replicas",
332+
Unit: metric.Unit_COUNT,
333+
}
334+
metaReplicateQueuePriorityInversionTotal = metric.Metadata{
335+
Name: "queue.replicate.priority_inversion.total",
336+
Help: "Total number of priority inversions in the replicate queue. " +
337+
"A priority inversion occurs when the priority at processing time ends up being lower than at enqueue time",
338+
Measurement: "Replicas",
339+
Unit: metric.Unit_COUNT,
340+
}
325341
)
326342

327343
// quorumError indicates a retryable error condition which sends replicas being
@@ -344,6 +360,7 @@ func (e *quorumError) Error() string {
344360
func (*quorumError) PurgatoryErrorMarker() {}
345361

346362
// ReplicateQueueMetrics is the set of metrics for the replicate queue.
363+
// TODO(wenyihu6): metrics initialization could be cleaned up by using a map.
347364
type ReplicateQueueMetrics struct {
348365
AddReplicaCount *metric.Counter
349366
AddVoterReplicaCount *metric.Counter
@@ -381,6 +398,10 @@ type ReplicateQueueMetrics struct {
381398
// TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner,
382399
// AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange
383400
// allocator actions.
401+
402+
// Priority Inversion.
403+
RequeueDueToPriorityInversion *metric.Counter
404+
PriorityInversionTotal *metric.Counter
384405
}
385406

386407
func makeReplicateQueueMetrics() ReplicateQueueMetrics {
@@ -417,6 +438,9 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
417438
ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount),
418439
RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount),
419440
RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount),
441+
442+
RequeueDueToPriorityInversion: metric.NewCounter(metaReplicateQueueRequeueDueToPriorityInversion),
443+
PriorityInversionTotal: metric.NewCounter(metaReplicateQueuePriorityInversionTotal),
420444
}
421445
}
422446

@@ -955,12 +979,14 @@ func (rq *replicateQueue) processOneChange(
955979
// starving other higher priority work.
956980
if PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) {
957981
if inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action); inversion {
982+
rq.metrics.PriorityInversionTotal.Inc(1)
958983
if priorityInversionLogEveryN.ShouldLog() {
959984
log.KvDistribution.Infof(ctx,
960985
"priority inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v",
961986
shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue)
962987
}
963988
if shouldRequeue {
989+
rq.metrics.RequeueDueToPriorityInversion.Inc(1)
964990
// Return true to requeue the range. Return the error to ensure it is
965991
// logged and tracked in replicate queue bq.failures metrics. See
966992
// replicateQueue.process for details.

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2540,3 +2540,117 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
25402540
afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success")
25412541
require.Greater(t, afterProcessSuccess, int64(0))
25422542
}
2543+
2544+
// TestPriorityInversionRequeue tests that the replicate queue correctly handles
2545+
// priority inversions by requeuing replicas when the PriorityInversionRequeue
2546+
// setting is enabled.
2547+
//
2548+
// This test specifically targets a race condition where:
2549+
// 1. A replica is enqueued for a high-priority repair action
2550+
// (FinalizeAtomicReplicationChange or RemoveLearner).
2551+
// 2. By the time the replica is processed, the repair is no longer needed and
2552+
// only a low-priority rebalance action (ConsiderRebalance) is computed.
2553+
// 3. This creates a priority inversion where a low-priority action blocks
2554+
// other higher-priority replicas in the queue from being processed.
2555+
//
2556+
// The race occurs during range rebalancing:
2557+
// 1. A leaseholder replica of a range is rebalanced from one store to another.
2558+
// 2. The new leaseholder enqueues the replica for repair (e.g. to finalize
2559+
// the atomic replication change or remove a learner replica).
2560+
// 3. Before processing, the old leaseholder has left the atomic joint config
2561+
// state or removed the learner replica. 4. When the new leaseholder processes
2562+
// the replica, it computes a ConsiderRebalance action, causing priority
2563+
// inversion.
2564+
//
2565+
// With PriorityInversionRequeue enabled, the queue should detect this condition
2566+
// and requeue the replica at the correct priority. The test validates this
2567+
// behavior through metrics that track priority inversions and requeuing events.
2568+
func TestPriorityInversionRequeue(t *testing.T) {
2569+
defer leaktest.AfterTest(t)()
2570+
defer log.Scope(t).Close(t)
2571+
skip.UnderDuress(t)
2572+
2573+
ctx := context.Background()
2574+
settings := cluster.MakeTestingClusterSettings()
2575+
kvserver.PriorityInversionRequeue.Override(ctx, &settings.SV, true)
2576+
2577+
var scratchRangeID int64
2578+
atomic.StoreInt64(&scratchRangeID, -1)
2579+
require.NoError(t, log.SetVModule("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5"))
2580+
2581+
const newLeaseholderStoreAndNodeID = 4
2582+
var waitUntilLeavingJoint = func() {}
2583+
2584+
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
2585+
ReplicationMode: base.ReplicationManual,
2586+
ServerArgs: base.TestServerArgs{
2587+
Settings: settings,
2588+
Knobs: base.TestingKnobs{
2589+
Store: &kvserver.StoreTestingKnobs{
2590+
BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
2591+
// Disable the replicate queue except for the scratch range on the new leaseholder.
2592+
t.Logf("range %d is added to replicate queue store", rangeID)
2593+
return rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID))
2594+
},
2595+
BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
2596+
// After enqueuing, wait for the old leaseholder to leave the atomic
2597+
// joint config state or remove the learner replica to force the
2598+
// priority inversion.
2599+
t.Logf("waiting for %d to leave joint config", rangeID)
2600+
if storeID == 4 && rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) {
2601+
waitUntilLeavingJoint()
2602+
}
2603+
},
2604+
},
2605+
},
2606+
},
2607+
})
2608+
defer tc.Stopper().Stop(ctx)
2609+
2610+
scratchKey := tc.ScratchRange(t)
2611+
2612+
// Wait until the old leaseholder has left the atomic joint config state or
2613+
// removed the learner replica.
2614+
waitUntilLeavingJoint = func() {
2615+
testutils.SucceedsSoon(t, func() error {
2616+
rangeDesc := tc.LookupRangeOrFatal(t, scratchKey)
2617+
replicas := rangeDesc.Replicas()
2618+
t.Logf("range %v: waiting to leave joint conf", rangeDesc)
2619+
if replicas.InAtomicReplicationChange() || len(replicas.LearnerDescriptors()) != 0 {
2620+
return errors.Newf("in between atomic changes: %v", replicas)
2621+
}
2622+
return nil
2623+
})
2624+
}
2625+
2626+
scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
2627+
tc.AddVotersOrFatal(t, scratchRange.StartKey.AsRawKey(), tc.Targets(1, 2)...)
2628+
atomic.StoreInt64(&scratchRangeID, int64(scratchRange.RangeID))
2629+
lh, err := tc.FindRangeLeaseHolder(scratchRange, nil)
2630+
require.NoError(t, err)
2631+
2632+
// Rebalance the leaseholder replica to a new store. This will cause the race
2633+
// condition where the new leaseholder can enqueue a replica to replicate
2634+
// queue with high priority but compute a low priority action at processing
2635+
// time.
2636+
t.Logf("rebalancing range %d from s%d to s%d", scratchRange, lh.StoreID, newLeaseholderStoreAndNodeID)
2637+
_, err = tc.RebalanceVoter(
2638+
ctx,
2639+
scratchRange.StartKey.AsRawKey(),
2640+
roachpb.ReplicationTarget{StoreID: lh.StoreID, NodeID: lh.NodeID}, /* src */
2641+
roachpb.ReplicationTarget{StoreID: newLeaseholderStoreAndNodeID, NodeID: newLeaseholderStoreAndNodeID}, /* dest */
2642+
)
2643+
require.NoError(t, err)
2644+
2645+
// Wait until the priority inversion is detected and the replica is requeued.
2646+
testutils.SucceedsSoon(t, func() error {
2647+
store := tc.GetFirstStoreFromServer(t, 3)
2648+
if c := store.ReplicateQueueMetrics().PriorityInversionTotal.Count(); c == 0 {
2649+
return errors.New("expected non-zero priority inversion total count but got 0")
2650+
}
2651+
if c := store.ReplicateQueueMetrics().RequeueDueToPriorityInversion.Count(); c == 0 {
2652+
return errors.New("expected to requeue due to priority inversion but got 0")
2653+
}
2654+
return nil
2655+
})
2656+
}

pkg/kv/kvserver/testing_knobs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,10 @@ type StoreTestingKnobs struct {
535535
// rangeID should ignore the queue being disabled, and be processed anyway.
536536
BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool
537537

538+
// BaseQueuePostEnqueueInterceptor is called with the storeID and rangeID of
539+
// the replica right after a replica is enqueued (before it is processed)
540+
BaseQueuePostEnqueueInterceptor func(storeID roachpb.StoreID, rangeID roachpb.RangeID)
541+
538542
// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndexRaftMuLocked.
539543
// If nil is returned, reproposal will be attempted.
540544
InjectReproposalError func(p *ProposalData) error

0 commit comments

Comments
 (0)