Skip to content

Commit a857ffc

Browse files
committed
kvserver: invoke callback when dropping replicas in bq.SetMaxSize
Previously, when SetMaxSize shrank the queue, replicas were dropped from the priority queue without invoking their callbacks. This commit ensures callbacks are properly invoked when SetMaxSize drops replicas. Replicas removed via removeFromReplicaSetLocked (such as when a replica is destroyed) still don’t always have their callbacks invoked. While the natural place to invoke the callback would be at removeFromReplicaSetLocked, invoking callbacks while holding a lock risks blocking for too long. (We are doing this already for addInternal though.) This PR focuses specifically on handling the SetMaxSize case since this PR is intended to be backported. We can follow up with a more complex but more principled approach on master if needed in the future.
1 parent 8cc1e49 commit a857ffc

File tree

2 files changed

+86
-15
lines changed

2 files changed

+86
-15
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ type processCallback struct {
156156
// 4. If the replica is already in the queue and processing.
157157
// - May be skipped if the replica is already in queue and no priority changes
158158
// occur.
159+
//
160+
// Note that the callback may be invoked with (during bq.addInternal) or
161+
// without holding the lock (bq.AddAsyncWithCallback, bq.SetMaxSize, defer in
162+
// bq.addInternal) on baseQueue.mu. Important that the callback does not take
163+
// too long to execute.
159164
onEnqueueResult func(indexOnHeap int, err error)
160165

161166
// onProcessResult is called with the result of any process attempts. It is
@@ -165,6 +170,10 @@ type processCallback struct {
165170
// re-processing.
166171
// - May be skipped if the replica is removed with removeFromReplicaSetLocked
167172
// or registered with a new replica id before processing begins.
173+
//
174+
// Note that the callback may be invoked with (during bq.MaybeAddCallback) or
175+
// without holding the lock (bq.finishProcessingReplica) on baseQueue.mu.
176+
// Important that the callback does not take too long to execute.
168177
onProcessResult func(err error)
169178
}
170179

@@ -624,18 +633,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
624633

625634
// SetMaxSize sets the max size of the queue.
626635
func (bq *baseQueue) SetMaxSize(maxSize int64) {
627-
bq.mu.Lock()
628-
defer bq.mu.Unlock()
629-
bq.mu.maxSize = maxSize
630-
// Drop replicas until no longer exceeding the max size. Note: We call
631-
// removeLocked to match the behavior of addInternal. In theory, only
632-
// removeFromQueueLocked should be triggered in removeLocked, since the item
633-
// is in the priority queue, it should not be processing or in the purgatory
634-
// queue. To be safe, however, we use removeLocked.
635-
for int64(bq.mu.priorityQ.Len()) > maxSize {
636-
pqLen := bq.mu.priorityQ.Len()
637-
bq.full.Inc(1)
638-
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
636+
var droppedItems []*replicaItem
637+
func() {
638+
bq.mu.Lock()
639+
defer bq.mu.Unlock()
640+
bq.mu.maxSize = maxSize
641+
currentLen := int64(bq.mu.priorityQ.Len())
642+
if currentLen > maxSize {
643+
itemsToDrop := currentLen - maxSize
644+
droppedItems = make([]*replicaItem, 0, itemsToDrop)
645+
646+
// Drop lower-priority replicas until no longer exceeding the max size.
647+
// Note: We call removeLocked to match the behavior of addInternal. In
648+
// theory, only removeFromQueueLocked should be triggered in removeLocked,
649+
// since the item is in the priority queue, it should not be processing or
650+
// in the purgatory queue. To be safe, however, we use removeLocked.
651+
for int64(bq.mu.priorityQ.Len()) > maxSize {
652+
lastIdx := bq.mu.priorityQ.Len() - 1
653+
item := bq.mu.priorityQ.sl[lastIdx]
654+
droppedItems = append(droppedItems, item)
655+
bq.removeLocked(item)
656+
}
657+
}
658+
}()
659+
660+
// Notify callbacks outside the lock to avoid holding onto the lock for too
661+
// long.
662+
for _, item := range droppedItems {
663+
bq.updateMetricsOnDroppedDueToFullQueue()
664+
for _, cb := range item.callbacks {
665+
cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
666+
}
639667
}
640668
}
641669

@@ -805,6 +833,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
805833
}
806834
}
807835

836+
// updateMetricsOnDroppedDueToFullQueue updates the metrics when a replica is
837+
// dropped due to a full queue size.
838+
func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() {
839+
if bq.full != nil {
840+
bq.full.Inc(1)
841+
}
842+
}
843+
808844
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
809845
ctx = repl.AnnotateCtx(ctx)
810846
ctx = bq.AnnotateCtx(ctx)
@@ -983,9 +1019,7 @@ func (bq *baseQueue) addInternal(
9831019
// scan.
9841020
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
9851021
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
986-
if bq.full != nil {
987-
bq.full.Inc(1)
988-
}
1022+
bq.updateMetricsOnDroppedDueToFullQueue()
9891023
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
9901024
priority, replicaItemToDrop.replicaID)
9911025
// TODO(wenyihu6): when we introduce base queue max size cluster setting,

pkg/kv/kvserver/queue_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,6 +1501,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
15011501
require.Equal(t, int64(1), bq.enqueueAdd.Count())
15021502
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
15031503
})
1504+
t.Run("queuesizeshrinking", func(t *testing.T) {
1505+
testQueue := &testQueueImpl{}
1506+
const oldMaxSize = 15
1507+
const newMaxSize = 5
1508+
expectedEnqueueErrorCount := oldMaxSize - newMaxSize
1509+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize})
1510+
r, err := tc.store.GetReplica(1)
1511+
require.NoError(t, err)
1512+
var enqueueErrorCount atomic.Int64
1513+
// Max size is 10, so the replica should be enqueued.
1514+
for i := 0; i < oldMaxSize; i++ {
1515+
r.Desc().RangeID = roachpb.RangeID(i + 1)
1516+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1517+
onEnqueueResult: func(indexOnHeap int, err error) {
1518+
if err != nil {
1519+
enqueueErrorCount.Add(1)
1520+
}
1521+
},
1522+
onProcessResult: func(err error) {
1523+
t.Fatal("unexpected call to onProcessResult")
1524+
},
1525+
})
1526+
require.True(t, queued)
1527+
}
1528+
require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count())
1529+
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
1530+
1531+
// Set max size to 5 and add more replicas.
1532+
bq.SetMaxSize(newMaxSize)
1533+
testutils.SucceedsSoon(t, func() error {
1534+
if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) {
1535+
return errors.Errorf("expected %d enqueue errors; got %d",
1536+
expectedEnqueueErrorCount, enqueueErrorCount.Load())
1537+
}
1538+
return nil
1539+
})
1540+
})
15041541
}
15051542

15061543
// TestBaseQueueCallbackOnProcessResult tests that the processCallback is

0 commit comments

Comments
 (0)