Skip to content

Commit 6268773

Browse files
committed
kvserver: add enqueue metrics to base queue
Previously, observability into base queue enqueuing was limited to pending queue length and process results. This commit adds enqueue-specific metrics for the replicate queue:
- queue.replicate.enqueue.add: counts replicas successfully added to the queue
- queue.replicate.enqueue.failedprecondition: counts replicas that failed the replicaCanBeProcessed precondition check
- queue.replicate.enqueue.noaction: counts replicas skipped because ShouldQueue determined no action was needed
- queue.replicate.enqueue.unexpectederror: counts replicas that were expected to be enqueued (ShouldQueue returned true or the caller attempted a direct enqueue) but failed due to unexpected errors
1 parent fef65e5 commit 6268773

File tree

4 files changed

+132
-11
lines changed

4 files changed

+132
-11
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13903,6 +13903,38 @@ layers:
1390313903
unit: COUNT
1390413904
aggregation: AVG
1390513905
derivative: NON_NEGATIVE_DERIVATIVE
13906+
- name: queue.replicate.enqueue.add
13907+
exported_name: queue_replicate_enqueue_add
13908+
description: Number of replicas successfully added to the replicate queue
13909+
y_axis_label: Replicas
13910+
type: COUNTER
13911+
unit: COUNT
13912+
aggregation: AVG
13913+
derivative: NON_NEGATIVE_DERIVATIVE
13914+
- name: queue.replicate.enqueue.failedprecondition
13915+
exported_name: queue_replicate_enqueue_failedprecondition
13916+
description: Number of replicas that failed the precondition checks and were therefore not added to the replicate queue
13917+
y_axis_label: Replicas
13918+
type: COUNTER
13919+
unit: COUNT
13920+
aggregation: AVG
13921+
derivative: NON_NEGATIVE_DERIVATIVE
13922+
- name: queue.replicate.enqueue.noaction
13923+
exported_name: queue_replicate_enqueue_noaction
13924+
description: Number of replicas for which ShouldQueue determined no action was needed and were therefore not added to the replicate queue
13925+
y_axis_label: Replicas
13926+
type: COUNTER
13927+
unit: COUNT
13928+
aggregation: AVG
13929+
derivative: NON_NEGATIVE_DERIVATIVE
13930+
- name: queue.replicate.enqueue.unexpectederror
13931+
exported_name: queue_replicate_enqueue_unexpectederror
13932+
description: Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to add to the replicate queue directly), but failed to be enqueued due to unexpected errors
13933+
y_axis_label: Replicas
13934+
type: COUNTER
13935+
unit: COUNT
13936+
aggregation: AVG
13937+
derivative: NON_NEGATIVE_DERIVATIVE
1390613938
- name: queue.replicate.nonvoterpromotions
1390713939
exported_name: queue_replicate_nonvoterpromotions
1390813940
description: Number of non-voters promoted to voters by the replicate queue

