
Commit c354041

kvserver: add TestPriorityInversionRequeue
Previously, we added a priority inversion requeueing mechanism. This commit adds a unit test that forces the race condition we suspected was occurring in escalations involving priority inversion, and asserts both that a priority inversion occurs and that the replica is correctly requeued.

Test setup:

1. The range's leaseholder replica is rebalanced from one store to another.
2. The new leaseholder enqueues the replica for repair with high priority (e.g. to finalize the atomic replication change or remove a learner replica).
3. Before processing, the old leaseholder completes the change (exits the joint config or removes the learner).
4. When the new leaseholder processes the replica, it computes a ConsiderRebalance action, resulting in a priority inversion and potentially blocking other high-priority work.
1 parent b7eeaee commit c354041
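
For context, the following is a minimal sketch of the requeueing mechanism the test exercises. The ConsiderRebalance action, the PriorityInversionRequeue setting, and the requeue-on-inversion behavior come from this change; the function name, signature, and hook point below are illustrative assumptions rather than the actual implementation.

	// maybeRequeueOnPriorityInversion is a hypothetical sketch of the check made
	// at processing time when PriorityInversionRequeue is enabled. The real hook
	// point and signature in the replicate queue may differ.
	func maybeRequeueOnPriorityInversion(
		enqueuedPriority, computedPriority float64, requeue func(priority float64),
	) bool {
		// The replica was enqueued at a high repair priority, but by processing
		// time only a lower-priority action (e.g. ConsiderRebalance) remains.
		if computedPriority >= enqueuedPriority {
			// No inversion: the computed action is at least as urgent as the
			// enqueued priority, so process the replica now.
			return false
		}
		// Inversion detected: requeue at the freshly computed (lower) priority so
		// other high-priority replicas in the queue are not blocked behind this
		// low-priority work.
		requeue(computedPriority)
		return true
	}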

File tree

pkg/kv/kvserver/queue.go
pkg/kv/kvserver/replicate_queue_test.go
pkg/kv/kvserver/testing_knobs.go

3 files changed: 123 additions & 0 deletions


pkg/kv/kvserver/queue.go

Lines changed: 3 additions & 0 deletions
@@ -1005,6 +1005,9 @@ func (bq *baseQueue) addInternal(
 	default:
 		// No need to signal again.
 	}
+	if postEnqueueInterceptor := bq.store.TestingKnobs().BaseQueuePostEnqueueInterceptor; postEnqueueInterceptor != nil {
+		postEnqueueInterceptor(bq.store.StoreID(), desc.RangeID)
+	}
 	// Note that we are bumping enqueueAdd here instead of during defer to avoid
 	// treating requeuing a processing replica as newly added. They will be
 	// re-added to the queue later which will double count them.

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 116 additions & 0 deletions
@@ -2537,3 +2537,119 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
 	afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success")
 	require.Greater(t, afterProcessSuccess, int64(0))
 }
+
+// TestPriorityInversionRequeue tests that the replicate queue correctly handles
+// priority inversions by requeuing replicas when the PriorityInversionRequeue
+// setting is enabled.
+//
+// This test specifically targets a race condition where:
+// 1. A replica is enqueued for a high-priority repair action
+//    (FinalizeAtomicReplicationChange or RemoveLearner).
+// 2. By the time the replica is processed, the repair is no longer needed and
+//    only a low-priority rebalance action (ConsiderRebalance) is computed.
+// 3. This creates a priority inversion where a low-priority action blocks
+//    other higher-priority replicas in the queue from being processed.
+//
+// The race occurs during range rebalancing:
+// 1. A leaseholder replica of a range is rebalanced from one store to another.
+// 2. The new leaseholder enqueues the replica for repair (e.g. to finalize
+//    the atomic replication change or remove a learner replica).
+// 3. Before processing, the old leaseholder has left the atomic joint config
+//    state or removed the learner replica.
+// 4. When the new leaseholder processes the replica, it computes a
+//    ConsiderRebalance action, causing priority inversion.
+//
+// With PriorityInversionRequeue enabled, the queue should detect this condition
+// and requeue the replica at the correct priority. The test validates this
+// behavior through metrics that track priority inversions and requeuing events.
+func TestPriorityInversionRequeue(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	settings := cluster.MakeTestingClusterSettings()
+	kvserver.PriorityInversionRequeue.Override(ctx, &settings.SV, true)
+
+	var scratchRangeID int64
+	atomic.StoreInt64(&scratchRangeID, -1)
+	require.NoError(t, log.SetVModule("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5"))
+
+	const newLeaseholderStoreAndNodeID = 4
+	var waitUntilLeavingJoint = func() {}
+
+	tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: settings,
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
+						// Disable the replicate queue except for the scratch range on the new leaseholder.
+						t.Logf("range %d is added to replicate queue store", rangeID)
+						return rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID))
+					},
+					BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
+						// After enqueuing, wait for the old leaseholder to leave the atomic
+						// joint config state or remove the learner replica to force the
+						// priority inversion.
+						t.Logf("waiting for %d to leave joint config", rangeID)
+						if storeID == 4 && rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) {
+							waitUntilLeavingJoint()
+						}
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+
+	// Wait until the old leaseholder has left the atomic joint config state or
+	// removed the learner replica.
+	waitUntilLeavingJoint = func() {
+		testutils.SucceedsSoon(t, func() error {
+			rangeDesc := tc.LookupRangeOrFatal(t, scratchKey)
+			replicas := rangeDesc.Replicas()
+			t.Logf("range %v: waiting to leave joint conf", rangeDesc)
+			if replicas.InAtomicReplicationChange() || len(replicas.LearnerDescriptors()) != 0 {
+				return errors.Newf("in between atomic changes: %v", replicas)
+			}
+			return nil
+		})
+	}
+
+	scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
+	tc.AddVotersOrFatal(t, scratchRange.StartKey.AsRawKey(), tc.Targets(1, 2)...)
+	atomic.StoreInt64(&scratchRangeID, int64(scratchRange.RangeID))
+	lh, err := tc.FindRangeLeaseHolder(scratchRange, nil)
+	require.NoError(t, err)
+
+	// Rebalance the leaseholder replica to a new store. This will cause the race
+	// condition where the new leaseholder can enqueue a replica to replicate
+	// queue with high priority but compute a low priority action at processing
+	// time.
+	t.Logf("rebalancing range %d from s%d to s%d", scratchRange, lh.StoreID, newLeaseholderStoreAndNodeID)
+	_, err = tc.RebalanceVoter(
+		ctx,
+		scratchRange.StartKey.AsRawKey(),
+		roachpb.ReplicationTarget{StoreID: lh.StoreID, NodeID: lh.NodeID}, /* src */
+		roachpb.ReplicationTarget{StoreID: newLeaseholderStoreAndNodeID, NodeID: newLeaseholderStoreAndNodeID}, /* dest */
+	)
+	require.NoError(t, err)
+
+	// Wait until the priority inversion is detected and the replica is requeued.
+	testutils.SucceedsSoon(t, func() error {
+		store := tc.GetFirstStoreFromServer(t, 3)
+		if c := store.ReplicateQueueMetrics().PriorityInversionTotal.Count(); c == 0 {
+			return errors.New("expected non-zero priority inversion total count but got 0")
+		}
+		if c := store.ReplicateQueueMetrics().PriorityInversionForConsiderRebalance.Count(); c == 0 {
+			return errors.New("expected non-zero priority inversion count for consider rebalance but got 0")
+		}
+		if c := store.ReplicateQueueMetrics().RequeueDueToPriorityInversion.Count(); c == 0 {
+			return errors.New("expected to requeue due to priority inversion but got 0")
+		}
+		return nil
+	})
+}

pkg/kv/kvserver/testing_knobs.go

Lines changed: 4 additions & 0 deletions
@@ -527,6 +527,10 @@ type StoreTestingKnobs struct {
 	// rangeID should ignore the queue being disabled, and be processed anyway.
 	BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool
 
+	// BaseQueuePostEnqueueInterceptor is called with the storeID and rangeID of
+	// the replica right after a replica is enqueued (before it is processed).
+	BaseQueuePostEnqueueInterceptor func(storeID roachpb.StoreID, rangeID roachpb.RangeID)
+
 	// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndexRaftMuLocked.
 	// If nil is returned, reproposal will be attempted.
 	InjectReproposalError func(p *ProposalData) error
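
The interceptor does not have to block the way the test above does; it can also serve as a pure synchronization point. The helper below is a hypothetical sketch (not part of this commit) that signals a channel once when a target range is enqueued on any store; it assumes the usual kvserver and roachpb imports.

	// buildEnqueueSignalKnobs is a hypothetical helper showing how a test might
	// use BaseQueuePostEnqueueInterceptor to observe, rather than delay, an
	// enqueue: the returned channel receives the range ID the first time the
	// target range is enqueued.
	func buildEnqueueSignalKnobs(
		targetRangeID roachpb.RangeID,
	) (*kvserver.StoreTestingKnobs, <-chan roachpb.RangeID) {
		enqueued := make(chan roachpb.RangeID, 1)
		knobs := &kvserver.StoreTestingKnobs{
			BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
				if rangeID != targetRangeID {
					return
				}
				select {
				case enqueued <- rangeID:
				default:
					// Already signaled once; drop duplicate notifications.
				}
			},
		}
		return knobs, enqueued
	}

A test would install these knobs via base.TestingKnobs{Store: knobs} and block on the returned channel before continuing, where TestPriorityInversionRequeue instead blocks inside the interceptor itself.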
