@@ -388,11 +388,21 @@ type queueConfig struct {
388
388
// replicas that have been destroyed but not GCed.
389
389
processDestroyedReplicas bool
390
390
// processTimeout returns the timeout for processing a replica.
391
- processTimeoutFunc queueProcessTimeoutFunc
392
- enqueueAdd * metric.Counter
391
+ processTimeoutFunc queueProcessTimeoutFunc
392
+ // enqueueAdd is a counter of replicas that were successfully added to the
393
+ // queue.
394
+ enqueueAdd * metric.Counter
395
+ // enqueueFailedPrecondition is a counter of replicas that failed the
396
+ // precondition checks and were therefore not added to the queue.
393
397
enqueueFailedPrecondition * metric.Counter
394
- enqueueNoAction * metric.Counter
395
- enqueueUnexpectedError * metric.Counter
398
+ // enqueueNoAction is a counter of replicas that had ShouldQueue determine no
399
+ // action was needed and were therefore not added to the queue.
400
+ enqueueNoAction * metric.Counter
401
+ // enqueueUnexpectedError is a counter of replicas that were expected to be
402
+ // enqueued (either had ShouldQueue return true or the caller explicitly
403
+ // requested to be added to the queue directly), but failed to be enqueued
404
+ // during the enqueue process (such as Async was rated limited).
405
+ enqueueUnexpectedError * metric.Counter
396
406
// successes is a counter of replicas processed successfully.
397
407
successes * metric.Counter
398
408
// failures is a counter of replicas which failed processing.
@@ -791,6 +801,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
791
801
}
792
802
}
793
803
804
+ // updateMetricsOnEnqueueAdd updates the metrics when a replica is successfully
805
+ // added to the queue.
806
+ func (bq * baseQueue ) updateMetricsOnEnqueueAdd () {
807
+ if bq .enqueueAdd != nil {
808
+ bq .enqueueAdd .Inc (1 )
809
+ }
810
+ }
811
+
794
812
func (bq * baseQueue ) maybeAdd (ctx context.Context , repl replicaInQueue , now hlc.ClockTimestamp ) {
795
813
ctx = repl .AnnotateCtx (ctx )
796
814
ctx = bq .AnnotateCtx (ctx )
@@ -891,9 +909,7 @@ func (bq *baseQueue) addInternal(
891
909
cb processCallback ,
892
910
) (added bool , err error ) {
893
911
defer func () {
894
- if added && bq .enqueueAdd != nil {
895
- bq .enqueueAdd .Inc (1 )
896
- }
912
+ // INVARIANT: added => err == nil.
897
913
if err != nil {
898
914
cb .onEnqueueResult (- 1 /* indexOnHeap */ , err )
899
915
bq .updateMetricsOnEnqueueUnexpectedError ()
@@ -989,6 +1005,10 @@ func (bq *baseQueue) addInternal(
989
1005
default :
990
1006
// No need to signal again.
991
1007
}
1008
+ // Note that we are bumping enqueueAdd here instead of during defer to avoid
1009
+ // treating requeuing a processing replica as newly added. They will be
1010
+ // re-added to the queue later which will double count them.
1011
+ bq .updateMetricsOnEnqueueAdd ()
992
1012
// Note: it may already be dropped or dropped afterwards.
993
1013
cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
994
1014
return true , nil
0 commit comments