Skip to content

Commit 4acb38f

Browse files
committed
kvserver: requeue on priority inversion for replicate queue
Previously, a replica could enter the queue with high priority but, by the time it was processed, the action planned for this replica may have a low priority, causing us to perform low priority work. Specifically, we are mostly worried about cases where the priority changes from any of the repair actions to consider rebalance. Rebalancing could take a long time and block other ranges enqueued with actual repair action needed. This commit ensures that such replicas are requeued instead, avoiding priority inversions.
1 parent 745c66e commit 4acb38f

File tree

3 files changed

+224
-0
lines changed

3 files changed

+224
-0
lines changed

pkg/kv/kvserver/allocator/allocatorimpl/allocator.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3245,3 +3245,62 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
32453245
}
32463246
return ret
32473247
}
3248+
3249+
// roundToNearestPriorityCategory rounds a priority to the nearest 100. n should be non-negative.
3250+
func roundToNearestPriorityCategory(n float64) float64 {
3251+
return math.Round(n/100.0) * 100
3252+
}
3253+
3254+
// WithinPriorityRange checks if a priority is within the range of possible
3255+
// priorities for the allocator actions.
3256+
func withinPriorityRange(priority float64) bool {
3257+
return AllocatorNoop.Priority() <= priority && priority <= AllocatorFinalizeAtomicReplicationChange.Priority()
3258+
}
3259+
3260+
// CheckPriorityInversion checks if the priority at enqueue time is higher than
3261+
// the priority corresponding to the action computed at processing time. It
3262+
// returns whether there was a priority inversion and whether the caller should
3263+
// skip the processing of the range since the inversion is considered unfair.
3264+
// Currently, we only consider the inversion as unfair if it has gone from a
3265+
// repair action to lowest priority (AllocatorConsiderRebalance). We let
3266+
// AllocatorRangeUnavailable, AllocatorNoop pass through since they are noop.
3267+
//
3268+
// NB: If shouldRequeue is true, isInversion must be true.
3269+
func CheckPriorityInversion(
3270+
priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
3271+
) (isInversion bool, shouldRequeue bool) {
3272+
// priorityAtEnqueue of -1 is a special case reserved for processing logic to
3273+
// run even if there’s a priority inversion. If the priority is not -1, the
3274+
// range may be re-queued to be processed with the correct priority. It is
3275+
// used for things that call into baseQueue.process without going through the
3276+
// replicate priority queue. For example, s.ReplicateQueueDryRun or
3277+
// r.scatterRangeAndRandomizeLeases.
3278+
3279+
// NB: we need to check for when priorityAtEnqueue falls within the range
3280+
// of the allocator actions because store.Enqueue might enqueue things with
3281+
// a very high priority (1e5). In those cases, we do not want to requeue
3282+
// these actions or count it as an inversion.
3283+
if priorityAtEnqueue == -1 || !withinPriorityRange(priorityAtEnqueue) {
3284+
return false, false
3285+
}
3286+
3287+
if priorityAtEnqueue > AllocatorConsiderRebalance.Priority() && actionAtProcessing == AllocatorConsiderRebalance {
3288+
return true, true
3289+
}
3290+
3291+
// NB: Usually, the priority at enqueue time should correspond to
3292+
// action.Priority(). However, for AllocatorAddVoter,
3293+
// AllocatorRemoveDeadVoter, AllocatorRemoveVoter, the priority can be
3294+
// adjusted at enqueue time (See ComputeAction for more details). However, we
3295+
// expect the adjustment to be relatively small (<100). So we round the
3296+
// priority to the nearest 100 to compare against
3297+
// actionAtProcessing.Priority(). Without this rounding, we might treat going
3298+
// from 10000 to 999 as an inversion, but it was just due to the adjustment.
3299+
// Note that priorities at AllocatorFinalizeAtomicReplicationChange,
3300+
// AllocatorRemoveLearner, and AllocatorReplaceDeadVoter will be rounded to
3301+
// the same priority. They are so close to each other, so we don't really
3302+
// count it as an inversion among them.
3303+
normPriorityAtEnqueue := roundToNearestPriorityCategory(priorityAtEnqueue)
3304+
isInversion = normPriorityAtEnqueue > actionAtProcessing.Priority()
3305+
return isInversion, false
3306+
}

pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9629,3 +9629,148 @@ func TestAllocatorRebalanceTargetVoterConstraintUnsatisfied(t *testing.T) {
96299629
})
96309630
}
96319631
}
9632+
9633+
// TestRoundToNearestPriorityCategory tests the RoundToNearestPriorityCategory
9634+
// function.
9635+
func TestRoundToNearestPriorityCategory(t *testing.T) {
9636+
defer leaktest.AfterTest(t)()
9637+
9638+
testCases := []struct {
9639+
name string
9640+
input float64
9641+
expected float64
9642+
}{
9643+
{
9644+
name: "zero",
9645+
input: 0.0,
9646+
expected: 0.0,
9647+
},
9648+
{
9649+
name: "exact multiple of 100",
9650+
input: 100.0,
9651+
expected: 100.0,
9652+
},
9653+
{
9654+
name: "round down to nearest 100",
9655+
input: 149.0,
9656+
expected: 100.0,
9657+
},
9658+
{
9659+
name: "round up to nearest 100",
9660+
input: 151.0,
9661+
expected: 200.0,
9662+
},
9663+
{
9664+
name: "negative exact multiple of 100",
9665+
input: -200.0,
9666+
expected: -200.0,
9667+
},
9668+
{
9669+
name: "negative round down to nearest 100",
9670+
input: -249.0,
9671+
expected: -200.0,
9672+
},
9673+
{
9674+
name: "negative round up to nearest 100",
9675+
input: -251.0,
9676+
expected: -300.0,
9677+
},
9678+
}
9679+
9680+
for _, tc := range testCases {
9681+
t.Run(tc.name, func(t *testing.T) {
9682+
require.Equal(t, tc.expected, roundToNearestPriorityCategory(tc.input))
9683+
})
9684+
}
9685+
}
9686+
9687+
// TestCheckPriorityInversion tests the CheckPriorityInversion function.
9688+
func TestCheckPriorityInversion(t *testing.T) {
9689+
defer leaktest.AfterTest(t)()
9690+
9691+
for action := AllocatorNoop; action <= AllocatorFinalizeAtomicReplicationChange; action++ {
9692+
t.Run(action.String(), func(t *testing.T) {
9693+
if action == AllocatorConsiderRebalance || action == AllocatorNoop || action == AllocatorRangeUnavailable {
9694+
inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
9695+
require.False(t, inversion)
9696+
require.False(t, requeue)
9697+
} else {
9698+
inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
9699+
require.True(t, inversion)
9700+
require.True(t, requeue)
9701+
}
9702+
})
9703+
}
9704+
9705+
testCases := []struct {
9706+
name string
9707+
priorityAtEnqueue float64
9708+
actionAtProcessing AllocatorAction
9709+
expectedInversion bool
9710+
expectedRequeue bool
9711+
}{
9712+
{
9713+
name: "AllocatorNoop at processing is noop",
9714+
priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
9715+
actionAtProcessing: AllocatorNoop,
9716+
expectedInversion: true,
9717+
expectedRequeue: false,
9718+
},
9719+
{
9720+
name: "AllocatorRangeUnavailable at processing is noop",
9721+
priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
9722+
actionAtProcessing: AllocatorRangeUnavailable,
9723+
expectedInversion: true,
9724+
expectedRequeue: false,
9725+
},
9726+
{
9727+
name: "priority -1 bypasses",
9728+
priorityAtEnqueue: -1,
9729+
actionAtProcessing: AllocatorConsiderRebalance,
9730+
expectedInversion: false,
9731+
expectedRequeue: false,
9732+
},
9733+
{
9734+
name: "above range priority(1e5)",
9735+
priorityAtEnqueue: 1e5,
9736+
actionAtProcessing: AllocatorConsiderRebalance,
9737+
expectedInversion: false,
9738+
expectedRequeue: false,
9739+
},
9740+
{
9741+
name: "below range priority at -10",
9742+
priorityAtEnqueue: -10,
9743+
actionAtProcessing: -100,
9744+
expectedInversion: false,
9745+
expectedRequeue: false,
9746+
},
9747+
{
9748+
name: "inversion but small priority changes",
9749+
priorityAtEnqueue: AllocatorFinalizeAtomicReplicationChange.Priority(),
9750+
actionAtProcessing: AllocatorReplaceDecommissioningNonVoter,
9751+
expectedInversion: true,
9752+
expectedRequeue: false,
9753+
},
9754+
{
9755+
name: "inversion but small priority changes",
9756+
priorityAtEnqueue: AllocatorRemoveDeadVoter.Priority(),
9757+
actionAtProcessing: AllocatorAddNonVoter,
9758+
expectedInversion: true,
9759+
expectedRequeue: false,
9760+
},
9761+
{
9762+
name: "inversion but small priority changes",
9763+
priorityAtEnqueue: AllocatorConsiderRebalance.Priority(),
9764+
actionAtProcessing: AllocatorNoop,
9765+
expectedInversion: false,
9766+
expectedRequeue: false,
9767+
},
9768+
}
9769+
for _, tc := range testCases {
9770+
t.Run(tc.name, func(t *testing.T) {
9771+
inversion, requeue := CheckPriorityInversion(tc.priorityAtEnqueue, tc.actionAtProcessing)
9772+
require.Equal(t, tc.expectedInversion, inversion)
9773+
require.Equal(t, tc.expectedRequeue, requeue)
9774+
})
9775+
}
9776+
}

pkg/kv/kvserver/replicate_queue.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,10 @@ func (rq *replicateQueue) process(
722722
}
723723

724724
if err != nil {
725+
if requeue {
726+
log.KvDistribution.VEventf(ctx, 1, "re-queuing on errors: %v", err)
727+
rq.maybeAdd(ctx, repl, rq.store.Clock().NowAsClockTimestamp())
728+
}
725729
return false, err
726730
}
727731

@@ -916,6 +920,22 @@ func (rq *replicateQueue) processOneChange(
916920
) (requeue bool, _ error) {
917921
change, err := rq.planner.PlanOneChange(
918922
ctx, repl, desc, conf, plan.PlannerOptions{Scatter: scatter})
923+
924+
inversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(priorityAtEnqueue, change.Action)
925+
if inversion {
926+
log.KvDistribution.VInfof(ctx, 2,
927+
"priority inversion during process: shouldRequeue = %t action=%s, priority=%v, enqueuePriority=%v",
928+
shouldRequeue, change.Action, change.Action.Priority(), priorityAtEnqueue)
929+
if shouldRequeue && PriorityInversionRequeue.Get(&rq.store.cfg.Settings.SV) {
930+
// Return true here to requeue the range. We can't return an error here
931+
// because rq.process only requeue when error is nil. See
932+
// replicateQueue.process for more details.
933+
return true /*requeue*/, maybeAnnotateDecommissionErr(
934+
errors.Errorf("requing due to priority inversion: action=%s, priority=%v, enqueuePriority=%v",
935+
change.Action, change.Action.Priority(), priorityAtEnqueue), change.Action)
936+
}
937+
}
938+
919939
// When there is an error planning a change, return the error immediately
920940
// and do not requeue. It is unlikely that the range or storepool state
921941
// will change quickly enough in order to not get the same error and

0 commit comments

Comments
 (0)