Skip to content

Commit b4b486d

Browse files
committed
kvserver: drop excess replicas when lowering ReplicateQueueMaxSize
Previously, the ReplicateQueueMaxSize cluster setting allowed dynamic adjustment of the replicate queue’s maximum size. However, decreasing this setting did not properly drop excess replicas. This commit fixes that by removing replicas when the queue’s max size is lowered.
1 parent 2c8a6da commit b4b486d

File tree

2 files changed

+30
-5
lines changed

2 files changed

+30
-5
lines changed

pkg/kv/kvserver/queue.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,18 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
547547
// SetMaxSize sets the max size of the queue.
548548
func (bq *baseQueue) SetMaxSize(maxSize int64) {
549549
bq.mu.Lock()
550+
defer bq.mu.Unlock()
550551
bq.mu.maxSize = maxSize
551-
bq.mu.Unlock()
552+
// Drop replicas until no longer exceeding the max size. Note: We call
553+
// removeLocked to match the behavior of addInternal. In theory, only
554+
// removeFromQueueLocked should be triggered in removeLocked, since the item
555+
// is in the priority queue, it should not be processing or in the purgatory
556+
// queue. To be safe, however, we use removeLocked.
557+
for int64(bq.mu.priorityQ.Len()) > maxSize {
558+
pqLen := bq.mu.priorityQ.Len()
559+
bq.droppedDueToSize.Inc(1)
560+
bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
561+
}
552562
}
553563

554564
// lockProcessing locks all processing in the baseQueue. It returns

pkg/kv/kvserver/store_rebalancer_test.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,15 +1885,19 @@ func TestReplicateQueueMaxSize(t *testing.T) {
18851885
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 1)
18861886
replicateQueue := newReplicateQueue(tc.store, a)
18871887

1888-
// Function to add a replica and verify queue state
1888+
// Helper function to add a replica and verify queue state.
1889+
verify := func(expectedLength int, expectedDropped int64) {
1890+
require.Equal(t, expectedLength, replicateQueue.Length())
1891+
require.Equal(t, expectedDropped, replicateQueue.droppedDueToSize.Count())
1892+
require.Equal(t, expectedDropped, tc.store.metrics.ReplicateQueueDroppedDueToSize.Count())
1893+
}
1894+
18891895
addReplicaAndVerify := func(rangeID roachpb.RangeID, expectedLength int, expectedDropped int64) {
18901896
r.Desc().RangeID = rangeID
18911897
enqueued, err := replicateQueue.testingAdd(context.Background(), r, 0.0)
18921898
require.NoError(t, err)
18931899
require.True(t, enqueued)
1894-
require.Equal(t, expectedLength, replicateQueue.Length())
1895-
require.Equal(t, expectedDropped, replicateQueue.droppedDueToSize.Count())
1896-
require.Equal(t, expectedDropped, tc.store.metrics.ReplicateQueueDroppedDueToSize.Count())
1900+
verify(expectedLength, expectedDropped)
18971901
}
18981902

18991903
// First replica should be added.
@@ -1912,4 +1916,15 @@ func TestReplicateQueueMaxSize(t *testing.T) {
19121916

19131917
// Add one more to exceed the max size. Should be dropped.
19141918
addReplicaAndVerify(102 /* rangeID */, 100 /* expectedLength */, 3 /* expectedDropped */)
1919+
1920+
// Reset to the same size should not change the queue length.
1921+
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
1922+
verify(100 /* expectedLength */, 3 /* expectedDropped */)
1923+
1924+
// Decrease the max size to 10 which should drop 90 replicas.
1925+
ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 10)
1926+
verify(10 /* expectedLength */, 93 /* expectedDropped: 3 + 90 */)
1927+
1928+
// Should drop another one now that max size is 10.
1929+
addReplicaAndVerify(103 /* rangeID */, 10 /* expectedLength */, 94 /* expectedDropped: 3 + 90 + 1 */)
19151930
}

0 commit comments

Comments
 (0)