@@ -388,11 +388,21 @@ type queueConfig struct {
388
388
// replicas that have been destroyed but not GCed.
389
389
processDestroyedReplicas bool
390
390
// processTimeout returns the timeout for processing a replica.
391
- processTimeoutFunc queueProcessTimeoutFunc
392
- enqueueAdd * metric.Counter
391
+ processTimeoutFunc queueProcessTimeoutFunc
392
+ // enqueueAdd is a counter of replicas that were successfully added to the
393
+ // queue.
394
+ enqueueAdd * metric.Counter
395
+ // enqueueFailedPrecondition is a counter of replicas that failed the
396
+ // precondition checks and were therefore not added to the queue.
393
397
enqueueFailedPrecondition * metric.Counter
394
- enqueueNoAction * metric.Counter
395
- enqueueUnexpectedError * metric.Counter
398
+ // enqueueNoAction is a counter of replicas that had ShouldQueue determine no
399
+ // action was needed and were therefore not added to the queue.
400
+ enqueueNoAction * metric.Counter
401
+ // enqueueUnexpectedError is a counter of replicas that were expected to be
402
+ // enqueued (either had ShouldQueue return true or the caller explicitly
403
+ // requested to be added to the queue directly), but failed to be enqueued
404
+ // during the enqueue process (such as Async was rated limited).
405
+ enqueueUnexpectedError * metric.Counter
396
406
// successes is a counter of replicas processed successfully.
397
407
successes * metric.Counter
398
408
// failures is a counter of replicas which failed processing.
@@ -787,6 +797,14 @@ func (bq *baseQueue) updateMetricsOnEnqueueUnexpectedError() {
787
797
}
788
798
}
789
799
800
+ // updateMetricsOnEnqueueAdd updates the metrics when a replica is successfully
801
+ // added to the queue.
802
+ func (bq * baseQueue ) updateMetricsOnEnqueueAdd () {
803
+ if bq .enqueueAdd != nil {
804
+ bq .enqueueAdd .Inc (1 )
805
+ }
806
+ }
807
+
790
808
func (bq * baseQueue ) maybeAdd (ctx context.Context , repl replicaInQueue , now hlc.ClockTimestamp ) {
791
809
ctx = repl .AnnotateCtx (ctx )
792
810
ctx = bq .AnnotateCtx (ctx )
@@ -887,9 +905,7 @@ func (bq *baseQueue) addInternal(
887
905
cb processCallback ,
888
906
) (added bool , err error ) {
889
907
defer func () {
890
- if added && bq .enqueueAdd != nil {
891
- bq .enqueueAdd .Inc (1 )
892
- }
908
+ // INVARIANT: added => err == nil.
893
909
if err != nil {
894
910
cb .onEnqueueResult (- 1 /* indexOnHeap */ , err )
895
911
bq .updateMetricsOnEnqueueUnexpectedError ()
@@ -985,6 +1001,10 @@ func (bq *baseQueue) addInternal(
985
1001
default :
986
1002
// No need to signal again.
987
1003
}
1004
+ // Note that we are bumping enqueueAdd here instead of during defer to avoid
1005
+ // treating requeuing a processing replica as newly added. They will be
1006
+ // re-added to the queue later which will double count them.
1007
+ bq .updateMetricsOnEnqueueAdd ()
988
1008
// Note: it may already be dropped or dropped afterwards.
989
1009
cb .onEnqueueResult (item .index /*indexOnHeap*/ , nil )
990
1010
return true , nil
0 commit comments