Skip to content

Commit fc26f6c

Browse files
committed
kvserver: add enqueue metrics to base queue
Previously, observability into base queue enqueuing was limited to pending queue length and process results. This commit adds enqueue-specific metrics for the replicate queue: - queue.replicate.enqueue.add: counts replicas successfully added to the queue - queue.replicate.enqueue.failedprecondition: counts replicas that failed the replicaCanBeProcessed precondition check - queue.replicate.enqueue.noaction: counts replicas skipped because ShouldQueue determined no action was needed - queue.replicate.enqueue.unexpectederror: counts replicas that were expected to be enqueued (ShouldQueue returned true or the caller attempted a direct enqueue) but failed due to unexpected errors
1 parent 02dfa8f commit fc26f6c

File tree

4 files changed

+105
-12
lines changed

4 files changed

+105
-12
lines changed

docs/generated/metrics/metrics.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@
436436
<tr><td>STORAGE</td><td>queue.replicate.addreplica.error</td><td>Number of failed replica additions processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
437437
<tr><td>STORAGE</td><td>queue.replicate.addreplica.success</td><td>Number of successful replica additions processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
438438
<tr><td>STORAGE</td><td>queue.replicate.addvoterreplica</td><td>Number of voter replica additions attempted by the replicate queue</td><td>Replica Additions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
439+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.add</td><td>Number of replicas successfully added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
440+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.failedprecondition</td><td>Number of replicas that failed the precondition checks and were therefore not added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
441+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.noaction</td><td>Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
442+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.unexpectederror</td><td>Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
439443
<tr><td>STORAGE</td><td>queue.replicate.nonvoterpromotions</td><td>Number of non-voters promoted to voters by the replicate queue</td><td>Promotions of Non Voters to Voters</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
440444
<tr><td>STORAGE</td><td>queue.replicate.pending</td><td>Number of pending replicas in the replicate queue</td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
441445
<tr><td>STORAGE</td><td>queue.replicate.process.failure</td><td>Number of replicas which failed processing in the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>

pkg/kv/kvserver/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2017,6 +2017,33 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
20172017
Measurement: "Processing Time",
20182018
Unit: metric.Unit_NANOSECONDS,
20192019
}
2020+
metaReplicateQueueEnqueueAdd = metric.Metadata{
2021+
Name: "queue.replicate.enqueue.add",
2022+
Help: "Number of replicas successfully added to the replicate queue",
2023+
Measurement: "Replicas",
2024+
Unit: metric.Unit_COUNT,
2025+
}
2026+
metaReplicateQueueEnqueueFailedPrecondition = metric.Metadata{
2027+
Name: "queue.replicate.enqueue.failedprecondition",
2028+
Help: "Number of replicas that failed the precondition checks and were therefore not added to the replicate " +
2029+
"queue",
2030+
Measurement: "Replicas",
2031+
Unit: metric.Unit_COUNT,
2032+
}
2033+
metaReplicateQueueEnqueueNoAction = metric.Metadata{
2034+
Name: "queue.replicate.enqueue.noaction",
2035+
Help: "Number of replicas for which ShouldQueue determined no action was needed and were therefore not " +
2036+
"added to the replicate queue",
2037+
Measurement: "Replicas",
2038+
Unit: metric.Unit_COUNT,
2039+
}
2040+
metaReplicateQueueEnqueueUnexpectedError = metric.Metadata{
2041+
Name: "queue.replicate.enqueue.unexpectederror",
2042+
Help: "Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to " +
2043+
"add to the replicate queue directly), but failed to be enqueued due to unexpected errors",
2044+
Measurement: "Replicas",
2045+
Unit: metric.Unit_COUNT,
2046+
}
20202047
metaLeaseQueueSuccesses = metric.Metadata{
20212048
Name: "queue.lease.process.success",
20222049
Help: "Number of replicas successfully processed by the replica lease queue",
@@ -2960,6 +2987,10 @@ type StoreMetrics struct {
29602987
ReplicaGCQueueFailures *metric.Counter
29612988
ReplicaGCQueuePending *metric.Gauge
29622989
ReplicaGCQueueProcessingNanos *metric.Counter
2990+
ReplicateQueueEnqueueAdd *metric.Counter
2991+
ReplicateQueueEnqueueFailedPrecondition *metric.Counter
2992+
ReplicateQueueEnqueueNoAction *metric.Counter
2993+
ReplicateQueueEnqueueUnexpectedError *metric.Counter
29632994
ReplicateQueueSuccesses *metric.Counter
29642995
ReplicateQueueFailures *metric.Counter
29652996
ReplicateQueuePending *metric.Gauge
@@ -3742,6 +3773,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
37423773
ReplicaGCQueueFailures: metric.NewCounter(metaReplicaGCQueueFailures),
37433774
ReplicaGCQueuePending: metric.NewGauge(metaReplicaGCQueuePending),
37443775
ReplicaGCQueueProcessingNanos: metric.NewCounter(metaReplicaGCQueueProcessingNanos),
3776+
ReplicateQueueEnqueueAdd: metric.NewCounter(metaReplicateQueueEnqueueAdd),
3777+
ReplicateQueueEnqueueFailedPrecondition: metric.NewCounter(metaReplicateQueueEnqueueFailedPrecondition),
3778+
ReplicateQueueEnqueueNoAction: metric.NewCounter(metaReplicateQueueEnqueueNoAction),
3779+
ReplicateQueueEnqueueUnexpectedError: metric.NewCounter(metaReplicateQueueEnqueueUnexpectedError),
37453780
ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses),
37463781
ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures),
37473782
ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending),

