Skip to content

Commit 917f8fe

Browse files
authored
Merge pull request #153179 from wenyihu6/backportrelease-24.3.20-rc-153008
release-24.3.20-rc: kvserver: invoke callback when SetMaxSize drops replicas
2 parents ddd5aa3 + 5280ea4 commit 917f8fe

File tree

2 files changed

+86
-15
lines changed

2 files changed

+86
-15
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ type processCallback struct {
156156
// 4. If the replica is already in the queue and processing.
157157
// - May be skipped if the replica is already in queue and no priority changes
158158
// occur.
159+
//
160+
// Note that the callback may be invoked with (during bq.addInternal) or
161+
// without holding the lock (bq.AddAsyncWithCallback, bq.SetMaxSize, defer in
162+
// bq.addInternal) on baseQueue.mu. Important that the callback does not take
163+
// too long to execute.
159164
onEnqueueResult func(indexOnHeap int, err error)
160165

161166
// onProcessResult is called with the result of any process attempts. It is
@@ -165,6 +170,10 @@ type processCallback struct {
165170
// re-processing.
166171
// - May be skipped if the replica is removed with removeFromReplicaSetLocked
167172
// or registered with a new replica id before processing begins.
173+
//
174+
// Note that the callback may be invoked with (during bq.MaybeAddCallback) or
175+
// without holding the lock (bq.finishProcessingReplica) on baseQueue.mu.
176+
// Important that the callback does not take too long to execute.
168177
onProcessResult func(err error)
169178
}
170179

@@ -628,18 +637,37 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
628637

629638
// SetMaxSize sets the max size of the queue.
630639
func (bq *baseQueue) SetMaxSize(maxSize int64) {
631-
bq.mu.Lock()
632-
defer bq.mu.Unlock()
633-
bq.mu.maxSize = maxSize
634-
// Drop replicas until no longer exceeding the max size. Note: We call
635-
// removeLocked to match the behavior of addInternal. In theory, only
636-
// removeFromQueueLocked should be triggered in removeLocked, since the item
637-
// is in the priority queue, it should not be processing or in the purgatory
638-
// queue. To be safe, however, we use removeLocked.
639-
for int64(bq.mu.priorityQ.Len()) > maxSize {
640-
pqLen := bq.mu.priorityQ.Len()
641-
bq.full.Inc(1)
642-
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
640+
var droppedItems []*replicaItem
641+
func() {
642+
bq.mu.Lock()
643+
defer bq.mu.Unlock()
644+
bq.mu.maxSize = maxSize
645+
currentLen := int64(bq.mu.priorityQ.Len())
646+
if currentLen > maxSize {
647+
itemsToDrop := currentLen - maxSize
648+
droppedItems = make([]*replicaItem, 0, itemsToDrop)
649+
650+
// Drop lower-priority replicas until no longer exceeding the max size.
651+
// Note: We call removeLocked to match the behavior of addInternal. In
652+
// theory, only removeFromQueueLocked should be triggered in removeLocked,
653+
// since the item is in the priority queue, it should not be processing or
654+
// in the purgatory queue. To be safe, however, we use removeLocked.
655+
for int64(bq.mu.priorityQ.Len()) > maxSize {
656+
lastIdx := bq.mu.priorityQ.Len() - 1
657+
item := bq.mu.priorityQ.sl[lastIdx]
658+
droppedItems = append(droppedItems, item)
659+
bq.removeLocked(item)
660+
}
661+
}
662+
}()
663+
664+
// Notify callbacks outside the lock to avoid holding onto the lock for too
665+
// long.
666+
for _, item := range droppedItems {
667+
bq.updateMetricsOnDroppedDueToFullQueue()
668+
for _, cb := range item.callbacks {
669+
cb.onEnqueueResult(-1 /*indexOnHeap*/, errDroppedDueToFullQueueSize)
670+
}
643671
}
644672
}
645673

@@ -809,6 +837,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueAdd() {
809837
}
810838
}
811839

840+
// updateMetricsOnDroppedDueToFullQueue updates the metrics when a replica is
841+
// dropped due to a full queue size.
842+
func (bq *baseQueue) updateMetricsOnDroppedDueToFullQueue() {
843+
if bq.full != nil {
844+
bq.full.Inc(1)
845+
}
846+
}
847+
812848
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
813849
ctx = repl.AnnotateCtx(ctx)
814850
ctx = bq.AnnotateCtx(ctx)
@@ -987,9 +1023,7 @@ func (bq *baseQueue) addInternal(
9871023
// scan.
9881024
if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
9891025
replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
990-
if bq.full != nil {
991-
bq.full.Inc(1)
992-
}
1026+
bq.updateMetricsOnDroppedDueToFullQueue()
9931027
log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
9941028
priority, replicaItemToDrop.replicaID)
9951029
// TODO(wenyihu6): when we introduce base queue max size cluster setting,

pkg/kv/kvserver/queue_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,43 @@ func TestBaseQueueCallbackOnEnqueueResult(t *testing.T) {
15031503
require.Equal(t, int64(1), bq.enqueueAdd.Count())
15041504
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
15051505
})
1506+
t.Run("queuesizeshrinking", func(t *testing.T) {
1507+
testQueue := &testQueueImpl{}
1508+
const oldMaxSize = 15
1509+
const newMaxSize = 5
1510+
expectedEnqueueErrorCount := oldMaxSize - newMaxSize
1511+
bq := makeTestBaseQueue("test", testQueue, tc.store, queueConfig{maxSize: oldMaxSize})
1512+
r, err := tc.store.GetReplica(1)
1513+
require.NoError(t, err)
1514+
var enqueueErrorCount atomic.Int64
1515+
// Max size is 10, so the replica should be enqueued.
1516+
for i := 0; i < oldMaxSize; i++ {
1517+
r.Desc().RangeID = roachpb.RangeID(i + 1)
1518+
queued, _ := bq.testingAddWithCallback(ctx, r, 1.0, processCallback{
1519+
onEnqueueResult: func(indexOnHeap int, err error) {
1520+
if err != nil {
1521+
enqueueErrorCount.Add(1)
1522+
}
1523+
},
1524+
onProcessResult: func(err error) {
1525+
t.Fatal("unexpected call to onProcessResult")
1526+
},
1527+
})
1528+
require.True(t, queued)
1529+
}
1530+
require.Equal(t, int64(oldMaxSize), bq.enqueueAdd.Count())
1531+
require.Equal(t, int64(0), bq.enqueueUnexpectedError.Count())
1532+
1533+
// Set max size to 5 and add more replicas.
1534+
bq.SetMaxSize(newMaxSize)
1535+
testutils.SucceedsSoon(t, func() error {
1536+
if enqueueErrorCount.Load() != int64(expectedEnqueueErrorCount) {
1537+
return errors.Errorf("expected %d enqueue errors; got %d",
1538+
expectedEnqueueErrorCount, enqueueErrorCount.Load())
1539+
}
1540+
return nil
1541+
})
1542+
})
15061543
}
15071544

15081545
// TestBaseQueueCallbackOnProcessResult tests that the processCallback is

0 commit comments

Comments
 (0)