Skip to content

Commit a7fc88c

Browse files
committed
kvserver: add enqueue metrics to base queue
Previously, observability into base queue enqueuing was limited to pending queue length and process results. This commit adds enqueue-specific metrics for the replicate queue:
- queue.replicate.enqueue.add: counts replicas successfully added to the queue
- queue.replicate.enqueue.failedprecondition: counts replicas that failed the replicaCanBeProcessed precondition check
- queue.replicate.enqueue.noaction: counts replicas skipped because ShouldQueue determined no action was needed
- queue.replicate.enqueue.unexpectederror: counts replicas that were expected to be enqueued (ShouldQueue returned true or the caller attempted a direct enqueue) but failed due to unexpected errors
1 parent efcf9ce commit a7fc88c

File tree

4 files changed

+104
-11
lines changed

4 files changed

+104
-11
lines changed

docs/generated/metrics/metrics.html

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,10 @@
420420
<tr><td>STORAGE</td><td>queue.replicate.addreplica.error</td><td>Number of failed replica additions processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
421421
<tr><td>STORAGE</td><td>queue.replicate.addreplica.success</td><td>Number of successful replica additions processed by the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
422422
<tr><td>STORAGE</td><td>queue.replicate.addvoterreplica</td><td>Number of voter replica additions attempted by the replicate queue</td><td>Replica Additions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
423+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.add</td><td>Number of replicas successfully added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
424+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.failedprecondition</td><td>Number of replicas that failed the precondition checks and were therefore not added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
425+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.noaction</td><td>Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
426+
<tr><td>STORAGE</td><td>queue.replicate.enqueue.unexpectederror</td><td>Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
423427
<tr><td>STORAGE</td><td>queue.replicate.nonvoterpromotions</td><td>Number of non-voters promoted to voters by the replicate queue</td><td>Promotions of Non Voters to Voters</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
424428
<tr><td>STORAGE</td><td>queue.replicate.pending</td><td>Number of pending replicas in the replicate queue</td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
425429
<tr><td>STORAGE</td><td>queue.replicate.process.failure</td><td>Number of replicas which failed processing in the replicate queue</td><td>Replicas</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>

pkg/kv/kvserver/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2032,6 +2032,33 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
20322032
Measurement: "Processing Time",
20332033
Unit: metric.Unit_NANOSECONDS,
20342034
}
2035+
metaReplicateQueueEnqueueAdd = metric.Metadata{
2036+
Name: "queue.replicate.enqueue.add",
2037+
Help: "Number of replicas successfully added to the replicate queue",
2038+
Measurement: "Replicas",
2039+
Unit: metric.Unit_COUNT,
2040+
}
2041+
metaReplicateQueueEnqueueFailedPrecondition = metric.Metadata{
2042+
Name: "queue.replicate.enqueue.failedprecondition",
2043+
Help: "Number of replicas that failed the precondition checks and were therefore not added to the replicate " +
2044+
"queue",
2045+
Measurement: "Replicas",
2046+
Unit: metric.Unit_COUNT,
2047+
}
2048+
metaReplicateQueueEnqueueNoAction = metric.Metadata{
2049+
Name: "queue.replicate.enqueue.noaction",
2050+
Help: "Number of replicas for which ShouldQueue determined no action was needed and were therefore not " +
2051+
"added to the replicate queue",
2052+
Measurement: "Replicas",
2053+
Unit: metric.Unit_COUNT,
2054+
}
2055+
metaReplicateQueueEnqueueUnexpectedError = metric.Metadata{
2056+
Name: "queue.replicate.enqueue.unexpectederror",
2057+
Help: "Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to " +
2058+
"add to the replicate queue directly), but failed to be enqueued due to unexpected errors",
2059+
Measurement: "Replicas",
2060+
Unit: metric.Unit_COUNT,
2061+
}
20352062
metaLeaseQueueSuccesses = metric.Metadata{
20362063
Name: "queue.lease.process.success",
20372064
Help: "Number of replicas successfully processed by the replica lease queue",
@@ -2990,6 +3017,10 @@ type StoreMetrics struct {
29903017
ReplicaGCQueueFailures *metric.Counter
29913018
ReplicaGCQueuePending *metric.Gauge
29923019
ReplicaGCQueueProcessingNanos *metric.Counter
3020+
ReplicateQueueEnqueueAdd *metric.Counter
3021+
ReplicateQueueEnqueueFailedPrecondition *metric.Counter
3022+
ReplicateQueueEnqueueNoAction *metric.Counter
3023+
ReplicateQueueEnqueueUnexpectedError *metric.Counter
29933024
ReplicateQueueSuccesses *metric.Counter
29943025
ReplicateQueueFailures *metric.Counter
29953026
ReplicateQueuePending *metric.Gauge
@@ -3773,6 +3804,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
37733804
ReplicaGCQueueFailures: metric.NewCounter(metaReplicaGCQueueFailures),
37743805
ReplicaGCQueuePending: metric.NewGauge(metaReplicaGCQueuePending),
37753806
ReplicaGCQueueProcessingNanos: metric.NewCounter(metaReplicaGCQueueProcessingNanos),
3807+
ReplicateQueueEnqueueAdd: metric.NewCounter(metaReplicateQueueEnqueueAdd),
3808+
ReplicateQueueEnqueueFailedPrecondition: metric.NewCounter(metaReplicateQueueEnqueueFailedPrecondition),
3809+
ReplicateQueueEnqueueNoAction: metric.NewCounter(metaReplicateQueueEnqueueNoAction),
3810+
ReplicateQueueEnqueueUnexpectedError: metric.NewCounter(metaReplicateQueueEnqueueUnexpectedError),
37763811
ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses),
37773812
ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures),
37783813
ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending),

