Skip to content

Commit 7d9f922

Browse files
committed
kvserver: invoke callback when dropping replicas in bq.SetMaxSize
Previously, when SetMaxSize shrank the queue, replicas were dropped from the priority queue without invoking their callbacks. This commit ensures callbacks are properly invoked when SetMaxSize drops replicas. Replicas removed via removeFromReplicaSetLocked (such as when a replica is destroyed) still don’t always have their callbacks invoked. While the natural place to invoke the callback would be at removeFromReplicaSetLocked, invoking callbacks while holding a lock risks blocking for too long. (We are doing this already for addInternal though.) This PR focuses specifically on handling the SetMaxSize case since this PR is intended to be backported. We can follow up with a more complex but more principled approach on master if needed in the future.
1 parent c013c36 commit 7d9f922

File tree

2 files changed

+86
-15
lines changed

2 files changed

+86
-15
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ type processCallback struct {
155155
// 4. If the replica is already in the queue and processing.
156156
// - May be skipped if the replica is already in queue and no priority changes
157157
// occur.
158+
//
159+
// Note that the callback may be invoked with (during bq.addInternal) or
160+
// without holding the lock (bq.AddAsyncWithCallback, bq.SetMaxSize, defer in
161+
// bq.addInternal) on baseQueue.mu. Important that the callback does not take
162+
// too long to execute.
158163
onEnqueueResult func(indexOnHeap int, err error)
159164

160165
// onProcessResult is called with the result of any process attempts. It is
@@ -164,6 +169,10 @@ type processCallback struct {
164169
// re-processing.
165170
// - May be skipped if the replica is removed with removeFromReplicaSetLocked
166171
// or registered with a new replica id before processing begins.
172+
//
173+
// Note that the callback may be invoked with (during bq.MaybeAddCallback) or
174+
// without holding the lock (bq.finishProcessingReplica) on baseQueue.mu.
175+
// Important that the callback does not take too long to execute.
167176
onProcessResult func(err error)
168177
}
169178

@@ -623,18 +632,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
623632

624633
// SetMaxSize sets the max size of the queue.
625634
func (bq *baseQueue) SetMaxSize(maxSize int64) {
626-
bq.mu.Lock()
627-
defer bq.mu.Unlock()
628-
bq.mu.maxSize = maxSize
629-
// Drop replicas until no longer exceeding the max size. Note: We call
630-
// removeLocked to match the behavior of addInternal. In theory, only
631-
// removeFromQueueLocked should be triggered in removeLocked, since the item
632-
// is in the priority queue, it should not be processing or in the purgatory
633-
// queue. To be safe, however, we use removeLocked.
634-
for int64(bq.mu.priorityQ.Len()) > maxSize {
635-
pqLen := bq.mu.priorityQ.Len()
636-
bq.full.Inc(1)
637-
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
635+
var droppedItems []*replicaItem
636+
func() {
637+
bq.mu.Lock()
638+
defer bq.mu.Unlock()
639+
bq.mu.maxSize = maxSize
640+
currentLen := int64(bq.mu.priorityQ.Len())
641+
if currentLen > maxSize {
642+
itemsToDrop := currentLen - maxSize
643+
droppedItems = make([]*replicaItem, 0, itemsToDrop)
644+
645+
// Drop lower-priority replicas until no longer exceeding the max size.
646+
// Note: We call removeLocked to match the behavior of addInternal. In
647+
// theory, only removeFromQueueLocked should be triggered in removeLocked,
648+
// since the item is in the priority queue, it should not be processing or
649+
// in the purgatory queue. To be safe, however, we use removeLocked.
650+
for int64(bq.mu.priorityQ.Len()) > maxSize {
651+
lastIdx := bq.mu.priorityQ.Len() - 1
652+
item := bq.mu.priorityQ.sl[lastIdx]
653+
droppedItems = append(droppedItems, item)
654+
bq.removeLocked(item)
655+
}
656+
}
657+
}()
658+
659+
// Notify callbacks outside the lock to avoid holding onto the lock for too
660+
// long.
661+
for _, item := range droppedItems {
662+
bq.updateMetricsOnDroppedDueToFullQueue()
663+
for _, cb := range item.callbacks {
664+
cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
665+
}
638666
}
639667
}
640668

@@ -805,6 +833,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
805833
}
806834
}
807835

836+
// updateMetricsOnDroppedDueToFullQueue updates the metrics when a replica is
837+
// dropped due to a full queue size.
838+
func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() {
839+
if bq.full != nil {
840+
bq.full.Inc(1)
841+
}
842+
}
843+
808844
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
809845
ctx = repl.AnnotateCtx(ctx)
810846
ctx = bq.AnnotateCtx(ctx)
@@ -983,9 +1019,7 @@ func (bq *baseQueue) addInternal(
9831019
// scan.
9841020
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
9851021
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
986-
if bq.full != nil {
987-
bq.full.Inc(1)
988-
}
1022+
bq.updateMetricsOnDroppedDueToFullQueue()
9891023
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
9901024
priority, replicaItemToDrop.replicaID)
9911025
// TODO(wenyihu6): when we introduce base queue max size cluster setting,

pkg/kv/kvserver/queue_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1496,6 +1496,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
14961496
require.Equal(t, int64(1), bq.enqueueAdd.Count())
14971497
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
14981498
})
1499+
t.Run("queuesizeshrinking", func(t *testing.T) {
1500+
testQueue := &testQueueImpl{}
1501+
const oldMaxSize = 15
1502+
const newMaxSize = 5
1503+
expectedEnqueueErrorCount := oldMaxSize - newMaxSize
1504+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize})
1505+
r, err := tc.store.GetReplica(1)
1506+
require.NoError(t, err)
1507+
var enqueueErrorCount atomic.Int64
1508+
// Max size is 10, so the replica should be enqueued.
1509+
for i := 0; i < oldMaxSize; i++ {
1510+
r.Desc().RangeID = roachpb.RangeID(i + 1)
1511+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1512+
onEnqueueResult: func(indexOnHeap int, err error) {
1513+
if err != nil {
1514+
enqueueErrorCount.Add(1)
1515+
}
1516+
},
1517+
onProcessResult: func(err error) {
1518+
t.Fatal("unexpected call to onProcessResult")
1519+
},
1520+
})
1521+
require.True(t, queued)
1522+
}
1523+
require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count())
1524+
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
1525+
1526+
// Set max size to 5 and add more replicas.
1527+
bq.SetMaxSize(newMaxSize)
1528+
testutils.SucceedsSoon(t, func() error {
1529+
if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) {
1530+
return errors.Errorf("expected %d enqueue errors; got %d",
1531+
expectedEnqueueErrorCount, enqueueErrorCount.Load())
1532+
}
1533+
return nil
1534+
})
1535+
})
14991536
}
15001537

15011538
// TestBaseQueueCallbackOnProcessResult tests that the processCallback is

0 commit comments

Comments
 (0)