
Commit 9e0ba6b

craig[bot] and wenyihu6 committed
152596: kvserver: requeue on priority inversion for replicate queue r=tbg a=wenyihu6

Part of: #152022
Release note: none

---

**kvserver: add PriorityInversionRequeue**

This commit adds a new cluster setting, PriorityInversionRequeue, that controls whether the replicate queue should requeue replicas when their priority at enqueue time differs significantly from their priority at processing time (e.g. dropping from the top 3 to the lowest priority).

---

**kvserver: requeue on priority inversion for replicate queue**

Previously, a replica could enter the queue with a high priority, but by the time it was processed, the action planned for it might have a low priority, causing us to perform low-priority work. We are mostly worried about cases where the priority drops from any of the repair actions to consider-rebalance: rebalancing can take a long time and block other ranges that were enqueued because they actually need repair. This commit ensures that such replicas are requeued instead, avoiding priority inversions.

---

**kvserver: use priorityInversionLogEveryN**

Previously, replicateQueue logged priority-inverted replicas at V(2) to get visibility into every case without missing any replicas. On reflection, the individual cases aren't that interesting; it's the overall volume that matters, which we can track through metrics. This commit rate-limits priority-inversion logging to once every 3 seconds.

---

**kvserver: improve comments for PriorityInversionRequeue**

This commit improves the comments for PriorityInversionRequeue and clarifies the contracts around action.Priority().

---

**allocator: small refactor for CheckPriorityInversion**

This commit refactors CheckPriorityInversion.

---

**allocator: add TestAllocatorPriorityInvariance**

This commit adds the TestAllocatorPriorityInvariance test, which acts as a regression safeguard when new actions are added to AllocatorAction, ensuring the contract is upheld. See action.Priority() and ComputeAction() for more details on the contract.

---

**kvserver: guard inversion check and requeue behind PriorityInversionRequeue**

Previously, we introduced the PriorityInversionRequeue cluster setting, intended for backport, to handle cases where a range was enqueued with a high-priority repair action but a low-priority rebalance action was computed at processing time. In such cases, the caller re-adds the range to the queue under its updated priority. Although the cluster setting guarded the requeue behavior, the inversion check always ran unconditionally, reducing backport safety. This commit updates the logic so that the cluster setting guards both the inversion check and the requeue behavior.

---

**kvserver: move priority inversion check before applyChange**

Previously, we checked for priority inversion before checking for planning errors, which meant we could return requeue = true even when a planning error occurred. This commit gives planning errors precedence over priority-inversion errors: rq.processOneChange now returns early if there is a planning error and only checks for priority inversion right before applying a change. See the sketch after this description.

---

**kvserver: check for requeue before error checking in rq.process**

Previously, we checked for requeue right before returning for both nil and non-nil errors, making the code harder to follow. This commit refactors replicateQueue.process to requeue replicas before checking for errors.
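For orientation, here is a rough sketch (not the actual replicateQueue code) of the flow these commits describe: planning errors take precedence, the inversion check runs only right before applying a change, and requeueing is gated on the cluster setting. planChange, applyChange, and settingEnabled are hypothetical stand-ins; CheckPriorityInversion is the helper added to allocatorimpl in the diff below.

```go
// Rough sketch only: planChange, applyChange, and settingEnabled are
// hypothetical stand-ins for replicateQueue internals not shown here.
func processOneChangeSketch(
	priorityAtEnqueue float64,
	actionAtProcessing allocatorimpl.AllocatorAction,
	planChange, applyChange func() error,
	settingEnabled bool, // the PriorityInversionRequeue cluster setting
) (requeue bool, err error) {
	// Planning errors take precedence over priority-inversion handling.
	if err := planChange(); err != nil {
		return false, err
	}
	// Check for an inversion only when the setting is enabled, and only right
	// before applying the change.
	if settingEnabled {
		if isInversion, shouldRequeue := allocatorimpl.CheckPriorityInversion(
			priorityAtEnqueue, actionAtProcessing,
		); isInversion && shouldRequeue {
			// Skip the low-priority work; the caller re-adds the range to the
			// queue under its updated priority.
			return true, nil
		}
	}
	return false, applyChange()
}
```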
152792: kvserver: add onProcessResult and onEnqueueResult to processCallback r=tbg a=wenyihu6

Part of: #151847
Epic: none

---

**kvserver: use non-blocking send on errors for maybeBackpressureBatch**

maybeBackpressureBatch registers a callback with the split queue for replicas that are too large relative to their split size. This backpressures the range to stop it from growing and to prevent new writes from blocking a pending split. The callback is invoked when the split queue finishes processing the replica.

Previously, the error channel used in the callback had a size of 1 and performed blocking sends. This was safe because the base queue only sent a single error, and by the time maybeBackpressureBatch returned, the callback was guaranteed to have been consumed, so no additional sends would occur. Future commits will allow the callback to be invoked multiple times (although it should fire at most twice). To be safe and avoid potential deadlocks from sends that happen after maybeBackpressureBatch has already returned, this commit makes the error send non-blocking. If the channel is already full, the error is dropped, which is acceptable since we only care about observing the completion of replica processing at least once.

---

**kvserver: return baseQueueAsyncRateLimited from bq.Async**

baseQueue.Async may return immediately as a no-op if the semaphore has no available capacity and the wait parameter is false. Previously, this case returned no error, leaving the caller unaware that the request was dropped. This commit changes the behavior to return a baseQueueAsyncRateLimited error, allowing callers to detect and handle the condition.

---

**kvserver: add onProcessResult and onEnqueueResult to processCallback**

The base queue already supports registering callbacks that are invoked with the processing result of a replica once it is processed. However, replicas may fail before reaching that stage (e.g. they fail to enqueue or are dropped early). This commit extends the mechanism to also report enqueue results, allowing callers to detect failures earlier. Currently, only decommissioningNudger.maybeEnqueueProblemRange uses this.

Note that one behavior change is introduced: previously, a registered callback would fire only once with the processing result and not again if the replica was later processed by the purgatory queue. With this change, the callback may now be invoked twice.

---

**kvserver: add TestBaseQueueCallback**

This commit adds TestBaseQueueCallbackOnEnqueueResult and TestBaseQueueCallbackOnProcessResult to verify that callbacks are correctly invoked with both enqueue and process results.

Co-authored-by: wenyihu6 <[email protected]>
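The non-blocking send described in the first commit above boils down to a standard select-with-default. A minimal illustration (not the actual maybeBackpressureBatch code):

```go
// sendNonBlocking mirrors the pattern described above: deliver err if the
// buffered channel has room, otherwise drop it, so that a second callback
// invocation can never block after the caller has already returned.
func sendNonBlocking(errCh chan error, err error) {
	select {
	case errCh <- err:
	default:
		// The channel already holds an earlier result; observing completion
		// of the replica processing at least once is all the caller needs.
	}
}
```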
3 parents d71deec + b632b95 + a5d7d88 commit 9e0ba6b

File tree

10 files changed: +847, -61 lines


pkg/kv/kvserver/allocator/allocatorimpl/allocator.go

Lines changed: 124 additions & 1 deletion
@@ -139,6 +139,7 @@ const (
 	AllocatorConsiderRebalance
 	AllocatorRangeUnavailable
 	AllocatorFinalizeAtomicReplicationChange
+	AllocatorMaxPriority
 )
 
 // Add indicates an action adding a replica.
@@ -252,10 +253,27 @@ func (a AllocatorAction) SafeValue() {}
252253
// range. Within a given range, the ordering of the various checks inside
253254
// `Allocator.computeAction` determines which repair/rebalancing actions are
254255
// taken before the others.
256+
//
257+
// NB: Priorities should be non-negative and should be spaced in multiples of
258+
// 100 unless you believe they should belong to the same priority category.
259+
// AllocatorNoop should have the lowest priority. CheckPriorityInversion depends
260+
// on this contract. In most cases, the allocator returns a priority that
261+
// matches the definitions below. For AllocatorAddVoter,
262+
// AllocatorRemoveDeadVoter, and AllocatorRemoveVoter, the priority may be
263+
// adjusted (see ComputeAction for details), but the adjustment is expected to
264+
// be small (<49).
265+
//
266+
// Exceptions: AllocatorFinalizeAtomicReplicationChange, AllocatorRemoveLearner,
267+
// and AllocatorReplaceDeadVoter violates the spacing of 100. These cases
268+
// predate this comment, so we allow them as they belong to the same general
269+
// priority category.
255270
func (a AllocatorAction) Priority() float64 {
271+
const maxPriority = 12002
256272
switch a {
273+
case AllocatorMaxPriority:
274+
return maxPriority
257275
case AllocatorFinalizeAtomicReplicationChange:
258-
return 12002
276+
return maxPriority
259277
case AllocatorRemoveLearner:
260278
return 12001
261279
case AllocatorReplaceDeadVoter:
@@ -975,6 +993,49 @@ func (a *Allocator) ComputeAction(
 	return action, priority
 }
 
+// computeAction determines the action to take on a range along with its
+// priority.
+//
+// NB: The returned priority may include a small adjustment and therefore might
+// not exactly match action.Priority(). See AllocatorAddVoter,
+// AllocatorRemoveDeadVoter, AllocatorRemoveVoter below. The adjustment should
+// be <49 with two assumptions below. New uses on this contract should be
+// avoided since the assumptions are not strong guarantees (especially the
+// second one).
+//
+// The claim that the adjustment is < 49 has two assumptions:
+// 1. min(num_replicas,total_nodes) in zone configuration is < 98.
+// 2. when ranges are not under-replicated, the difference between
+// min(num_replicas,total_nodes)/2-1 and existing_replicas is < 49.
+//
+// neededVoters <= min(num_replicas,total_nodes)
+// desiredQuorum = neededVoters/2-1
+// quorum = haveVoters/2-1
+//
+// For AllocatorAddVoter, we know haveVoters < neededVoters
+// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
+// To find the worst case (largest adjustment),
+// 1. haveVoters = neededVoters-1,
+// adjustment = neededVoters/2-1-(neededVoters-1)
+// = neededVoters/2-neededVoters = -neededVoters/2
+// 2. haveVoters = 0
+// adjustement = neededVoters/2-1
+//
+// In order for adjustment to be <49, neededVoters/2<49 => neededVoters<98.
+// Hence the first assumption.
+//
+// For AllocatorRemoveDeadVoter, we know haveVoters >= neededVoters
+// adjustment = desiredQuorum-haveVoters = neededVoters/2-1-haveVoters
+// To find the worst case (largest adjustment),
+// 1. neededVoters/2-1 is much larger than haveVoters: given haveVoters >=
+// neededVoters, haveVoters/2-1 >= neededVoters/2-1. So this case is impossible.
+// 2. neededVoters/2-1 is much smaller than haveVoters: since ranges could be
+// over-replicated, theoretically speaking, there may be no upper bounds on
+// haveVoters. In order for adjustment to be < 49, we can only make an
+// assumption here that the difference between neededVoters/2-1 and haveVoters
+// cannot be >= 49 in this case.
+//
+// For AllocatorRemoveVoter, adjustment is haveVoters%2 = 0 or 1 < 49.
 func (a *Allocator) computeAction(
 	ctx context.Context,
 	storePool storepool.AllocatorStorePool,
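The bound derived in the comment above can be sanity-checked numerically. A throwaway sketch (illustration only, not part of the diff), using the comment's own formula with the worst case haveVoters = 0:

```go
package main

import "fmt"

func main() {
	// adjustment = neededVoters/2 - 1 - haveVoters, with haveVoters = 0.
	maxAdjustment := 0
	for neededVoters := 1; neededVoters < 98; neededVoters++ {
		if adj := neededVoters/2 - 1; adj > maxAdjustment {
			maxAdjustment = adj
		}
	}
	fmt.Println(maxAdjustment) // prints 47, which is < 49, so the bound holds
}
```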
@@ -3245,3 +3306,65 @@ func replDescsToStoreIDs(descs []roachpb.ReplicaDescriptor) []roachpb.StoreID {
 	}
 	return ret
 }
+
+// roundToNearestPriorityCategory rounds a priority to the nearest 100. n should
+// be non-negative.
+func roundToNearestPriorityCategory(n float64) float64 {
+	return math.Round(n/100.0) * 100
+}
+
+// CheckPriorityInversion returns whether there was a priority inversion (and
+// the range should not be processed at this time, since doing so could starve
+// higher-priority items), and whether the caller should re-add the range to the
+// queue (presumably under its new priority). A priority inversion happens if
+// the priority at enqueue time is higher than the priority corresponding to the
+// action computed at processing time. Caller should re-add the range to the
+// queue if it has gone from a repair action to lowest priority
+// (AllocatorConsiderRebalance).
+//
+// Note: Changing from AllocatorRangeUnavailable/AllocatorNoop to
+// AllocatorConsiderRebalance is not treated as a priority inversion. Going from
+// a repair action to AllocatorRangeUnavailable/AllocatorNoop is considered a
+// priority inversion but shouldRequeue = false.
+//
+// INVARIANT: shouldRequeue => isInversion
+func CheckPriorityInversion(
+	priorityAtEnqueue float64, actionAtProcessing AllocatorAction,
+) (isInversion bool, shouldRequeue bool) {
+	// NB: priorityAtEnqueue is -1 for callers such as scatter, dry runs, and
+	// manual queue runs. Priority inversion does not apply to these calls.
+	if priorityAtEnqueue == -1 {
+		return false, false
+	}
+
+	// NB: we need to check for when priorityAtEnqueue falls within the range
+	// of the allocator actions because store.Enqueue might enqueue things with
+	// a very high priority (1e5). In those cases, we do not want to requeue
+	// these actions or count it as an inversion.
+	withinPriorityRange := func(priority float64) bool {
+		return AllocatorNoop.Priority() <= priority && priority <= AllocatorMaxPriority.Priority()
+	}
+	if !withinPriorityRange(priorityAtEnqueue) {
+		return false, false
+	}
+
+	if priorityAtEnqueue > AllocatorConsiderRebalance.Priority() && actionAtProcessing == AllocatorConsiderRebalance {
+		return true, true
+	}
+
+	// NB: Usually, the priority at enqueue time should correspond to
+	// action.Priority(). However, for AllocatorAddVoter,
+	// AllocatorRemoveDeadVoter, AllocatorRemoveVoter, the priority can be
+	// adjusted at enqueue time (See ComputeAction for more details). However, we
+	// expect the adjustment to be relatively small (<49). So we round the
+	// priority to the nearest 100 to compare against
+	// actionAtProcessing.Priority(). Without this rounding, we might treat going
+	// from 10000 to 999 as an inversion, but it was just due to the adjustment.
+	// Note that priorities at AllocatorFinalizeAtomicReplicationChange,
+	// AllocatorRemoveLearner, and AllocatorReplaceDeadVoter will be rounded to
+	// the same priority. They are so close to each other, so we don't really
+	// count it as an inversion among them.
+	normPriorityAtEnqueue := roundToNearestPriorityCategory(priorityAtEnqueue)
+	isInversion = normPriorityAtEnqueue > actionAtProcessing.Priority()
+	return isInversion, false
+}
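
To make the semantics concrete, a hypothetical illustration of calling CheckPriorityInversion (the expected results follow from the contract above; this snippet is not part of the diff):

```go
// A range enqueued for a repair action but downgraded to a rebalance at
// processing time is an inversion worth requeueing.
isInversion, shouldRequeue := CheckPriorityInversion(
	AllocatorAddVoter.Priority(), // repair priority recorded at enqueue time
	AllocatorConsiderRebalance,   // action computed at processing time
)
// isInversion == true, shouldRequeue == true: skip the rebalance and requeue.

// Out-of-range priorities (e.g. the 1e5 used by store.Enqueue) never count:
isInversion, shouldRequeue = CheckPriorityInversion(1e5, AllocatorConsiderRebalance)
// isInversion == false, shouldRequeue == false
```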

pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Lines changed: 178 additions & 0 deletions
@@ -9629,3 +9629,181 @@ func TestAllocatorRebalanceTargetVoterConstraintUnsatisfied(t *testing.T) {
 		})
 	}
 }
+
+// TestRoundToNearestPriorityCategory tests the RoundToNearestPriorityCategory
+// function.
+func TestRoundToNearestPriorityCategory(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testCases := []struct {
+		name     string
+		input    float64
+		expected float64
+	}{
+		{
+			name:     "zero",
+			input:    0.0,
+			expected: 0.0,
+		},
+		{
+			name:     "exact multiple of 100",
+			input:    100.0,
+			expected: 100.0,
+		},
+		{
+			name:     "round down to nearest 100",
+			input:    149.0,
+			expected: 100.0,
+		},
+		{
+			name:     "round up to nearest 100",
+			input:    151.0,
+			expected: 200.0,
+		},
+		{
+			name:     "negative exact multiple of 100",
+			input:    -200.0,
+			expected: -200.0,
+		},
+		{
+			name:     "negative round down to nearest 100",
+			input:    -249.0,
+			expected: -200.0,
+		},
+		{
+			name:     "negative round up to nearest 100",
+			input:    -251.0,
+			expected: -300.0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			require.Equal(t, tc.expected, roundToNearestPriorityCategory(tc.input))
+		})
+	}
+}
+
+// TestCheckPriorityInversion tests the CheckPriorityInversion function.
+func TestCheckPriorityInversion(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	for action := AllocatorNoop; action <= AllocatorFinalizeAtomicReplicationChange; action++ {
+		t.Run(action.String(), func(t *testing.T) {
+			if action == AllocatorConsiderRebalance || action == AllocatorNoop || action == AllocatorRangeUnavailable {
+				inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
+				require.False(t, inversion)
+				require.False(t, requeue)
+			} else {
+				inversion, requeue := CheckPriorityInversion(action.Priority(), AllocatorConsiderRebalance)
+				require.True(t, inversion)
+				require.True(t, requeue)
+			}
+		})
+	}
+
+	testCases := []struct {
+		name               string
+		priorityAtEnqueue  float64
+		actionAtProcessing AllocatorAction
+		expectedInversion  bool
+		expectedRequeue    bool
+	}{
+		{
+			name:               "AllocatorNoop at processing is noop",
+			priorityAtEnqueue:  AllocatorFinalizeAtomicReplicationChange.Priority(),
+			actionAtProcessing: AllocatorNoop,
+			expectedInversion:  true,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "AllocatorRangeUnavailable at processing is noop",
+			priorityAtEnqueue:  AllocatorFinalizeAtomicReplicationChange.Priority(),
+			actionAtProcessing: AllocatorRangeUnavailable,
+			expectedInversion:  true,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "priority -1 bypasses",
+			priorityAtEnqueue:  -1,
+			actionAtProcessing: AllocatorConsiderRebalance,
+			expectedInversion:  false,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "priority increase",
+			priorityAtEnqueue:  0,
+			actionAtProcessing: AllocatorFinalizeAtomicReplicationChange,
+			expectedInversion:  false,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "above range priority(1e5)",
+			priorityAtEnqueue:  1e5,
+			actionAtProcessing: AllocatorConsiderRebalance,
+			expectedInversion:  false,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "below range priority at -10",
+			priorityAtEnqueue:  -10,
+			actionAtProcessing: -100,
+			expectedInversion:  false,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "inversion but small priority changes",
+			priorityAtEnqueue:  AllocatorFinalizeAtomicReplicationChange.Priority(),
+			actionAtProcessing: AllocatorReplaceDecommissioningNonVoter,
+			expectedInversion:  true,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "inversion but small priority changes",
+			priorityAtEnqueue:  AllocatorRemoveDeadVoter.Priority(),
+			actionAtProcessing: AllocatorAddNonVoter,
+			expectedInversion:  true,
+			expectedRequeue:    false,
+		},
+		{
+			name:               "inversion but small priority changes",
+			priorityAtEnqueue:  AllocatorConsiderRebalance.Priority(),
+			actionAtProcessing: AllocatorNoop,
+			expectedInversion:  false,
+			expectedRequeue:    false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			inversion, requeue := CheckPriorityInversion(tc.priorityAtEnqueue, tc.actionAtProcessing)
+			require.Equal(t, tc.expectedInversion, inversion)
+			require.Equal(t, tc.expectedRequeue, requeue)
+		})
+	}
+}
+
+// TestAllocatorPriorityInvariance verifies that allocator priorities remain
+// spaced in multiples of 100. This prevents regressions against the contract
+// relied on by CheckPriorityInversion. For details, see the comment above
+// action.Priority().
+func TestAllocatorPriorityInvariance(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	exceptions := map[AllocatorAction]struct{}{
+		AllocatorFinalizeAtomicReplicationChange: {},
+		AllocatorRemoveLearner:                   {},
+		AllocatorReplaceDeadVoter:                {},
+	}
+	lowestPriority := AllocatorNoop.Priority()
+	for action := AllocatorNoop; action < AllocatorMaxPriority; action++ {
+		require.GreaterOrEqualf(t, action.Priority(), lowestPriority,
+			"priority %f is less than AllocatorNoop: likely violating contract",
+			action.Priority())
+		if _, ok := exceptions[action]; !ok {
+			require.Equalf(t, int(action.Priority())%100, 0,
+				"priority %f is not a multiple of 100: likely violating contract",
+				action.Priority())
+
+		}
+	}
+}

pkg/kv/kvserver/mvcc_gc_queue.go

Lines changed: 1 addition & 1 deletion
@@ -878,7 +878,7 @@ func (mgcq *mvccGCQueue) scanReplicasForHiPriGCHints(
 		if !isLeaseHolder {
 			return true
 		}
-		added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority)
+		added, _ := mgcq.addInternal(ctx, desc, replica.ReplicaID(), deleteRangePriority, noopProcessCallback)
 		if added {
 			mgcq.store.metrics.GCEnqueueHighPriority.Inc(1)
 			foundReplicas++