Skip to content

Commit e9b11cf

Browse files
authored
Merge pull request #153180 from wenyihu6/backportrelease-25.2.6-rc-153008
release-25.2.6-rc: kvserver: invoke callback when SetMaxSize drops replicas
2 parents 8cc1e49 + a857ffc commit e9b11cf

File tree

2 files changed

+86
-15
lines changed

2 files changed

+86
-15
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ type processCallback struct {
156156
// 4. If the replica is already in the queue and processing.
157157
// - May be skipped if the replica is already in queue and no priority changes
158158
// occur.
159+
//
160+
// Note that the callback may be invoked with (during bq.addInternal) or
161+
// without holding the lock (bq.AddAsyncWithCallback, bq.SetMaxSize, defer in
162+
// bq.addInternal) on baseQueue.mu. It is important that the callback does not take
163+
// too long to execute.
159164
onEnqueueResult func(indexOnHeap int, err error)
160165

161166
// onProcessResult is called with the result of any process attempts. It is
@@ -165,6 +170,10 @@ type processCallback struct {
165170
// re-processing.
166171
// - May be skipped if the replica is removed with removeFromReplicaSetLocked
167172
// or registered with a new replica id before processing begins.
173+
//
174+
// Note that the callback may be invoked with (during bq.MaybeAddCallback) or
175+
// without holding the lock (bq.finishProcessingReplica) on baseQueue.mu.
176+
// It is important that the callback does not take too long to execute.
168177
onProcessResult func(err error)
169178
}
170179

@@ -624,18 +633,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
624633

625634
// SetMaxSize sets the max size of the queue.
626635
func (bq *baseQueue) SetMaxSize(maxSize int64) {
627-
bq.mu.Lock()
628-
defer bq.mu.Unlock()
629-
bq.mu.maxSize = maxSize
630-
// Drop replicas until no longer exceeding the max size. Note: We call
631-
// removeLocked to match the behavior of addInternal. In theory, only
632-
// removeFromQueueLocked should be triggered in removeLocked, since the item
633-
// is in the priority queue, it should not be processing or in the purgatory
634-
// queue. To be safe, however, we use removeLocked.
635-
for int64(bq.mu.priorityQ.Len()) > maxSize {
636-
pqLen := bq.mu.priorityQ.Len()
637-
bq.full.Inc(1)
638-
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
636+
var droppedItems []*replicaItem
637+
func() {
638+
bq.mu.Lock()
639+
defer bq.mu.Unlock()
640+
bq.mu.maxSize = maxSize
641+
currentLen := int64(bq.mu.priorityQ.Len())
642+
if currentLen > maxSize {
643+
itemsToDrop := currentLen - maxSize
644+
droppedItems = make([]*replicaItem, 0, itemsToDrop)
645+
646+
// Drop lower-priority replicas until no longer exceeding the max size.
647+
// Note: We call removeLocked to match the behavior of addInternal. In
648+
// theory, only removeFromQueueLocked should be triggered in removeLocked,
649+
// since the item is in the priority queue, it should not be processing or
650+
// in the purgatory queue. To be safe, however, we use removeLocked.
651+
for int64(bq.mu.priorityQ.Len()) > maxSize {
652+
lastIdx := bq.mu.priorityQ.Len() - 1
653+
item := bq.mu.priorityQ.sl[lastIdx]
654+
droppedItems = append(droppedItems, item)
655+
bq.removeLocked(item)
656+
}
657+
}
658+
}()
659+
660+
// Notify callbacks outside the lock to avoid holding onto the lock for too
661+
// long.
662+
for _, item := range droppedItems {
663+
bq.updateMetricsOnDroppedDueToFullQueue()
664+
for _, cb := range item.callbacks {
665+
cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
666+
}
639667
}
640668
}
641669

@@ -805,6 +833,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
805833
}
806834
}
807835

836+
// updateMetricsOnDroppedDueToFullQueue updates the metrics when a replica is
837+
// dropped due to a full queue size.
838+
func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() {
839+
if bq.full != nil {
840+
bq.full.Inc(1)
841+
}
842+
}
843+
808844
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
809845
ctx = repl.AnnotateCtx(ctx)
810846
ctx = bq.AnnotateCtx(ctx)
@@ -983,9 +1019,7 @@ func (bq *baseQueue) addInternal(
9831019
// scan.
9841020
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
9851021
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
986-
if bq.full != nil {
987-
bq.full.Inc(1)
988-
}
1022+
bq.updateMetricsOnDroppedDueToFullQueue()
9891023
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
9901024
priority, replicaItemToDrop.replicaID)
9911025
// TODO(wenyihu6): when we introduce base queue max size cluster setting,

pkg/kv/kvserver/queue_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,6 +1501,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
15011501
require.Equal(t, int64(1), bq.enqueueAdd.Count())
15021502
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
15031503
})
1504+
t.Run("queuesizeshrinking", func(t *testing.T) {
1505+
testQueue := &testQueueImpl{}
1506+
const oldMaxSize = 15
1507+
const newMaxSize = 5
1508+
expectedEnqueueErrorCount := oldMaxSize - newMaxSize
1509+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize})
1510+
r, err := tc.store.GetReplica(1)
1511+
require.NoError(t, err)
1512+
var enqueueErrorCount atomic.Int64
1513+
// Max size is 15 (oldMaxSize), so every replica should be enqueued.
1514+
for i := 0; i < oldMaxSize; i++ {
1515+
r.Desc().RangeID = roachpb.RangeID(i + 1)
1516+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1517+
onEnqueueResult: func(indexOnHeap int, err error) {
1518+
if err != nil {
1519+
enqueueErrorCount.Add(1)
1520+
}
1521+
},
1522+
onProcessResult: func(err error) {
1523+
t.Fatal("unexpected call to onProcessResult")
1524+
},
1525+
})
1526+
require.True(t, queued)
1527+
}
1528+
require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count())
1529+
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
1530+
1531+
// Shrink max size to 5; the 10 excess replicas should be dropped and their callbacks notified with an error.
1532+
bq.SetMaxSize(newMaxSize)
1533+
testutils.SucceedsSoon(t, func() error {
1534+
if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) {
1535+
return errors.Errorf("expected %d enqueue errors; got %d",
1536+
expectedEnqueueErrorCount, enqueueErrorCount.Load())
1537+
}
1538+
return nil
1539+
})
1540+
})
15041541
}
15051542

15061543
// TestBaseQueueCallbackOnProcessResult tests that the processCallback is

0 commit comments

Comments
 (0)