@@ -2539,3 +2539,119 @@ func TestReplicateQueueDecommissionScannerDisabled(t *testing.T) {
2539
2539
afterProcessSuccess := getDecommissioningNudgerMetricValue (t , tc , "process_success" )
2540
2540
require .Greater (t , afterProcessSuccess , int64 (0 ))
2541
2541
}
2542
+
2543
+ // TestPriorityInversionRequeue tests that the replicate queue correctly handles
2544
+ // priority inversions by requeuing replicas when the PriorityInversionRequeue
2545
+ // setting is enabled.
2546
+ //
2547
+ // This test specifically targets a race condition where:
2548
+ // 1. A replica is enqueued for a high-priority repair action
2549
+ // (FinalizeAtomicReplicationChange or RemoveLearner).
2550
+ // 2. By the time the replica is processed, the repair is no longer needed and
2551
+ // only a low-priority rebalance action (ConsiderRebalance) is computed.
2552
+ // 3. This creates a priority inversion where a low-priority action blocks
2553
+ // other higher-priority replicas in the queue from being processed.
2554
+ //
2555
+ // The race occurs during range rebalancing:
2556
+ // 1. A leaseholder replica of a range is rebalanced from one store to another.
2557
+ // 2. The new leaseholder enqueues the replica for repair (e.g. to finalize
2558
+ // the atomic replication change or remove a learner replica).
2559
+ // 3. Before processing, the old leaseholder has left the atomic joint config
2560
+ // state or removed the learner replica. 4. When the new leaseholder processes
2561
+ // the replica, it computes a ConsiderRebalance action, causing priority
2562
+ // inversion.
2563
+ //
2564
+ // With PriorityInversionRequeue enabled, the queue should detect this condition
2565
+ // and requeue the replica at the correct priority. The test validates this
2566
+ // behavior through metrics that track priority inversions and requeuing events.
2567
+ func TestPriorityInversionRequeue (t * testing.T ) {
2568
+ defer leaktest .AfterTest (t )()
2569
+ defer log .Scope (t ).Close (t )
2570
+
2571
+ ctx := context .Background ()
2572
+ settings := cluster .MakeTestingClusterSettings ()
2573
+ kvserver .PriorityInversionRequeue .Override (ctx , & settings .SV , true )
2574
+
2575
+ var scratchRangeID int64
2576
+ atomic .StoreInt64 (& scratchRangeID , - 1 )
2577
+ require .NoError (t , log .SetVModule ("queue=5,replicate_queue=5,replica_command=5,replicate=5,replica=5" ))
2578
+
2579
+ const newLeaseholderStoreAndNodeID = 4
2580
+ var waitUntilLeavingJoint = func () {}
2581
+
2582
+ tc := testcluster .StartTestCluster (t , 4 , base.TestClusterArgs {
2583
+ ReplicationMode : base .ReplicationManual ,
2584
+ ServerArgs : base.TestServerArgs {
2585
+ Settings : settings ,
2586
+ Knobs : base.TestingKnobs {
2587
+ Store : & kvserver.StoreTestingKnobs {
2588
+ BaseQueueDisabledBypassFilter : func (rangeID roachpb.RangeID ) bool {
2589
+ // Disable the replicate queue except for the scratch range on the new leaseholder.
2590
+ t .Logf ("range %d is added to replicate queue store" , rangeID )
2591
+ return rangeID == roachpb .RangeID (atomic .LoadInt64 (& scratchRangeID ))
2592
+ },
2593
+ BaseQueuePostEnqueueInterceptor : func (storeID roachpb.StoreID , rangeID roachpb.RangeID ) {
2594
+ // After enqueuing, wait for the old leaseholder to leave the atomic
2595
+ // joint config state or remove the learner replica to force the
2596
+ // priority inversion.
2597
+ t .Logf ("waiting for %d to leave joint config" , rangeID )
2598
+ if storeID == 4 && rangeID == roachpb .RangeID (atomic .LoadInt64 (& scratchRangeID )) {
2599
+ waitUntilLeavingJoint ()
2600
+ }
2601
+ },
2602
+ },
2603
+ },
2604
+ },
2605
+ })
2606
+ defer tc .Stopper ().Stop (ctx )
2607
+
2608
+ scratchKey := tc .ScratchRange (t )
2609
+
2610
+ // Wait until the old leaseholder has left the atomic joint config state or
2611
+ // removed the learner replica.
2612
+ waitUntilLeavingJoint = func () {
2613
+ testutils .SucceedsSoon (t , func () error {
2614
+ rangeDesc := tc .LookupRangeOrFatal (t , scratchKey )
2615
+ replicas := rangeDesc .Replicas ()
2616
+ t .Logf ("range %v: waiting to leave joint conf" , rangeDesc )
2617
+ if replicas .InAtomicReplicationChange () || len (replicas .LearnerDescriptors ()) != 0 {
2618
+ return errors .Newf ("in between atomic changes: %v" , replicas )
2619
+ }
2620
+ return nil
2621
+ })
2622
+ }
2623
+
2624
+ scratchRange := tc .LookupRangeOrFatal (t , scratchKey )
2625
+ tc .AddVotersOrFatal (t , scratchRange .StartKey .AsRawKey (), tc .Targets (1 , 2 )... )
2626
+ atomic .StoreInt64 (& scratchRangeID , int64 (scratchRange .RangeID ))
2627
+ lh , err := tc .FindRangeLeaseHolder (scratchRange , nil )
2628
+ require .NoError (t , err )
2629
+
2630
+ // Rebalance the leaseholder replica to a new store. This will cause the race
2631
+ // condition where the new leaseholder can enqueue a replica to replicate
2632
+ // queue with high priority but compute a low priority action at processing
2633
+ // time.
2634
+ t .Logf ("rebalancing range %d from s%d to s%d" , scratchRange , lh .StoreID , newLeaseholderStoreAndNodeID )
2635
+ _ , err = tc .RebalanceVoter (
2636
+ ctx ,
2637
+ scratchRange .StartKey .AsRawKey (),
2638
+ roachpb.ReplicationTarget {StoreID : lh .StoreID , NodeID : lh .NodeID }, /* src */
2639
+ roachpb.ReplicationTarget {StoreID : newLeaseholderStoreAndNodeID , NodeID : newLeaseholderStoreAndNodeID }, /* dest */
2640
+ )
2641
+ require .NoError (t , err )
2642
+
2643
+ // Wait until the priority inversion is detected and the replica is requeued.
2644
+ testutils .SucceedsSoon (t , func () error {
2645
+ store := tc .GetFirstStoreFromServer (t , 3 )
2646
+ if c := store .ReplicateQueueMetrics ().PriorityInversionTotal .Count (); c == 0 {
2647
+ return errors .New ("expected non-zero priority inversion total count but got 0" )
2648
+ }
2649
+ if c := store .ReplicateQueueMetrics ().PriorityInversionForConsiderRebalance .Count (); c == 0 {
2650
+ return errors .New ("expected non-zero priority inversion count for consider rebalance but got 0" )
2651
+ }
2652
+ if c := store .ReplicateQueueMetrics ().RequeueDueToPriorityInversion .Count (); c == 0 {
2653
+ return errors .New ("expected to requeue due to priority inversion but got 0" )
2654
+ }
2655
+ return nil
2656
+ })
2657
+ }
0 commit comments