Skip to content

Commit 2227c44

Browse files
authored
Add prom metrics for webhook dispatch. (#1095)
* Add prom metrics for webhook dispatch. 1. success/failure/drop counter. 2. queue length histogram. - doing this only for resource_url_notifier which does a per room queue as queue length is easily available. This is the one currently in use. - worker pool based notifier will need a change in `core` package to return a queue length when enqueuing. * promauto goodness
1 parent c8f416a commit 2227c44

File tree

5 files changed

+90
-7
lines changed

5 files changed

+90
-7
lines changed

webhook/resource_queue.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,25 @@ func (r *resourceQueue) Stop(force bool) {
7777
}
7878
}
7979

80-
func (r *resourceQueue) Enqueue(ctx context.Context, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) error {
80+
func (r *resourceQueue) Enqueue(ctx context.Context, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) (int, error) {
8181
return r.EnqueueAt(ctx, time.Now(), whEvent, params)
8282
}
8383

84-
func (r *resourceQueue) EnqueueAt(ctx context.Context, at time.Time, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) error {
84+
func (r *resourceQueue) EnqueueAt(ctx context.Context, at time.Time, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) (int, error) {
8585
r.mu.Lock()
8686
defer r.mu.Unlock()
8787

8888
if r.closed {
89-
return errQueueClosed
89+
return r.items.Len(), errQueueClosed
9090
}
9191

9292
if r.items.Len() >= r.params.MaxDepth {
93-
return errQueueFull
93+
return r.items.Len(), errQueueFull
9494
}
9595

9696
r.items.PushBack(&item{ctx, at, whEvent, params})
9797
r.cond.Broadcast()
98-
return nil
98+
return r.items.Len(), nil
9999
}
100100

101101
func (r *resourceQueue) worker() {

webhook/resource_url_notifier.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,12 @@ func (r *ResourceURLNotifier) QueueNotify(ctx context.Context, event *livekit.We
211211
}
212212
r.mu.Unlock()
213213

214-
err := rqi.resourceQueue.Enqueue(ctx, event, &params)
214+
qLen, err := rqi.resourceQueue.Enqueue(ctx, event, &params)
215215
if err != nil {
216216
fields := logFields(event, params.URL)
217217
fields = append(fields, "reason", err)
218218
params.Logger.Infow("dropped webhook", fields...)
219+
IncDispatchDrop(err.Error())
219220

220221
if ph := r.getProcessedHook(); ph != nil {
221222
whi := webhookInfo(
@@ -233,6 +234,8 @@ func (r *ResourceURLNotifier) QueueNotify(ctx context.Context, event *livekit.We
233234
}
234235
ph(ctx, whi)
235236
}
237+
} else {
238+
RecordQueueLength(qLen)
236239
}
237240
return err
238241
}
@@ -260,6 +263,7 @@ func (r *ResourceURLNotifier) Process(ctx context.Context, queuedAt time.Time, e
260263
if queueDuration > params.Config.MaxAge {
261264
fields = append(fields, "reason", "age")
262265
params.Logger.Infow("dropped webhook", fields...)
266+
IncDispatchDrop("age")
263267

264268
if ph := r.getProcessedHook(); ph != nil {
265269
whi := webhookInfo(
@@ -286,8 +290,10 @@ func (r *ResourceURLNotifier) Process(ctx context.Context, queuedAt time.Time, e
286290
fields = append(fields, "sendDuration", sendDuration)
287291
if err != nil {
288292
params.Logger.Warnw("failed to send webhook", err, fields...)
293+
IncDispatchFailure()
289294
} else {
290295
params.Logger.Infow("sent webhook", fields...)
296+
IncDispatchSuccess()
291297
}
292298
if ph := r.getProcessedHook(); ph != nil {
293299
whi := webhookInfo(

webhook/stats.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2023 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package webhook
16+
17+
import (
18+
"sync"
19+
20+
"github.com/prometheus/client_golang/prometheus"
21+
"github.com/prometheus/client_golang/prometheus/promauto"
22+
)
23+
24+
var (
25+
promWebhookDispatchTotal *prometheus.CounterVec
26+
promWebhookQueueLengthHistogram prometheus.Histogram
27+
28+
promWebhookInitOnce sync.Once
29+
)
30+
31+
func InitWebhookStats(constLabels prometheus.Labels) {
32+
promWebhookInitOnce.Do(func() { initWebhookStats(constLabels) })
33+
}
34+
35+
func initWebhookStats(constLabels prometheus.Labels) {
36+
promWebhookDispatchTotal = promauto.NewCounterVec(prometheus.CounterOpts{
37+
Namespace: "livekit",
38+
Subsystem: "webhook",
39+
Name: "dispatch_total",
40+
ConstLabels: constLabels,
41+
}, []string{"status", "reason"})
42+
43+
promWebhookQueueLengthHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
44+
Namespace: "livekit",
45+
Subsystem: "webhook",
46+
Name: "queue_length",
47+
ConstLabels: constLabels,
48+
Buckets: prometheus.ExponentialBucketsRange(1, 100, 4),
49+
})
50+
}
51+
52+
func IncDispatchSuccess() {
53+
promWebhookDispatchTotal.WithLabelValues("success", "").Inc()
54+
}
55+
56+
func IncDispatchFailure() {
57+
promWebhookDispatchTotal.WithLabelValues("failure", "").Inc()
58+
}
59+
60+
func IncDispatchDrop(reason string) {
61+
promWebhookDispatchTotal.WithLabelValues("drop", reason).Inc()
62+
}
63+
64+
func RecordQueueLength(queueLength int) {
65+
promWebhookQueueLengthHistogram.Observe(float64(queueLength))
66+
}

webhook/url_notifier.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,10 @@ func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEve
183183
if err != nil {
184184
params.Logger.Warnw("failed to send webhook", err, fields...)
185185
n.dropped.Add(event.NumDropped + 1)
186+
IncDispatchFailure()
186187
} else {
187188
params.Logger.Infow("sent webhook", fields...)
189+
IncDispatchSuccess()
188190
}
189191
if ph := n.getProcessedHook(); ph != nil {
190192
whi := webhookInfo(
@@ -207,6 +209,7 @@ func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEve
207209

208210
fields := logFields(event, params.URL)
209211
params.Logger.Infow("dropped webhook", fields...)
212+
IncDispatchDrop("overflow")
210213

211214
if ph := n.getProcessedHook(); ph != nil {
212215
whi := webhookInfo(

webhook/webhook_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/prometheus/client_golang/prometheus"
2627
"github.com/stretchr/testify/require"
2728
"go.uber.org/atomic"
2829

@@ -43,6 +44,8 @@ var authProvider = auth.NewSimpleKeyProvider(
4344
)
4445

4546
func TestWebHook(t *testing.T) {
47+
InitWebhookStats(prometheus.Labels{})
48+
4649
s := newServer(testAddr)
4750
require.NoError(t, s.Start())
4851
defer s.Stop()
@@ -81,10 +84,11 @@ func TestWebHook(t *testing.T) {
8184
wg.Wait()
8285

8386
})
84-
8587
}
8688

8789
func TestURLNotifierDropped(t *testing.T) {
90+
InitWebhookStats(prometheus.Labels{})
91+
8892
s := newServer(testAddr)
8993
require.NoError(t, s.Start())
9094
defer s.Stop()
@@ -114,6 +118,8 @@ func TestURLNotifierDropped(t *testing.T) {
114118
}
115119

116120
func TestURLNotifierLifecycle(t *testing.T) {
121+
InitWebhookStats(prometheus.Labels{})
122+
117123
s := newServer(testAddr)
118124
require.NoError(t, s.Start())
119125
defer s.Stop()
@@ -209,6 +215,8 @@ func TestURLNotifierLifecycle(t *testing.T) {
209215
}
210216

211217
func TestURLNotifierFilter(t *testing.T) {
218+
InitWebhookStats(prometheus.Labels{})
219+
212220
s := newServer(testAddr)
213221
require.NoError(t, s.Start())
214222
defer s.Stop()

0 commit comments

Comments
 (0)