
Commit 29896e5

craig[bot] and wenyihu6 committed
Merge #152699
152699: kvserver: add ReplicateQueueMaxSize r=tbg a=wenyihu6

Informs: #151775
Release note: none

---

**kvserver: add ReplicateQueueDroppedDueToSize**

Previously, we had limited observability into when queues drop replicas due to reaching their maximum size. This commit adds a metric to track and observe such events.

---

**kvserver: add BaseQueueMaxSize and ReplicateQueueMaxSize**

Previously, the max base queue size was hardcoded to defaultQueueMaxSize (10000). Since replica item structs in the priority queue are small, we don't see a strong reason for this fixed limit. As an incremental and backport-friendly change, this commit makes the queue size configurable via a cluster setting. The replicate queue uses its own setting, allowing its size to be increased independently while leaving other queues unchanged.

---

**kvserver: add TestBaseQueueMaxSize**

This commit adds tests to (1) verify metric updates when replicas are dropped from the queue, and (2) ensure the cluster setting for configuring the base queue's max size works correctly.

---

**Revert "kvserver: add TestBaseQueueMaxSize"**

This reverts commit a5f01e5.

---

**Revert "kvserver: add BaseQueueMaxSize and ReplicateQueueMaxSize"**

This reverts commit d89eaa7.

---

**kvserver: add ReplicateQueueMaxSize**

Previously, the maximum base queue size was hardcoded to defaultQueueMaxSize (10000). Since replica item structs are small, there is little reason to enforce a fixed limit. This commit makes the replicate queue size configurable via a cluster setting, ReplicateQueueMaxSize, allowing incremental and backport-friendly adjustments. Note that lowering the setting does not yet drop excess replicas; a later commit in this series addresses that behavior.

---

**kvserver: add TestReplicateQueueMaxSize**

This commit adds tests to (1) verify metric updates when replicas are dropped from the queue, and (2) ensure the cluster setting for ReplicateQueueMaxSize works correctly.

---

**kvserver: drop excess replicas when lowering ReplicateQueueMaxSize**

Previously, the ReplicateQueueMaxSize cluster setting allowed dynamic adjustment of the replicate queue's maximum size, but decreasing it did not drop excess replicas. This commit fixes that by removing replicas when the queue's max size is lowered.

---

**kvserver: rename ReplicateQueueDroppedDueToSize to ReplicateQueueFull**

This commit clarifies the naming and description of the metric.

Co-authored-by: wenyihu6 <[email protected]>
2 parents bc9c804 + ac802fe commit 29896e5

5 files changed: +133, −1 lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 8 additions & 0 deletions
```diff
@@ -13951,6 +13951,14 @@ layers:
       unit: COUNT
       aggregation: AVG
       derivative: NONE
+    - name: queue.replicate.queue_full
+      exported_name: queue_replicate_queue_full
+      description: Number of times a replica was dropped from the queue due to queue fullness
+      y_axis_label: Replicas
+      type: COUNTER
+      unit: COUNT
+      aggregation: AVG
+      derivative: NON_NEGATIVE_DERIVATIVE
     - name: queue.replicate.rebalancenonvoterreplica
       exported_name: queue_replicate_rebalancenonvoterreplica
       description: Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue
```
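One note on the new entry's fields: unlike its predecessor's `derivative: NONE`, the counter uses `derivative: NON_NEGATIVE_DERIVATIVE`, which, judging from the neighboring counter entries, asks charts to render the cumulative count as a non-negative rate; `unit`, `aggregation`, and `y_axis_label` simply mirror the sibling metrics. That reading is inferred from the surrounding file, not stated in the commit.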

pkg/kv/kvserver/metrics.go

Lines changed: 8 additions & 0 deletions
```diff
@@ -2192,6 +2192,12 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
 		Measurement: "Replicas",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaReplicateQueueFull = metric.Metadata{
+		Name:        "queue.replicate.queue_full",
+		Help:        "Number of times a replica was dropped from the queue due to queue fullness",
+		Measurement: "Replicas",
+		Unit:        metric.Unit_COUNT,
+	}
 	metaReplicateQueueProcessingNanos = metric.Metadata{
 		Name: "queue.replicate.processingnanos",
 		Help: "Nanoseconds spent processing replicas in the replicate queue",
@@ -3185,6 +3191,7 @@ type StoreMetrics struct {
 	ReplicateQueueSuccesses       *metric.Counter
 	ReplicateQueueFailures        *metric.Counter
 	ReplicateQueuePending         *metric.Gauge
+	ReplicateQueueFull            *metric.Counter
 	ReplicateQueueProcessingNanos *metric.Counter
 	ReplicateQueuePurgatory       *metric.Gauge
 	SplitQueueSuccesses           *metric.Counter
@@ -3974,6 +3981,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		ReplicateQueueSuccesses:       metric.NewCounter(metaReplicateQueueSuccesses),
 		ReplicateQueueFailures:        metric.NewCounter(metaReplicateQueueFailures),
 		ReplicateQueuePending:         metric.NewGauge(metaReplicateQueuePending),
+		ReplicateQueueFull:            metric.NewCounter(metaReplicateQueueFull),
 		ReplicateQueueProcessingNanos: metric.NewCounter(metaReplicateQueueProcessingNanos),
 		ReplicateQueuePurgatory:       metric.NewGauge(metaReplicateQueuePurgatory),
 		SplitQueueSuccesses:           metric.NewCounter(metaSplitQueueSuccesses),
```
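For readers unfamiliar with CockroachDB's `metric` package, the counter lifecycle wired up above can be exercised in isolation. This is a minimal sketch, using only the constructors and methods visible in this PR (`metric.Metadata`, `metric.NewCounter`, `Inc`, `Count`) and the package's usual import path; registration with the store's metric registry is elided.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

func main() {
	// Metadata mirrors the metaReplicateQueueFull definition above.
	meta := metric.Metadata{
		Name:        "queue.replicate.queue_full",
		Help:        "Number of times a replica was dropped from the queue due to queue fullness",
		Measurement: "Replicas",
		Unit:        metric.Unit_COUNT,
	}
	full := metric.NewCounter(meta)
	full.Inc(1)               // one increment per dropped replica
	fmt.Println(full.Count()) // -> 1
}
```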

pkg/kv/kvserver/queue.go

Lines changed: 28 additions & 1 deletion
```diff
@@ -331,6 +331,11 @@ type queueConfig struct {
 	failures *metric.Counter
 	// pending is a gauge measuring current replica count pending.
 	pending *metric.Gauge
+	// full is a counter measuring replicas dropped due to exceeding the queue max
+	// size.
+	// NB: this metric may be nil for queues that are not interested in tracking
+	// this.
+	full *metric.Counter
 	// processingNanos is a counter measuring total nanoseconds spent processing
 	// replicas.
 	processingNanos *metric.Counter
@@ -441,6 +446,7 @@ type baseQueue struct {
 		purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
 		stopped   bool
 		disabled  bool
+		maxSize   int64
 	}
 }
@@ -493,6 +499,7 @@ func newBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *b
 		},
 	}
 	bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}
+	bq.mu.maxSize = int64(cfg.maxSize)
 	bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
 	cfg.disabledConfig.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
 		bq.SetDisabled(!cfg.disabledConfig.Get(&store.cfg.Settings.SV))
@@ -537,6 +544,23 @@ func (bq *baseQueue) SetDisabled(disabled bool) {
 	bq.mu.Unlock()
 }
 
+// SetMaxSize sets the max size of the queue.
+func (bq *baseQueue) SetMaxSize(maxSize int64) {
+	bq.mu.Lock()
+	defer bq.mu.Unlock()
+	bq.mu.maxSize = maxSize
+	// Drop replicas until no longer exceeding the max size. Note: We call
+	// removeLocked to match the behavior of addInternal. In theory, only
+	// removeFromQueueLocked should be triggered in removeLocked, since the item
+	// is in the priority queue, it should not be processing or in the purgatory
+	// queue. To be safe, however, we use removeLocked.
+	for int64(bq.mu.priorityQ.Len()) > maxSize {
+		pqLen := bq.mu.priorityQ.Len()
+		bq.full.Inc(1)
+		bq.removeLocked(bq.mu.priorityQ.sl[pqLen-1])
+	}
+}
+
 // lockProcessing locks all processing in the baseQueue. It returns
 // a function to unlock processing.
 func (bq *baseQueue) lockProcessing() func() {
@@ -772,8 +796,11 @@ func (bq *baseQueue) addInternal(
 	// guaranteed to be globally ordered. Ideally, we would remove the lowest
 	// priority element, but it would require additional bookkeeping or a linear
 	// scan.
-	if pqLen := bq.mu.priorityQ.Len(); pqLen > bq.maxSize {
+	if pqLen := bq.mu.priorityQ.Len(); int64(pqLen) > bq.mu.maxSize {
 		replicaItemToDrop := bq.mu.priorityQ.sl[pqLen-1]
+		if bq.full != nil {
+			bq.full.Inc(1)
+		}
 		log.Dev.VInfof(ctx, 1, "dropping due to exceeding queue max size: priority=%0.3f, replica=%v",
 			priority, replicaItemToDrop.replicaID)
 		bq.removeLocked(replicaItemToDrop)
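```

The comment in `addInternal` notes that the dropped element, `priorityQ.sl[pqLen-1]`, has a low but not necessarily the lowest priority. That follows from the slice being the backing array of a binary heap: the tail is always a leaf, so it can never be the highest-priority element, but the minimum may sit at a different leaf. A standalone illustration of this property (bare float64 priorities stand in for the real `replicaItem` queue; this is not CockroachDB code):

```go
package main

import (
	"container/heap"
	"fmt"
)

// prioQ is a max-heap of priorities, loosely mirroring the replica
// priority queue: the highest priority sits at index 0, leaves at the tail.
type prioQ []float64

func (q prioQ) Len() int           { return len(q) }
func (q prioQ) Less(i, j int) bool { return q[i] > q[j] } // max-heap order
func (q prioQ) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *prioQ) Push(x any)        { *q = append(*q, x.(float64)) }
func (q *prioQ) Pop() any {
	old := *q
	n := len(old)
	x := old[n-1]
	*q = old[:n-1]
	return x
}

func main() {
	q := &prioQ{}
	for _, p := range []float64{5, 1, 9, 3, 7, 2} {
		heap.Push(q, p)
	}
	// The tail is a leaf of the heap: a low priority, but not necessarily
	// the lowest. With this insertion order the slice is [9 7 5 1 3 2],
	// so the element dropped would be 2, not the true minimum 1.
	fmt.Println(*q, "drop:", (*q)[q.Len()-1])
}
```

Dropping the tail instead of the true minimum is the trade-off the commit message calls out: finding the real minimum would require additional bookkeeping or a linear scan.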

pkg/kv/kvserver/replicate_queue.go

Lines changed: 25 additions & 0 deletions
```diff
@@ -100,6 +100,24 @@ var EnqueueProblemRangeInReplicateQueueInterval = settings.RegisterDurationSetti
 	0,
 )
 
+// ReplicateQueueMaxSize is a setting that controls the max size of the
+// replicate queue. When this limit is exceeded, lower priority replicas (not
+// guaranteed to be the lowest) are dropped from the queue.
+var ReplicateQueueMaxSize = settings.RegisterIntSetting(
+	settings.ApplicationLevel,
+	"kv.replicate_queue.max_size",
+	"maximum number of replicas that can be queued for replicate queue processing; "+
+		"when this limit is exceeded, lower priority (not guaranteed to be the lowest) "+
+		"replicas are dropped from the queue",
+	defaultQueueMaxSize,
+	settings.WithValidateInt(func(v int64) error {
+		if v < defaultQueueMaxSize {
+			return errors.Errorf("cannot be set to a value lower than %d: %d", defaultQueueMaxSize, v)
+		}
+		return nil
+	}),
+)
+
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name: "queue.replicate.addreplica",
@@ -524,6 +542,7 @@ func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
 // additional replica to their range.
 type replicateQueue struct {
 	*baseQueue
+	maxSize   *settings.IntSetting
 	metrics   ReplicateQueueMetrics
 	allocator allocatorimpl.Allocator
 	as        *mmaintegration.AllocatorSync
@@ -549,6 +568,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 		storePool = store.cfg.StorePool
 	}
 	rq := &replicateQueue{
+		maxSize: ReplicateQueueMaxSize,
 		metrics: makeReplicateQueueMetrics(),
 		planner: plan.NewReplicaPlanner(allocator, storePool,
 			store.TestingKnobs().ReplicaPlannerKnobs),
@@ -578,11 +598,16 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
 			successes:       store.metrics.ReplicateQueueSuccesses,
 			failures:        store.metrics.ReplicateQueueFailures,
 			pending:         store.metrics.ReplicateQueuePending,
+			full:            store.metrics.ReplicateQueueFull,
 			processingNanos: store.metrics.ReplicateQueueProcessingNanos,
 			purgatory:       store.metrics.ReplicateQueuePurgatory,
 			disabledConfig:  kvserverbase.ReplicateQueueEnabled,
 		},
 	)
+	rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
+	ReplicateQueueMaxSize.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) {
+		rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))
+	})
 	updateFn := func() {
 		select {
 		case rq.updateCh <- timeutil.Now():
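```

The last hunk is the usual seed-then-subscribe pattern for cluster settings: read the value once at construction, then re-read it inside the `SetOnChange` callback. Because `SetMaxSize` drops excess replicas synchronously under the queue mutex, lowering the setting takes effect immediately. A minimal runnable sketch of the pattern follows; `kv.example.max_size` is a hypothetical setting key registered purely for illustration, and `cluster.MakeTestingClusterSettings` is assumed as the in-memory settings constructor used by tests.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// myMaxSize is a stand-in for ReplicateQueueMaxSize, registered under a
// distinct (hypothetical) key to avoid colliding with the real setting.
var myMaxSize = settings.RegisterIntSetting(
	settings.ApplicationLevel,
	"kv.example.max_size",
	"illustrative copy of the max-size setting",
	10000,
)

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	// Seed-then-subscribe, as in newReplicateQueue above.
	apply := func() { fmt.Println("max size is now", myMaxSize.Get(&st.SV)) }
	apply()
	myMaxSize.SetOnChange(&st.SV, func(ctx context.Context) { apply() })

	// Overriding the value fires the change callback, which is what
	// TestReplicateQueueMaxSize below relies on.
	myMaxSize.Override(ctx, &st.SV, 20000)
}
```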

pkg/kv/kvserver/store_rebalancer_test.go

Lines changed: 64 additions & 0 deletions
```diff
@@ -1864,3 +1864,67 @@ func TestingRaftStatusFn(desc *roachpb.RangeDescriptor, storeID roachpb.StoreID)
 	}
 	return status
 }
+
+// TestReplicateQueueMaxSize tests the max size of the replicate queue and
+// verifies that replicas are dropped when the max size is exceeded. It also
+// checks that the ReplicateQueueFull metric is updated correctly.
+func TestReplicateQueueMaxSize(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	ctx := context.Background()
+	defer stopper.Stop(ctx)
+	tc.Start(ctx, t, stopper)
+
+	r, err := tc.store.GetReplica(1)
+	require.NoError(t, err)
+
+	stopper, _, _, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, true /* deterministic */)
+	defer stopper.Stop(ctx)
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 1)
+	replicateQueue := newReplicateQueue(tc.store, a)
+
+	// Helper function to verify queue length and the drop metrics.
+	verify := func(expectedLength int, expectedDropped int64) {
+		require.Equal(t, expectedLength, replicateQueue.Length())
+		require.Equal(t, expectedDropped, replicateQueue.full.Count())
+		require.Equal(t, expectedDropped, tc.store.metrics.ReplicateQueueFull.Count())
+	}
+
+	addReplicaAndVerify := func(rangeID roachpb.RangeID, expectedLength int, expectedDropped int64) {
+		r.Desc().RangeID = rangeID
+		enqueued, err := replicateQueue.testingAdd(context.Background(), r, 0.0)
+		require.NoError(t, err)
+		require.True(t, enqueued)
+		verify(expectedLength, expectedDropped)
+	}
+
+	// First replica should be added.
+	addReplicaAndVerify(1 /* rangeID */, 1 /* expectedLength */, 0 /* expectedDropped */)
+	// Second replica should be dropped.
+	addReplicaAndVerify(2 /* rangeID */, 1 /* expectedLength */, 1 /* expectedDropped */)
+	// Third replica should be dropped.
+	addReplicaAndVerify(3 /* rangeID */, 1 /* expectedLength */, 2 /* expectedDropped */)
+
+	// Increase the max size to 100 and add more replicas.
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
+	for i := 2; i <= 100; i++ {
+		// Should be added.
+		addReplicaAndVerify(roachpb.RangeID(i+1) /* rangeID */, i /* expectedLength */, 2 /* expectedDropped */)
+	}
+
+	// Add one more to exceed the max size. Should be dropped.
+	addReplicaAndVerify(102 /* rangeID */, 100 /* expectedLength */, 3 /* expectedDropped */)
+
+	// Resetting to the same size should not change the queue length.
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 100)
+	verify(100 /* expectedLength */, 3 /* expectedDropped */)
+
+	// Decrease the max size to 10, which should drop 90 replicas.
+	ReplicateQueueMaxSize.Override(ctx, &tc.store.cfg.Settings.SV, 10)
+	verify(10 /* expectedLength */, 93 /* expectedDropped: 3 + 90 */)
+
+	// Should drop another one now that the max size is 10.
+	addReplicaAndVerify(103 /* rangeID */, 10 /* expectedLength */, 94 /* expectedDropped: 3 + 90 + 1 */)
+}
```
