66
77 "k8s.io/client-go/util/workqueue"
88 "k8s.io/utils/clock"
9+ "sigs.k8s.io/controller-runtime/pkg/internal/metrics"
910)
1011
1112// This file is mostly a copy of unexported code from
@@ -14,8 +15,9 @@ import (
1415// The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics.
1516
1617type queueMetrics [T comparable ] interface {
17- add (item T )
18- get (item T )
18+ add (item T , priority int )
19+ get (item T , priority int )
20+ updateDepthWithPriorityMetric (oldPriority , newPriority int )
1921 done (item T )
2022 updateUnfinishedWork ()
2123 retry ()
@@ -25,9 +27,9 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
2527 if len (name ) == 0 {
2628 return noMetrics [T ]{}
2729 }
28- return & defaultQueueMetrics [T ]{
30+
31+ dqm := & defaultQueueMetrics [T ]{
2932 clock : clock ,
30- depth : mp .NewDepthMetric (name ),
3133 adds : mp .NewAddsMetric (name ),
3234 latency : mp .NewLatencyMetric (name ),
3335 workDuration : mp .NewWorkDurationMetric (name ),
@@ -37,14 +39,22 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl
3739 processingStartTimes : map [T ]time.Time {},
3840 retries : mp .NewRetriesMetric (name ),
3941 }
42+
43+ if mpp , ok := mp .(metrics.MetricsProviderWithPriority ); ok {
44+ dqm .depthWithPriority = mpp .NewDepthMetricWithPriority (name )
45+ } else {
46+ dqm .depth = mp .NewDepthMetric (name )
47+ }
48+ return dqm
4049}
4150
4251// defaultQueueMetrics expects the caller to lock before setting any metrics.
4352type defaultQueueMetrics [T comparable ] struct {
4453 clock clock.Clock
4554
4655 // current depth of a workqueue
47- depth workqueue.GaugeMetric
56+ depth workqueue.GaugeMetric
57+ depthWithPriority metrics.DepthMetricWithPriority
4858 // total number of adds handled by a workqueue
4959 adds workqueue.CounterMetric
5060 // how long an item stays in a workqueue
@@ -64,13 +74,17 @@ type defaultQueueMetrics[T comparable] struct {
6474}
6575
6676// add is called for ready items only
67- func (m * defaultQueueMetrics [T ]) add (item T ) {
77+ func (m * defaultQueueMetrics [T ]) add (item T , priority int ) {
6878 if m == nil {
6979 return
7080 }
7181
7282 m .adds .Inc ()
73- m .depth .Inc ()
83+ if m .depthWithPriority != nil {
84+ m .depthWithPriority .Inc (priority )
85+ } else {
86+ m .depth .Inc ()
87+ }
7488
7589 m .mapLock .Lock ()
7690 defer m .mapLock .Unlock ()
@@ -80,12 +94,16 @@ func (m *defaultQueueMetrics[T]) add(item T) {
8094 }
8195}
8296
83- func (m * defaultQueueMetrics [T ]) get (item T ) {
97+ func (m * defaultQueueMetrics [T ]) get (item T , priority int ) {
8498 if m == nil {
8599 return
86100 }
87101
88- m .depth .Dec ()
102+ if m .depthWithPriority != nil {
103+ m .depthWithPriority .Dec (priority )
104+ } else {
105+ m .depth .Dec ()
106+ }
89107
90108 m .mapLock .Lock ()
91109 defer m .mapLock .Unlock ()
@@ -97,6 +115,13 @@ func (m *defaultQueueMetrics[T]) get(item T) {
97115 }
98116}
99117
118+ func (m * defaultQueueMetrics [T ]) updateDepthWithPriorityMetric (oldPriority , newPriority int ) {
119+ if m .depthWithPriority != nil {
120+ m .depthWithPriority .Dec (oldPriority )
121+ m .depthWithPriority .Inc (newPriority )
122+ }
123+ }
124+
100125func (m * defaultQueueMetrics [T ]) done (item T ) {
101126 if m == nil {
102127 return
@@ -139,8 +164,9 @@ func (m *defaultQueueMetrics[T]) retry() {
139164
140165type noMetrics [T any ] struct {}
141166
142- func (noMetrics [T ]) add (item T ) {}
143- func (noMetrics [T ]) get (item T ) {}
144- func (noMetrics [T ]) done (item T ) {}
145- func (noMetrics [T ]) updateUnfinishedWork () {}
146- func (noMetrics [T ]) retry () {}
167+ func (noMetrics [T ]) add (item T , priority int ) {}
168+ func (noMetrics [T ]) get (item T , priority int ) {}
169+ func (noMetrics [T ]) updateDepthWithPriorityMetric (oldPriority , newPriority int ) {}
170+ func (noMetrics [T ]) done (item T ) {}
171+ func (noMetrics [T ]) updateUnfinishedWork () {}
172+ func (noMetrics [T ]) retry () {}
0 commit comments