pkg/kv/kvserver/metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2172,6 +2172,33 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
21722172
Measurement: "Processing Time",
21732173
Unit: metric.Unit_NANOSECONDS,
21742174
}
2175+
metaReplicateQueueEnqueueAdd = metric.Metadata{
2176+
Name: "queue.replicate.enqueue.add",
2177+
Help: "Number of replicas successfully added to the replicate queue",
2178+
Measurement: "Replicas",
2179+
Unit: metric.Unit_COUNT,
2180+
}
2181+
metaReplicateQueueEnqueueFailedPrecondition = metric.Metadata{
2182+
Name: "queue.replicate.enqueue.failedprecondition",
2183+
Help: "Number of replicas that failed the precondition checks and were therefore not added to the replicate " +
2184+
"queue",
2185+
Measurement: "Replicas",
2186+
Unit: metric.Unit_COUNT,
2187+
}
2188+
metaReplicateQueueEnqueueNoAction = metric.Metadata{
2189+
Name: "queue.replicate.enqueue.noaction",
2190+
Help: "Number of replicas for which ShouldQueue determined no action was needed and were therefore not " +
2191+
"added to the replicate queue",
2192+
Measurement: "Replicas",
2193+
Unit: metric.Unit_COUNT,
2194+
}
2195+
metaReplicateQueueEnqueueUnexpectedError = metric.Metadata{
2196+
Name: "queue.replicate.enqueue.unexpectederror",
2197+
Help: "Number of replicas that were expected to be enqueued (ShouldQueue returned true or the caller decided to " +
2198+
"add to the replicate queue directly), but failed to be enqueued due to unexpected errors",
2199+
Measurement: "Replicas",
2200+
Unit: metric.Unit_COUNT,
2201+
}
21752202
metaLeaseQueueSuccesses = metric.Metadata{
21762203
Name: "queue.lease.process.success",
21772204
Help: "Number of replicas successfully processed by the replica lease queue",
@@ -3220,6 +3247,10 @@ type StoreMetrics struct {
32203247
ReplicaGCQueueFailures *metric.Counter
32213248
ReplicaGCQueuePending *metric.Gauge
32223249
ReplicaGCQueueProcessingNanos *metric.Counter
3250+
ReplicateQueueEnqueueAdd *metric.Counter
3251+
ReplicateQueueEnqueueFailedPrecondition *metric.Counter
3252+
ReplicateQueueEnqueueNoAction *metric.Counter
3253+
ReplicateQueueEnqueueUnexpectedError *metric.Counter
32233254
ReplicateQueueSuccesses *metric.Counter
32243255
ReplicateQueueFailures *metric.Counter
32253256
ReplicateQueuePending *metric.Gauge
@@ -4014,6 +4045,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
40144045
ReplicaGCQueueFailures: metric.NewCounter(metaReplicaGCQueueFailures),
40154046
ReplicaGCQueuePending: metric.NewGauge(metaReplicaGCQueuePending),
40164047
ReplicaGCQueueProcessingNanos: metric.NewCounter(metaReplicaGCQueueProcessingNanos),
4048+
ReplicateQueueEnqueueAdd: metric.NewCounter(metaReplicateQueueEnqueueAdd),
4049+
ReplicateQueueEnqueueFailedPrecondition: metric.NewCounter(metaReplicateQueueEnqueueFailedPrecondition),
4050+
ReplicateQueueEnqueueNoAction: metric.NewCounter(metaReplicateQueueEnqueueNoAction),
4051+
ReplicateQueueEnqueueUnexpectedError: metric.NewCounter(metaReplicateQueueEnqueueUnexpectedError),
40174052
ReplicateQueueSuccesses: metric.NewCounter(metaReplicateQueueSuccesses),
40184053
ReplicateQueueFailures: metric.NewCounter(metaReplicateQueueFailures),
40194054
ReplicateQueuePending: metric.NewGauge(metaReplicateQueuePending),

pkg/kv/kvserver/queue.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,11 @@ type queueConfig struct {
387387
// replicas that have been destroyed but not GCed.
388388
processDestroyedReplicas bool
389389
// processTimeout returns the timeout for processing a replica.
390-
processTimeoutFunc queueProcessTimeoutFunc
390+
processTimeoutFunc queueProcessTimeoutFunc
391+
enqueueAdd *metric.Counter
392+
enqueueFailedPrecondition *metric.Counter
393+
enqueueNoAction *metric.Counter
394+
enqueueUnexpectedError *metric.Counter
391395
// successes is a counter of replicas processed successfully.
392396
successes *metric.Counter
393397
// failures is a counter of replicas which failed processing.
@@ -733,16 +737,54 @@ func (bq *baseQueue) AddAsyncWithCallback(
733737
h.Add(ctx, repl, prio, cb)
734738
}); err != nil {
735739
cb.onEnqueueResult(-1 /*indexOnHeap*/, err)
740+
bq.updateMetricsOnEnqueueUnexpectedError()
736741
}
737742
}
738743

739744
// AddAsync adds the replica to the queue. Unlike MaybeAddAsync, it will wait
740745
// for other operations to finish instead of turning into a noop (because
741746
// unlike MaybeAdd, Add is not subject to being called opportunistically).
742747
func (bq *baseQueue) AddAsync(ctx context.Context, repl replicaInQueue, prio float64) {
743-
_ = bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
748+
if err := bq.Async(ctx, "Add", true /* wait */, func(ctx context.Context, h queueHelper) {
744749
h.Add(ctx, repl, prio, noopProcessCallback)
745-
})
750+
}); err != nil {
751+
// We don't update metrics in MaybeAddAsync because we don't know if the
752+
// replica should be queued at this point. We only count it as an unexpected
753+
// error when we're certain the replica should be enqueued. In this case,
754+
// the caller explicitly wants to add the replica to the queue directly, so
755+
// we do update the metrics on unexpected error.
756+
bq.updateMetricsOnEnqueueUnexpectedError()
757+
}
758+
}
759+
760+
// updateMetricsOnEnqueueFailedPrecondition updates the metrics when a replica
761+
// fails precondition checks (replicaCanBeProcessed) and should not be
762+
// considered for enqueueing. This may include cases where the replica does not
763+
// have a valid lease, is uninitialized, is destroyed, failed to retrieve span
764+
// conf reader, or is an unsplit range.
765+
func (bq *baseQueue) updateMetricsOnEnqueueFailedPrecondition() {
766+
if bq.enqueueFailedPrecondition != nil {
767+
bq.enqueueFailedPrecondition.Inc(1)
768+
}
769+
}
770+
771+
// updateMetricsOnEnqueueNoAction updates the metrics when shouldQueue
772+
// determines no action is needed and the replica is not added to the queue.
773+
func (bq *baseQueue) updateMetricsOnEnqueueNoAction() {
774+
if bq.enqueueNoAction != nil {
775+
bq.enqueueNoAction.Inc(1)
776+
}
777+
}
778+
779+
// updateMetricsOnEnqueueUnexpectedError updates the metrics when an unexpected
780+
// error occurs during enqueue operations. This should be called for replicas
781+
// that were expected to be enqueued (either had ShouldQueue return true or the
782+
// caller explicitly requested to be added to the queue directly), but failed to
783+
// be enqueued during the enqueue process (e.g. because Async was rate limited).
784+
func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
785+
if bq.enqueueUnexpectedError != nil {
786+
bq.enqueueUnexpectedError.Inc(1)
787+
}
746788
}
747789