pkg/kv/kvserver/queue.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,11 @@ type queueConfig struct {
388388
// replicas that have been destroyed but not GCed.
389389
processDestroyedReplicas bool
390390
// processTimeoutFunc returns the timeout for processing a replica.
391-
processTimeoutFunc queueProcessTimeoutFunc
391+
processTimeoutFunc queueProcessTimeoutFunc
392+
enqueueAdd *metric.Counter
393+
enqueueFailedPrecondition *metric.Counter
394+
enqueueNoAction *metric.Counter
395+
enqueueUnexpectedError *metric.Counter
392396
// successes is a counter of replicas processed successfully.
393397
successes *metric.Counter
394398
// failures is a counter of replicas which failed processing.
@@ -737,16 +741,54 @@ func (bq *baseQueue) AddAsyncWithCallback(
737741
h.Add(ctx, repl, prio, cb)
738742
}); err != nil {
739743
cb.onEnqueueResult(-1 /*indexOnHeap*/, err)
744+
bq.updateMetricsOnEnqueueUnexpectedError()
740745
}
741746
}
742747

743748
// AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait
744749
// for other operations to finish instead of turning into a noop (because
745750
// unlikely MaybeAdd, Add is not subject to being called opportunistically).
746751
func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) {
747-
_ = bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
752+
if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
748753
h.Add(ctx, repl, prio, noopProcessCallback)
749-
})
754+
}); err != nil {
755+
// We don't update metrics in MaybeAddAsync because we don't know if the
756+
// replica should be queued at this point. We only count it as an unexpected
757+
// error when we're certain the replica should be enqueued. In this case,
758+
// the caller explicitly wants to add the replica to the queue directly, so
759+
// we do update the metrics on unexpected error.
760+
bq.updateMetricsOnEnqueueUnexpectedError()
761+
}
762+
}
763+
764+
// updateMetricsOnEnqueueFailedPrecondition updates the metrics when a replica
765+
// fails precondition checks (replicaCanBeProcessed) and should not be
766+
// considered for enqueueing. This may include cases where the replica does not
767+
// have a valid lease, is uninitialized, is destroyed, has a span conf reader
768+
// that could not be retrieved, or belongs to an unsplit range.
769+
func (bq *baseQueue) updateMetricsOnEnqueueFailedPrecondition() {
770+
if bq.enqueueFailedPrecondition != nil {
771+
bq.enqueueFailedPrecondition.Inc(1)
772+
}
773+
}
774+
775+
// updateMetricsOnEnqueueNoAction updates the metrics when shouldQueue
776+
// determines no action is needed and the replica is not added to the queue.
777+
func (bq *baseQueue) updateMetricsOnEnqueueNoAction() {
778+
if bq.enqueueNoAction != nil {
779+
bq.enqueueNoAction.Inc(1)
780+
}
781+
}
782+
783+
// updateMetricsOnEnqueueUnexpectedError updates the metrics when an unexpected
784+
// error occurs during enqueue operations. This should be called for replicas
785+
// that were expected to be enqueued (either had ShouldQueue return true or the
786+
// caller explicitly requested to be added to the queue directly), but failed to
787+
// be enqueued during the enqueue process (such as when Async was rate limited).
788+
func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
789+
if bq.enqueueUnexpectedError != nil {
790+
bq.enqueueUnexpectedError.Inc(1)
791+
}
750792
}
751793

