@@ -387,11 +387,21 @@ type queueConfig struct {
387
387
// replicas that have been destroyed but not GCed.
388
388
processDestroyedReplicas bool
389
389
// processTimeout returns the timeout for processing a replica.
390
- processTimeoutFunc queueProcessTimeoutFunc
391
- enqueueAdd * metric.Counter
390
+ processTimeoutFunc queueProcessTimeoutFunc
391
+ // enqueueAdd is a counter of replicas that were successfully added to the
392
+ // queue.
393
+ enqueueAdd * metric.Counter
394
+ // enqueueFailedPrecondition is a counter of replicas that failed the
395
+ // precondition checks and were therefore not added to the queue.
392
396
enqueueFailedPrecondition * metric.Counter
393
- enqueueNoAction * metric.Counter
394
- enqueueUnexpectedError * metric.Counter
397
+ // enqueueNoAction is a counter of replicas that had ShouldQueue determine no
398
+ // action was needed and were therefore not added to the queue.
399
+ enqueueNoAction * metric.Counter
400
+ // enqueueUnexpectedError is a counter of replicas that were expected to be
401
+ // enqueued (either had ShouldQueue return true or the caller explicitly
402
+ // requested to be added to the queue directly), but failed to be enqueued
403
+ // during the enqueue process (such as Async was rated limited).
404
+ enqueueUnexpectedError * metric.Counter
395
405
// successes is a counter of replicas processed successfully.
396
406
successes * metric.Counter
397
407
// failures is a counter of replicas which failed processing.
@@ -787,6 +797,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
787
797
}
788
798
}
789
799
800
+ // updateMetricsOnEnqueueAdd updates the metrics when a replica is successfully
801
+ // added to the queue.
802
+ func (bq * baseQueue ) updateMetricsOnEnqueueAdd () {
803
+ if bq .enqueueAdd != nil {
804
+ bq .enqueueAdd .Inc (1 )
805
+ }
806
+ }
807
+
790
808
func (bq * baseQueue ) maybeAdd (ctx context.Context , repl replicaInQueue , now hlc.ClockTimestamp ) {
791
809
ctx = repl .AnnotateCtx (ctx )
792
810
ctx = bq .AnnotateCtx (ctx )
@@ -887,9 +905,7 @@ func (bq *baseQueue) addInternal(
887
905
cb processCallback ,
888
906
) (added bool , err error ) {
889
907
defer func () {
890
- if added && bq .enqueueAdd != nil {
891
- bq .enqueueAdd .Inc (1 )
892
- }
908
+ // INVARIANT: added => err == nil.
893
909
if err != nil {
894
910
cb .onEnqueueResult (- 1 /* indexOnHeap */ , err )
895
911
bq .updateMetricsOnEnqueueUnexpectedError ()
@@ -985,6 +1001,10 @@ func (bq *baseQueue) addInternal(
985
1001
default :
986
1002
// No need to signal again.
987
1003
}
1004
+ // Note that we are bumping enqueueAdd here instead of during defer to avoid
1005
+ // treating requeuing a processing replica as newly added. They will be
1006
+ // re-added to the queue later which will double count them.
1007
+ bq .updateMetricsOnEnqueueAdd ()
988
1008
// Note: it may already be dropped or dropped afterwards.
989
1009
cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
990
1010
return true , nil
0 commit comments