748790
func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.ClockTimestamp) {
@@ -779,6 +821,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
779821
// Load the system config if it's needed.
780822
confReader, err := bq.replicaCanBeProcessed(ctx, repl, false /* acquireLeaseIfNeeded */)
781823
if err != nil {
824+
bq.updateMetricsOnEnqueueFailedPrecondition()
782825
return
783826
}
784827

@@ -788,17 +831,20 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
788831
realRepl, _ := repl.(*Replica)
789832
should, priority := bq.impl.shouldQueue(ctx, now, realRepl, confReader)
790833
if !should {
834+
bq.updateMetricsOnEnqueueNoAction()
791835
return
792836
}
793837

794838
extConf := bq.skipIfReplicaHasExternalFilesConfig
795839
if extConf != nil && extConf.Get(&bq.store.cfg.Settings.SV) {
796840
hasExternal, err := realRepl.HasExternalBytes()
797841
if err != nil {
842+
bq.updateMetricsOnEnqueueUnexpectedError()
798843
log.Dev.Warningf(ctx, "could not determine if %s has external bytes: %s", realRepl, err)
799844
return
800845
}
801846
if hasExternal {
847+
bq.updateMetricsOnEnqueueUnexpectedError()
802848
log.Dev.VInfof(ctx, 1, "skipping %s for %s because it has external bytes", bq.name, realRepl)
803849
return
804850
}
@@ -841,8 +887,12 @@ func (bq *baseQueue) addInternal(
841887
cb processCallback,
842888
) (added bool, err error) {
843889
defer func() {
890+
if added && bq.enqueueAdd != nil {
891+
bq.enqueueAdd.Inc(1)
892+
}
844893
if err != nil {
845894
cb.onEnqueueResult(-1 /* indexOnHeap */, err)
895+
bq.updateMetricsOnEnqueueUnexpectedError()
846896
}
847897
}()
848898
// NB: this is intentionally outside of bq.mu to avoid having to consider

pkg/kv/kvserver/replicate_queue.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -608,14 +608,18 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
608608
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
609609
// timeout based on the range size and the sending rate in addition
610610
// to consulting the setting which controls the minimum timeout.
611-
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
612-
successes: store.metrics.ReplicateQueueSuccesses,
613-
failures: store.metrics.ReplicateQueueFailures,
614-
pending: store.metrics.ReplicateQueuePending,
615-
full: store.metrics.ReplicateQueueFull,
616-
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
617-
purgatory: store.metrics.ReplicateQueuePurgatory,
618-
disabledConfig: kvserverbase.ReplicateQueueEnabled,
611+
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
612+
enqueueAdd: store.metrics.ReplicateQueueEnqueueAdd,
613+
enqueueFailedPrecondition: store.metrics.ReplicateQueueEnqueueFailedPrecondition,
614+
enqueueNoAction: store.metrics.ReplicateQueueEnqueueNoAction,
615+
enqueueUnexpectedError: store.metrics.ReplicateQueueEnqueueUnexpectedError,
616+
successes: store.metrics.ReplicateQueueSuccesses,
617+
failures: store.metrics.ReplicateQueueFailures,
618+
pending: store.metrics.ReplicateQueuePending,
619+
full: store.metrics.ReplicateQueueFull,
620+
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
621+
purgatory: store.metrics.ReplicateQueuePurgatory,
622+
disabledConfig: kvserverbase.ReplicateQueueEnabled,
619623
},
620624
)
621625
rq.baseQueue.SetMaxSize(ReplicateQueueMaxSize.Get(&store.cfg.Settings.SV))

0 commit comments

Comments
 (0)