
Commit 9315a46

kvserver: add TestPriorityInversionRequeue
Previously, we added a priority inversion requeuing mechanism. This commit adds a unit test that forces the race condition we suspected was happening in escalations involving priority inversion, and asserts that a priority inversion occurs and that the replica is correctly requeued.

Test setup:
1. The range's leaseholder replica is rebalanced from one store to another.
2. The new leaseholder enqueues the replica for repair with high priority (e.g. to finalize the atomic replication change or remove a learner replica).
3. Before processing, the old leaseholder completes the change (exits the joint config or removes the learner).
4. When the new leaseholder processes the replica, it computes a ConsiderRebalance action, resulting in a priority inversion and potentially blocking other high-priority work.
1 parent 44fe79f commit 9315a46
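
For background, here is a minimal sketch of the requeue-on-inversion idea the test exercises. The function name, signature, and priority values below are hypothetical illustrations, not the actual kvserver implementation: the point is only that the queue compares the priority computed at processing time against the priority the replica was enqueued with, and hands the replica back to the queue when the former is lower.

package main

import "fmt"

// maybeRequeueOnInversion is a sketch only (hypothetical names): if a replica
// was enqueued at a high repair priority but only a low-priority action is
// computed at processing time, requeue it at the recomputed priority rather
// than letting the cheap action block genuinely urgent work.
func maybeRequeueOnInversion(enqueuedPriority, computedPriority float64, requeue func(float64)) bool {
	if computedPriority >= enqueuedPriority {
		return false // no inversion; process the replica now
	}
	requeue(computedPriority) // hand the replica back at the lower priority
	return true
}

func main() {
	// Illustrative values: enqueued for a repair action, but by processing
	// time only a rebalance is needed.
	requeued := maybeRequeueOnInversion(12000, 0, func(p float64) {
		fmt.Printf("requeued at priority %.0f\n", p)
	})
	fmt.Println("requeued:", requeued)
}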

File tree

pkg/kv/kvserver/queue.go
pkg/kv/kvserver/replicate_queue_test.go
pkg/kv/kvserver/testing_knobs.go

3 files changed, 123 insertions(+), 0 deletions(-)

pkg/kv/kvserver/queue.go

Lines changed: 3 additions & 0 deletions
@@ -1001,6 +1001,9 @@ func (bq *baseQueue) addInternal(
 	default:
 		// No need to signal again.
 	}
+	if postEnqueueInterceptor := bq.store.TestingKnobs().BaseQueuePostEnqueueInterceptor; postEnqueueInterceptor != nil {
+		postEnqueueInterceptor(bq.store.StoreID(), desc.RangeID)
+	}
 	// Note that we are bumping enqueueAdd here instead of during defer to avoid
 	// treating requeuing a processing replica as newly added. They will be
 	// re-added to the queue later which will double count them.

pkg/kv/kvserver/replicate_queue_test.go

Lines changed: 116 additions & 0 deletions
@@ -2540,3 +2540,119 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
 	afterProcessSuccess := getDecommissioningNudgerMetricValue(t, tc, "process_success")
 	require.Greater(t, afterProcessSuccess, int64(0))
 }
+
+// TestPriorityInversionRequeue tests that the replicate queue correctly handles
+// priority inversions by requeuing replicas when the PriorityInversionRequeue
+// setting is enabled.
+//
+// This test specifically targets a race condition where:
+// 1. A replica is enqueued for a high-priority repair action
+//    (FinalizeAtomicReplicationChange or RemoveLearner).
+// 2. By the time the replica is processed, the repair is no longer needed and
+//    only a low-priority rebalance action (ConsiderRebalance) is computed.
+// 3. This creates a priority inversion where a low-priority action blocks
+//    other higher-priority replicas in the queue from being processed.
+//
+// The race occurs during range rebalancing:
+// 1. A leaseholder replica of a range is rebalanced from one store to another.
+// 2. The new leaseholder enqueues the replica for repair (e.g. to finalize
+//    the atomic replication change or remove a learner replica).
+// 3. Before processing, the old leaseholder has left the atomic joint config
+//    state or removed the learner replica.
+// 4. When the new leaseholder processes the replica, it computes a
+//    ConsiderRebalance action, causing priority inversion.
+//
+// With PriorityInversionRequeue enabled, the queue should detect this condition
+// and requeue the replica at the correct priority. The test validates this
+// behavior through metrics that track priority inversions and requeuing events.
+func TestPriorityInversionRequeue(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	settings := cluster.MakeTestingClusterSettings()
+	kvserver.PriorityInversionRequeue.Override(ctx, &settings.SV, true)
+
+	var scratchRangeID int64
+	atomic.StoreInt64(&scratchRangeID, -1)
+	require.NoError(t, log.SetVModule("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5"))
+
+	const newLeaseholderStoreAndNodeID = 4
+	var waitUntilLeavingJoint = func() {}
+
+	tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Settings: settings,
+			Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{
+					BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
+						// Disable the replicate queue except for the scratch range on the new leaseholder.
+						t.Logf("range %d is added to replicate queue store", rangeID)
+						return rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID))
+					},
+					BaseQueuePostEnqueueInterceptor: func(storeID roachpb.StoreID, rangeID roachpb.RangeID) {
+						// After enqueuing, wait for the old leaseholder to leave the atomic
+						// joint config state or remove the learner replica to force the
+						// priority inversion.
+						t.Logf("waiting for %d to leave joint config", rangeID)
+						if storeID == 4 && rangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) {
+							waitUntilLeavingJoint()
+						}
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+
+	// Wait until the old leaseholder has left the atomic joint config state or
+	// removed the learner replica.
+	waitUntilLeavingJoint = func() {
+		testutils.SucceedsSoon(t, func() error {
+			rangeDesc := tc.LookupRangeOrFatal(t, scratchKey)
+			replicas := rangeDesc.Replicas()
+			t.Logf("range %v: waiting to leave joint conf", rangeDesc)
+			if replicas.InAtomicReplicationChange() || len(replicas.LearnerDescriptors()) != 0 {
+				return errors.Newf("in between atomic changes: %v", replicas)
+			}
+			return nil
+		})
+	}
+
+	scratchRange := tc.LookupRangeOrFatal(t, scratchKey)
+	tc.AddVotersOrFatal(t, scratchRange.StartKey.AsRawKey(), tc.Targets(1, 2)...)
+	atomic.StoreInt64(&scratchRangeID, int64(scratchRange.RangeID))
+	lh, err := tc.FindRangeLeaseHolder(scratchRange, nil)
+	require.NoError(t, err)
+
+	// Rebalance the leaseholder replica to a new store. This will cause the race
+	// condition where the new leaseholder can enqueue a replica to the replicate
+	// queue with high priority but compute a low-priority action at processing
+	// time.
+	t.Logf("rebalancing range %d from s%d to s%d", scratchRange.RangeID, lh.StoreID, newLeaseholderStoreAndNodeID)
+	_, err = tc.RebalanceVoter(
+		ctx,
+		scratchRange.StartKey.AsRawKey(),
+		roachpb.ReplicationTarget{StoreID: lh.StoreID, NodeID: lh.NodeID}, /* src */
+		roachpb.ReplicationTarget{StoreID: newLeaseholderStoreAndNodeID, NodeID: newLeaseholderStoreAndNodeID}, /* dest */
+	)
+	require.NoError(t, err)
+
+	// Wait until the priority inversion is detected and the replica is requeued.
+	testutils.SucceedsSoon(t, func() error {
+		store := tc.GetFirstStoreFromServer(t, 3)
+		if c := store.ReplicateQueueMetrics().PriorityInversionTotal.Count(); c == 0 {
+			return errors.New("expected non-zero priority inversion total count but got 0")
+		}
+		if c := store.ReplicateQueueMetrics().PriorityInversionForConsiderRebalance.Count(); c == 0 {
+			return errors.New("expected non-zero priority inversion count for consider rebalance but got 0")
+		}
+		if c := store.ReplicateQueueMetrics().RequeueDueToPriorityInversion.Count(); c == 0 {
+			return errors.New("expected to requeue due to priority inversion but got 0")
+		}
+		return nil
+	})
+}

pkg/kv/kvserver/testing_knobs.go

Lines changed: 4 additions & 0 deletions
@@ -535,6 +535,10 @@ type StoreTestingKnobs struct {
 	// rangeID should ignore the queue being disabled, and be processed anyway.
 	BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool
 
+	// BaseQueuePostEnqueueInterceptor is called with the storeID and rangeID of
+	// the replica right after a replica is enqueued (before it is processed).
+	BaseQueuePostEnqueueInterceptor func(storeID roachpb.StoreID, rangeID roachpb.RangeID)
+
 	// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndexRaftMuLocked.
 	// If nil is returned, reproposal will be attempted.
 	InjectReproposalError func(p *ProposalData) error