752794
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
@@ -783,6 +825,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
783825
// Load the system config if it's needed.
784826
confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */)
785827
if err != nil {
828+
bq.updateMetricsOnEnqueueFailedPrecondition()
786829
return
787830
}
788831

@@ -792,17 +835,20 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
792835
realRepl, _ := repl.(*Replica)
793836
should, priority := bq.impl.shouldQueue(ctx, now, realRepl, confReader)
794837
if !should {
838+
bq.updateMetricsOnEnqueueNoAction()
795839
return
796840
}
797841

798842
extConf := bq.skipIfReplicaHasExternalFilesConfig
799843
if extConf != nil && extConf.Get(&bq.store.cfg.Settings.SV) {
800844
hasExternal, err := realRepl.HasExternalBytes()
801845
if err != nil {
846+
bq.updateMetricsOnEnqueueUnexpectedError()
802847
log.Warningf(ctx, "could not determine if %s has external bytes: %s", realRepl, err)
803848
return
804849
}
805850
if hasExternal {
851+
bq.updateMetricsOnEnqueueUnexpectedError()
806852
log.VInfof(ctx, 1, "skipping %s for %s because it has external bytes", bq.name, realRepl)
807853
return
808854
}
@@ -845,8 +891,12 @@ func (bq *baseQueue) addInternal(
845891
cb processCallback,
846892
) (added bool, err error) {
847893
defer func() {
894+
if added && bq.enqueueAdd != nil {
895+
bq.enqueueAdd.Inc(1)
896+
}
848897
if err != nil {
849898
cb.onEnqueueResult(-1 /* indexOnHeap */, err)
899+
bq.updateMetricsOnEnqueueUnexpectedError()
850900
}
851901
}()
852902
// NB: this is intentionally outside of bq.mu to avoid having to consider

pkg/kv/kvserver/replicate_queue.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -603,15 +603,19 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
603603
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
604604
// timeout based on the range size and the sending rate in addition
605605
// to consulting the setting which controls the minimum timeout.
606-
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
607-
successes: store.metrics.ReplicateQueueSuccesses,
608-
failures: store.metrics.ReplicateQueueFailures,
609-
storeFailures: store.metrics.StoreFailures,
610-
pending: store.metrics.ReplicateQueuePending,
611-
full: store.metrics.ReplicateQueueFull,
612-
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
613-
purgatory: store.metrics.ReplicateQueuePurgatory,
614-
disabledConfig: kvserverbase.ReplicateQueueEnabled,
606+
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
607+
successes: store.metrics.ReplicateQueueSuccesses,
608+
failures: store.metrics.ReplicateQueueFailures,
609+
storeFailures: store.metrics.StoreFailures,
610+
pending: store.metrics.ReplicateQueuePending,
611+
full: store.metrics.ReplicateQueueFull,
612+
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
613+
purgatory: store.metrics.ReplicateQueuePurgatory,
614+
disabledConfig: kvserverbase.ReplicateQueueEnabled,
615+
enqueueAdd: store.metrics.ReplicateQueueEnqueueAdd,
616+
enqueueFailedPrecondition: store.metrics.ReplicateQueueEnqueueFailedPrecondition,
617+
enqueueNoAction: store.metrics.ReplicateQueueEnqueueNoAction,
618+
enqueueUnexpectedError: store.metrics.ReplicateQueueEnqueueUnexpectedError,
615619
},
616620
)
617621
rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))

0 commit comments

Comments
 (0)