diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index 36626646f4..967a252dfb 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -6,6 +6,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/internal/metrics" ) // This file is mostly a copy of unexported code from @@ -14,8 +15,9 @@ import ( // The only two differences are the addition of mapLock in defaultQueueMetrics and converging retryMetrics into queueMetrics. type queueMetrics[T comparable] interface { - add(item T) - get(item T) + add(item T, priority int) + get(item T, priority int) + updateDepthWithPriorityMetric(oldPriority, newPriority int) done(item T) updateUnfinishedWork() retry() @@ -25,9 +27,9 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl if len(name) == 0 { return noMetrics[T]{} } - return &defaultQueueMetrics[T]{ + + dqm := &defaultQueueMetrics[T]{ clock: clock, - depth: mp.NewDepthMetric(name), adds: mp.NewAddsMetric(name), latency: mp.NewLatencyMetric(name), workDuration: mp.NewWorkDurationMetric(name), @@ -37,6 +39,13 @@ func newQueueMetrics[T comparable](mp workqueue.MetricsProvider, name string, cl processingStartTimes: map[T]time.Time{}, retries: mp.NewRetriesMetric(name), } + + if mpp, ok := mp.(metrics.MetricsProviderWithPriority); ok { + dqm.depthWithPriority = mpp.NewDepthMetricWithPriority(name) + } else { + dqm.depth = mp.NewDepthMetric(name) + } + return dqm } // defaultQueueMetrics expects the caller to lock before setting any metrics. @@ -44,7 +53,8 @@ type defaultQueueMetrics[T comparable] struct { clock clock.Clock // current depth of a workqueue - depth workqueue.GaugeMetric + depth workqueue.GaugeMetric + depthWithPriority metrics.DepthMetricWithPriority // total number of adds handled by a workqueue adds workqueue.CounterMetric // how long an item stays in a workqueue @@ -64,13 +74,17 @@ type defaultQueueMetrics[T comparable] struct { } // add is called for ready items only -func (m *defaultQueueMetrics[T]) add(item T) { +func (m *defaultQueueMetrics[T]) add(item T, priority int) { if m == nil { return } m.adds.Inc() - m.depth.Inc() + if m.depthWithPriority != nil { + m.depthWithPriority.Inc(priority) + } else { + m.depth.Inc() + } m.mapLock.Lock() defer m.mapLock.Unlock() @@ -80,12 +94,16 @@ func (m *defaultQueueMetrics[T]) add(item T) { } } -func (m *defaultQueueMetrics[T]) get(item T) { +func (m *defaultQueueMetrics[T]) get(item T, priority int) { if m == nil { return } - m.depth.Dec() + if m.depthWithPriority != nil { + m.depthWithPriority.Dec(priority) + } else { + m.depth.Dec() + } m.mapLock.Lock() defer m.mapLock.Unlock() @@ -97,6 +115,13 @@ func (m *defaultQueueMetrics[T]) get(item T) { } } +func (m *defaultQueueMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) { + if m.depthWithPriority != nil { + m.depthWithPriority.Dec(oldPriority) + m.depthWithPriority.Inc(newPriority) + } +} + func (m *defaultQueueMetrics[T]) done(item T) { if m == nil { return @@ -139,8 +164,9 @@ func (m *defaultQueueMetrics[T]) retry() { type noMetrics[T any] struct{} -func (noMetrics[T]) add(item T) {} -func (noMetrics[T]) get(item T) {} -func (noMetrics[T]) done(item T) {} -func (noMetrics[T]) updateUnfinishedWork() {} -func (noMetrics[T]) retry() {} +func (noMetrics[T]) add(item T, priority int) {} +func (noMetrics[T]) get(item T, priority int) {} +func (noMetrics[T]) updateDepthWithPriorityMetric(oldPriority, newPriority int) {} +func (noMetrics[T]) done(item T) {} +func (noMetrics[T]) updateUnfinishedWork() {} +func (noMetrics[T]) retry() {} diff --git a/pkg/controller/priorityqueue/metrics_test.go b/pkg/controller/priorityqueue/metrics_test.go index 634250b93e..3be3989d89 100644 --- a/pkg/controller/priorityqueue/metrics_test.go +++ b/pkg/controller/priorityqueue/metrics_test.go @@ -4,11 +4,12 @@ import ( "sync" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/internal/metrics" ) func newFakeMetricsProvider() *fakeMetricsProvider { return &fakeMetricsProvider{ - depth: make(map[string]int), + depth: make(map[string]map[int]int), adds: make(map[string]int), latency: make(map[string][]float64), workDuration: make(map[string][]float64), @@ -19,8 +20,10 @@ func newFakeMetricsProvider() *fakeMetricsProvider { } } +var _ metrics.MetricsProviderWithPriority = &fakeMetricsProvider{} + type fakeMetricsProvider struct { - depth map[string]int + depth map[string]map[int]int adds map[string]int latency map[string][]float64 workDuration map[string][]float64 @@ -31,9 +34,13 @@ type fakeMetricsProvider struct { } func (f *fakeMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { + panic("Should never be called. Expected NewDepthMetricWithPriority to be called instead") +} + +func (f *fakeMetricsProvider) NewDepthMetricWithPriority(name string) metrics.DepthMetricWithPriority { f.mu.Lock() defer f.mu.Unlock() - f.depth[name] = 0 + f.depth[name] = map[int]int{} return &fakeGaugeMetric{m: &f.depth, mu: &f.mu, name: name} } @@ -80,21 +87,21 @@ func (f *fakeMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMet } type fakeGaugeMetric struct { - m *map[string]int + m *map[string]map[int]int mu *sync.Mutex name string } -func (fg *fakeGaugeMetric) Inc() { +func (fg *fakeGaugeMetric) Inc(priority int) { fg.mu.Lock() defer fg.mu.Unlock() - (*fg.m)[fg.name]++ + (*fg.m)[fg.name][priority]++ } -func (fg *fakeGaugeMetric) Dec() { +func (fg *fakeGaugeMetric) Dec(priority int) { fg.mu.Lock() defer fg.mu.Unlock() - (*fg.m)[fg.name]-- + (*fg.m)[fg.name][priority]-- } type fakeCounterMetric struct { diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index ff5dea9021..c3f77a6f39 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -156,7 +156,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { w.items[key] = item w.queue.ReplaceOrInsert(item) if item.ReadyAt == nil { - w.metrics.add(key) + w.metrics.add(key, item.Priority) } w.addedCounter++ continue @@ -166,12 +166,16 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) if o.Priority > item.Priority { + // Update depth metric only if the item in the queue was already added to the depth metric. + if item.ReadyAt == nil || w.becameReady.Has(key) { + w.metrics.updateDepthWithPriorityMetric(item.Priority, o.Priority) + } item.Priority = o.Priority } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { if readyAt == nil && !w.becameReady.Has(key) { - w.metrics.add(key) + w.metrics.add(key, item.Priority) } item.ReadyAt = readyAt } @@ -223,7 +227,7 @@ func (w *priorityqueue[T]) spin() { return false } if !w.becameReady.Has(item.Key) { - w.metrics.add(item.Key) + w.metrics.add(item.Key, item.Priority) w.becameReady.Insert(item.Key) } } @@ -239,7 +243,7 @@ func (w *priorityqueue[T]) spin() { return true } - w.metrics.get(item.Key) + w.metrics.get(item.Key, item.Priority) w.locked.Insert(item.Key) w.waiters.Add(-1) delete(w.items, item.Key) diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index f54d3cc11c..ec0f36e95f 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -23,7 +23,7 @@ var _ = Describe("Controllerworkqueue", func() { item, _, _ := q.GetWithPriority() Expect(item).To(Equal("foo")) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(1)) Expect(metrics.retries["test"]).To(Equal(0)) }) @@ -40,7 +40,7 @@ var _ = Describe("Controllerworkqueue", func() { item, _, _ = q.GetWithPriority() Expect(item).To(Equal("bar")) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(2)) }) @@ -58,7 +58,7 @@ var _ = Describe("Controllerworkqueue", func() { item, _, _ = q.GetWithPriority() Expect(item).To(Equal("bar")) - Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) Expect(metrics.adds["test"]).To(Equal(3)) }) @@ -81,7 +81,7 @@ var _ = Describe("Controllerworkqueue", func() { item, _, _ = q.GetWithPriority() Expect(item).To(Equal("foo")) - Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) Expect(metrics.adds["test"]).To(Equal(4)) }) @@ -98,7 +98,7 @@ var _ = Describe("Controllerworkqueue", func() { cwq.lockedLock.Lock() Expect(cwq.locked.Len()).To(Equal(0)) - Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) Expect(metrics.adds["test"]).To(Equal(1)) }) @@ -115,7 +115,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(0)) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{1: 0, 2: 0})) Expect(metrics.adds["test"]).To(Equal(1)) }) @@ -134,7 +134,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(2)) - Expect(metrics.depth["test"]).To(Equal(2)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2, 1: 0})) Expect(metrics.adds["test"]).To(Equal(3)) }) @@ -150,7 +150,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(priority).To(Equal(0)) Expect(q.Len()).To(Equal(0)) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(1)) }) @@ -191,7 +191,7 @@ var _ = Describe("Controllerworkqueue", func() { tick <- now Eventually(retrievedItem).Should(BeClosed()) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(1)) Expect(metrics.retries["test"]).To(Equal(1)) }) @@ -217,7 +217,7 @@ var _ = Describe("Controllerworkqueue", func() { q.AddWithOpts(AddOpts{}, "foo") Eventually(retrieved).Should(BeClosed()) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(1)) }) @@ -282,7 +282,7 @@ var _ = Describe("Controllerworkqueue", func() { Eventually(retrievedItem).Should(BeClosed()) Eventually(retrievedSecondItem).Should(BeClosed()) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(2)) }) @@ -297,7 +297,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(2)) Expect(metrics.depth).To(HaveLen(1)) - Expect(metrics.depth["test"]).To(Equal(2)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) }) It("items are included in Len() and the queueDepth metric once they are ready", func() { @@ -311,12 +311,12 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(2)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(2)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) metrics.mu.Unlock() time.Sleep(time.Second) Expect(q.Len()).To(Equal(4)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(4)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 4})) metrics.mu.Unlock() // Drain queue @@ -326,7 +326,7 @@ var _ = Describe("Controllerworkqueue", func() { } Expect(q.Len()).To(Equal(0)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) metrics.mu.Unlock() // Validate that doing it again still works to notice bugs with removing @@ -338,12 +338,12 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(2)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(2)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 2})) metrics.mu.Unlock() time.Sleep(time.Second) Expect(q.Len()).To(Equal(4)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(4)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 4})) metrics.mu.Unlock() }) @@ -388,14 +388,14 @@ var _ = Describe("Controllerworkqueue", func() { q.AddWithOpts(AddOpts{After: time.Hour}, "foo") Expect(q.Len()).To(Equal(0)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{})) metrics.mu.Unlock() q.AddWithOpts(AddOpts{}, "foo") Expect(q.Len()).To(Equal(1)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) metrics.mu.Unlock() // Get the item to ensure the codepath in @@ -406,7 +406,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(item).To(Equal("foo")) Expect(q.Len()).To(Equal(0)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) metrics.mu.Unlock() }) @@ -419,13 +419,13 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(1)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) metrics.mu.Unlock() q.AddWithOpts(AddOpts{}, "foo") Expect(q.Len()).To(Equal(1)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(1)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 1})) metrics.mu.Unlock() // Get the item to ensure the codepath in @@ -436,7 +436,7 @@ var _ = Describe("Controllerworkqueue", func() { Expect(item).To(Equal("foo")) Expect(q.Len()).To(Equal(0)) metrics.mu.Lock() - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) metrics.mu.Unlock() }) @@ -501,7 +501,7 @@ var _ = Describe("Controllerworkqueue", func() { tick <- now Eventually(retrievedSecondItem).Should(BeClosed()) - Expect(metrics.depth["test"]).To(Equal(0)) + Expect(metrics.depth["test"]).To(Equal(map[int]int{0: 0})) Expect(metrics.adds["test"]).To(Equal(2)) Expect(metrics.retries["test"]).To(Equal(2)) }) @@ -559,7 +559,7 @@ func BenchmarkAddLockContended(b *testing.B) { // - An item is never handed out again before it is returned // - Items in the queue are de-duplicated // - max(existing priority, new priority) is used -func TestFuzzPrioriorityQueue(t *testing.T) { +func TestFuzzPriorityQueue(t *testing.T) { t.Parallel() seed := time.Now().UnixNano() @@ -647,9 +647,12 @@ func TestFuzzPrioriorityQueue(t *testing.T) { } metrics.mu.Lock() - if metrics.depth["test"] < 0 { - t.Errorf("negative depth of %d", metrics.depth["test"]) + for priority, depth := range metrics.depth["test"] { + if depth < 0 { + t.Errorf("negative depth of %d for priority %d:", depth, priority) + } } + metrics.mu.Unlock() handedOut.Insert(item) }() diff --git a/pkg/internal/metrics/workqueue.go b/pkg/internal/metrics/workqueue.go index 86da340af8..9e2fced9f1 100644 --- a/pkg/internal/metrics/workqueue.go +++ b/pkg/internal/metrics/workqueue.go @@ -17,6 +17,8 @@ limitations under the License. package metrics import ( + "strconv" + "github.com/prometheus/client_golang/prometheus" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -42,8 +44,8 @@ var ( depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: WorkQueueSubsystem, Name: DepthKey, - Help: "Current depth of workqueue", - }, []string{"name", "controller"}) + Help: "Current depth of workqueue by workqueue and priority", + }, []string{"name", "controller", "priority"}) adds = prometheus.NewCounterVec(prometheus.CounterOpts{ Subsystem: WorkQueueSubsystem, @@ -103,7 +105,7 @@ func init() { type WorkqueueMetricsProvider struct{} func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { - return depth.WithLabelValues(name, name) + return depth.WithLabelValues(name, name, "") // no priority } func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { @@ -129,3 +131,33 @@ func (WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name str func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { return retries.WithLabelValues(name, name) } + +type MetricsProviderWithPriority interface { + workqueue.MetricsProvider + + NewDepthMetricWithPriority(name string) DepthMetricWithPriority +} + +// DepthMetricWithPriority represents a depth metric with priority. +type DepthMetricWithPriority interface { + Inc(priority int) + Dec(priority int) +} + +var _ MetricsProviderWithPriority = WorkqueueMetricsProvider{} + +func (WorkqueueMetricsProvider) NewDepthMetricWithPriority(name string) DepthMetricWithPriority { + return &depthWithPriorityMetric{lvs: []string{name, name}} +} + +type depthWithPriorityMetric struct { + lvs []string +} + +func (g *depthWithPriorityMetric) Inc(priority int) { + depth.WithLabelValues(append(g.lvs, strconv.Itoa(priority))...).Inc() +} + +func (g *depthWithPriorityMetric) Dec(priority int) { + depth.WithLabelValues(append(g.lvs, strconv.Itoa(priority))...).Dec() +}