Skip to content

Commit eb11825

Browse files
hweawerCopilot
andauthored
feat: Metrics for writeback queue (#520)
* feat: Metrics for writeback queue * Update lib/persistedretry/manager.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Report metric interval * Remove 0 checks * Update --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent c3d4766 commit eb11825

File tree

2 files changed

+50
-5
lines changed

2 files changed

+50
-5
lines changed

lib/persistedretry/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type Config struct {
3939
// Exponential backoff configuration for synchronous task execution.
4040
SyncRetryBackoff httputil.ExponentialBackOffConfig `yaml:"sync_retry_backoff"`
4141

42+
// Interval at which workqueue metrics should be emitted.
43+
WorkqueueMetricsEmitInterval time.Duration `yaml:"workqueue_metrics_emit_interval"`
44+
4245
// Flags that zero-value channel sizes should not have defaults applied.
4346
Testing bool
4447
}
@@ -70,6 +73,9 @@ func (c Config) applyDefaults() Config {
7073
MaxRetries: 2, // 2 retries = 3 total attempts (1 initial + 2 retries)
7174
}
7275
}
76+
if c.WorkqueueMetricsEmitInterval == 0 {
77+
c.WorkqueueMetricsEmitInterval = 5 * time.Second
78+
}
7379
if !c.Testing {
7480
if c.IncomingBuffer == 0 {
7581
c.IncomingBuffer = 1000

lib/persistedretry/manager.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ func (m *manager) start() error {
114114
m.wg.Add(1)
115115
go m.tickerLoop()
116116

117+
m.wg.Add(1)
118+
go m.reportQueueMetrics()
119+
117120
return nil
118121
}
119122

@@ -122,6 +125,8 @@ func (m *manager) Add(t Task) error {
122125
if m.closed.Load() {
123126
return ErrManagerClosed
124127
}
128+
m.stats.Counter("tasks.added").Inc(1)
129+
125130
ready := t.Ready()
126131
var err error
127132
if ready {
@@ -137,7 +142,7 @@ func (m *manager) Add(t Task) error {
137142
return fmt.Errorf("store: %s", err)
138143
}
139144
if ready {
140-
if err := m.enqueue(t, m.incoming); err != nil {
145+
if err := m.enqueue(t, m.incoming, "incoming"); err != nil {
141146
return fmt.Errorf("enqueue: %s", err)
142147
}
143148
}
@@ -173,12 +178,14 @@ func (m *manager) Find(query interface{}) ([]Task, error) {
173178
return m.store.Find(query)
174179
}
175180

176-
func (m *manager) enqueue(t Task, tasks chan Task) error {
181+
func (m *manager) enqueue(t Task, tasks chan Task, queueName string) error {
182+
queueStats := m.stats.Tagged(map[string]string{"queue": queueName})
177183
select {
178184
case tasks <- t:
185+
queueStats.Gauge("queue.size_on_add").Update(float64(len(tasks)))
179186
default:
180-
// If task queue is full, fallback task to failure state so it can be
181-
// picked up by a retry round.
187+
queueStats.Counter("tasks.dropped.queue_full").Inc(1)
188+
log.Errorf("Task queue full (%s), marking task as failed for later retry", queueName)
182189
if err := m.store.MarkFailed(t); err != nil {
183190
return fmt.Errorf("mark task as failed: %s", err)
184191
}
@@ -190,7 +197,7 @@ func (m *manager) retry(t Task) error {
190197
if err := m.store.MarkPending(t); err != nil {
191198
return fmt.Errorf("mark pending: %s", err)
192199
}
193-
if err := m.enqueue(t, m.retries); err != nil {
200+
if err := m.enqueue(t, m.retries, "retries"); err != nil {
194201
return fmt.Errorf("enqueue: %s", err)
195202
}
196203
return nil
@@ -243,6 +250,38 @@ func (m *manager) pollRetries() {
243250
}
244251
}
245252

253+
func (m *manager) reportQueueMetrics() {
254+
defer m.wg.Done()
255+
ticker := time.NewTicker(m.config.WorkqueueMetricsEmitInterval)
256+
defer ticker.Stop()
257+
258+
for {
259+
select {
260+
case <-ticker.C:
261+
m.reportQueueStats("incoming", m.incoming, m.config.IncomingBuffer)
262+
m.reportQueueStats("retries", m.retries, m.config.RetryBuffer)
263+
m.stats.Gauge("queue.total.size").Update(float64(len(m.incoming) + len(m.retries)))
264+
265+
case <-m.done:
266+
return
267+
}
268+
}
269+
}
270+
271+
func (m *manager) reportQueueStats(name string, tasks chan Task, capacity int) {
272+
queueStats := m.stats.Tagged(map[string]string{"queue": name})
273+
size := len(tasks)
274+
util := float64(size) / float64(capacity) * 100
275+
276+
queueStats.Gauge("queue.size").Update(float64(size))
277+
queueStats.Gauge("queue.utilization_pct").Update(util)
278+
279+
if util > 80 {
280+
log.With("queue", name, "size", size, "capacity", capacity, "utilization_pct", util).
281+
Warn("Writeback queue is near capacity")
282+
}
283+
}
284+
246285
func (m *manager) exec(t Task) error {
247286
if err := m.executor.Exec(t); err != nil {
248287
if err := m.store.MarkFailed(t); err != nil {

0 commit comments

Comments
 (0)