pkg/kv/kvserver/queue.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,11 @@ type queueConfig struct {
388388
// replicas that have been destroyed but not GCed.
389389
processDestroyedReplicas bool
390390
// processTimeoutFunc returns the timeout for processing a replica.
391-
processTimeoutFunc queueProcessTimeoutFunc
391+
processTimeoutFunc queueProcessTimeoutFunc
392+
enqueueAdd *metric.Counter
393+
enqueueFailedPrecondition *metric.Counter
394+
enqueueNoAction *metric.Counter
395+
enqueueUnexpectedError *metric.Counter
392396
// successes is a counter of replicas processed successfully.
393397
successes *metric.Counter
394398
// failures is a counter of replicas which failed processing.
@@ -733,16 +737,54 @@ func (bq *baseQueue) AddAsyncWithCallback(
733737
h.Add(ctx, repl, prio, cb)
734738
}); err != nil {
735739
cb.onEnqueueResult(-1 /*indexOnHeap*/, err)
740+
bq.updateMetricsOnEnqueueUnexpectedError()
736741
}
737742
}
738743

739744
// AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait
740745
// for other operations to finish instead of turning into a noop (because
741746
// unlikely MaybeAdd, Add is not subject to being called opportunistically).
742747
func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) {
743-
_ = bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
748+
if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
744749
h.Add(ctx, repl, prio, noopProcessCallback)
745-
})
750+
}); err != nil {
751+
// We don't update metrics in MaybeAddAsync because we don't know if the
752+
// replica should be queued at this point. We only count it as an unexpected
753+
// error when we're certain the replica should be enqueued. In this case,
754+
// the caller explicitly wants to add the replica to the queue directly, so
755+
// we do update the metrics on unexpected error.
756+
bq.updateMetricsOnEnqueueUnexpectedError()
757+
}
758+
}
759+
760+
// updateMetricsOnEnqueueFailedPrecondition updates the metrics when a replica
761+
// fails precondition checks (replicaCanBeProcessed) and should not be
762+
// considered for enqueueing. This may include cases where the replica does not
763+
// have a valid lease, is uninitialized, is destroyed, failed to retrieve span
764+
// conf reader, or is part of an unsplit range.
765+
func (bq *baseQueue) updateMetricsOnEnqueueFailedPrecondition() {
766+
if bq.enqueueFailedPrecondition != nil {
767+
bq.enqueueFailedPrecondition.Inc(1)
768+
}
769+
}
770+
771+
// updateMetricsOnEnqueueNoAction updates the metrics when shouldQueue
772+
// determines no action is needed and the replica is not added to the queue.
773+
func (bq *baseQueue) updateMetricsOnEnqueueNoAction() {
774+
if bq.enqueueNoAction != nil {
775+
bq.enqueueNoAction.Inc(1)
776+
}
777+
}
778+
779+
// updateMetricsOnEnqueueUnexpectedError updates the metrics when an unexpected
780+
// error occurs during enqueue operations. This should be called for replicas
781+
// that were expected to be enqueued (either had ShouldQueue return true or the
782+
// caller explicitly requested to be added to the queue directly), but failed to
783+
// be enqueued during the enqueue process (such as when Async was rate limited).
784+
func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
785+
if bq.enqueueUnexpectedError != nil {
786+
bq.enqueueUnexpectedError.Inc(1)
787+
}
746788
}
747789

748790
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
@@ -779,6 +821,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
779821
// Load the system config if it's needed.
780822
confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */)
781823
if err != nil {
824+
bq.updateMetricsOnEnqueueFailedPrecondition()
782825
return
783826
}
784827

@@ -788,17 +831,20 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
788831
realRepl, _ := repl.(*Replica)
789832
should, priority := bq.impl.shouldQueue(ctx, now, realRepl, confReader)
790833
if !should {
834+
bq.updateMetricsOnEnqueueNoAction()
791835
return
792836
}
793837

794838
extConf := bq.skipIfReplicaHasExternalFilesConfig
795839
if extConf != nil && extConf.Get(&bq.store.cfg.Settings.SV) {
796840
hasExternal, err := realRepl.HasExternalBytes()
797841
if err != nil {
842+
bq.updateMetricsOnEnqueueUnexpectedError()
798843
log.Warningf(ctx, "could not determine if %s has external bytes: %s", realRepl, err)
799844
return
800845
}
801846
if hasExternal {
847+
bq.updateMetricsOnEnqueueUnexpectedError()
802848
log.VInfof(ctx, 1, "skipping %s for %s because it has external bytes", bq.name, realRepl)
803849
return
804850
}
@@ -841,8 +887,12 @@ func (bq *baseQueue) addInternal(
841887
cb processCallback,
842888
) (added bool, err error) {
843889
defer func() {
890+
if added && bq.enqueueAdd != nil {
891+
bq.enqueueAdd.Inc(1)
892+
}
844893
if err != nil {
845894
cb.onEnqueueResult(-1 /* indexOnHeap */, err)
895+
bq.updateMetricsOnEnqueueUnexpectedError()
846896
}
847897
}()
848898
// NB: this is intentionally outside of bq.mu to avoid having to consider

pkg/kv/kvserver/replicate_queue.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -603,14 +603,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
603603
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
604604
// timeout based on the range size and the sending rate in addition
605605
// to consulting the setting which controls the minimum timeout.
606-
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
607-
successes: store.metrics.ReplicateQueueSuccesses,
608-
failures: store.metrics.ReplicateQueueFailures,
609-
pending: store.metrics.ReplicateQueuePending,
610-
full: store.metrics.ReplicateQueueFull,
611-
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
612-
purgatory: store.metrics.ReplicateQueuePurgatory,
613-
disabledConfig: kvserverbase.ReplicateQueueEnabled,
606+
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
607+
enqueueAdd: store.metrics.ReplicateQueueEnqueueAdd,
608+
enqueueFailedPrecondition: store.metrics.ReplicateQueueEnqueueFailedPrecondition,
609+
enqueueNoAction: store.metrics.ReplicateQueueEnqueueNoAction,
610+
enqueueUnexpectedError: store.metrics.ReplicateQueueEnqueueUnexpectedError,
611+
successes: store.metrics.ReplicateQueueSuccesses,
612+
failures: store.metrics.ReplicateQueueFailures,
613+
pending: store.metrics.ReplicateQueuePending,
614+
full: store.metrics.ReplicateQueueFull,
615+
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
616+
purgatory: store.metrics.ReplicateQueuePurgatory,
617+
disabledConfig: kvserverbase.ReplicateQueueEnabled,
614618
},
615619
)
616620
rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))

0 commit comments

Comments
 (0)