Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions webhook/resource_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,25 @@ func (r *resourceQueue) Stop(force bool) {
}
}

func (r *resourceQueue) Enqueue(ctx context.Context, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) error {
func (r *resourceQueue) Enqueue(ctx context.Context, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) (int, error) {
return r.EnqueueAt(ctx, time.Now(), whEvent, params)
}

func (r *resourceQueue) EnqueueAt(ctx context.Context, at time.Time, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) error {
func (r *resourceQueue) EnqueueAt(ctx context.Context, at time.Time, whEvent *livekit.WebhookEvent, params *ResourceURLNotifierParams) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.closed {
return errQueueClosed
return r.items.Len(), errQueueClosed
}

if r.items.Len() >= r.params.MaxDepth {
return errQueueFull
return r.items.Len(), errQueueFull
}

r.items.PushBack(&item{ctx, at, whEvent, params})
r.cond.Broadcast()
return nil
return r.items.Len(), nil
}

func (r *resourceQueue) worker() {
Expand Down
8 changes: 7 additions & 1 deletion webhook/resource_url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,12 @@ func (r *ResourceURLNotifier) QueueNotify(ctx context.Context, event *livekit.We
}
r.mu.Unlock()

err := rqi.resourceQueue.Enqueue(ctx, event, &params)
qLen, err := rqi.resourceQueue.Enqueue(ctx, event, &params)
if err != nil {
fields := logFields(event, params.URL)
fields = append(fields, "reason", err)
params.Logger.Infow("dropped webhook", fields...)
IncDispatchDrop(err.Error())

if ph := r.getProcessedHook(); ph != nil {
whi := webhookInfo(
Expand All @@ -233,6 +234,8 @@ func (r *ResourceURLNotifier) QueueNotify(ctx context.Context, event *livekit.We
}
ph(ctx, whi)
}
} else {
RecordQueueLength(qLen)
}
return err
}
Expand Down Expand Up @@ -260,6 +263,7 @@ func (r *ResourceURLNotifier) Process(ctx context.Context, queuedAt time.Time, e
if queueDuration > params.Config.MaxAge {
fields = append(fields, "reason", "age")
params.Logger.Infow("dropped webhook", fields...)
IncDispatchDrop("age")

if ph := r.getProcessedHook(); ph != nil {
whi := webhookInfo(
Expand All @@ -286,8 +290,10 @@ func (r *ResourceURLNotifier) Process(ctx context.Context, queuedAt time.Time, e
fields = append(fields, "sendDuration", sendDuration)
if err != nil {
params.Logger.Warnw("failed to send webhook", err, fields...)
IncDispatchFailure()
} else {
params.Logger.Infow("sent webhook", fields...)
IncDispatchSuccess()
}
if ph := r.getProcessedHook(); ph != nil {
whi := webhookInfo(
Expand Down
67 changes: 67 additions & 0 deletions webhook/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhook

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

var (
promWebhookDispatchTotal *prometheus.CounterVec
promWebhookQueueLengthHistogram prometheus.Histogram

promWebhookInitOnce sync.Once
)

func InitWebhookStats(constLabels prometheus.Labels) {
promWebhookInitOnce.Do(func() { initWebhookStats(constLabels) })
}

func initWebhookStats(constLabels prometheus.Labels) {
promWebhookDispatchTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "livekit",
Subsystem: "webhook",
Name: "dispatch_total",
ConstLabels: constLabels,
}, []string{"status", "reason"})
prometheus.MustRegister(promWebhookDispatchTotal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promauto.New* creates and registers metrics with the default registry in the same function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nice! will do


promWebhookQueueLengthHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "livekit",
Subsystem: "webhook",
Name: "queue_length",
ConstLabels: constLabels,
Buckets: prometheus.ExponentialBucketsRange(1, 100, 4),
})
prometheus.MustRegister(promWebhookQueueLengthHistogram)
}

func IncDispatchSuccess() {
promWebhookDispatchTotal.WithLabelValues("success", "").Inc()
}

func IncDispatchFailure() {
promWebhookDispatchTotal.WithLabelValues("failure", "").Inc()
}

func IncDispatchDrop(reason string) {
promWebhookDispatchTotal.WithLabelValues("drop", reason).Inc()
}

func RecordQueueLength(queueLength int) {
promWebhookQueueLengthHistogram.Observe(float64(queueLength))
}
3 changes: 3 additions & 0 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEve
if err != nil {
params.Logger.Warnw("failed to send webhook", err, fields...)
n.dropped.Add(event.NumDropped + 1)
IncDispatchFailure()
} else {
params.Logger.Infow("sent webhook", fields...)
IncDispatchSuccess()
}
if ph := n.getProcessedHook(); ph != nil {
whi := webhookInfo(
Expand All @@ -207,6 +209,7 @@ func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEve

fields := logFields(event, params.URL)
params.Logger.Infow("dropped webhook", fields...)
IncDispatchDrop("overflow")

if ph := n.getProcessedHook(); ph != nil {
whi := webhookInfo(
Expand Down
10 changes: 9 additions & 1 deletion webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

Expand All @@ -43,6 +44,8 @@ var authProvider = auth.NewSimpleKeyProvider(
)

func TestWebHook(t *testing.T) {
InitWebhookStats(prometheus.Labels{})

s := newServer(testAddr)
require.NoError(t, s.Start())
defer s.Stop()
Expand Down Expand Up @@ -81,10 +84,11 @@ func TestWebHook(t *testing.T) {
wg.Wait()

})

}

func TestURLNotifierDropped(t *testing.T) {
InitWebhookStats(prometheus.Labels{})

s := newServer(testAddr)
require.NoError(t, s.Start())
defer s.Stop()
Expand Down Expand Up @@ -114,6 +118,8 @@ func TestURLNotifierDropped(t *testing.T) {
}

func TestURLNotifierLifecycle(t *testing.T) {
InitWebhookStats(prometheus.Labels{})

s := newServer(testAddr)
require.NoError(t, s.Start())
defer s.Stop()
Expand Down Expand Up @@ -209,6 +215,8 @@ func TestURLNotifierLifecycle(t *testing.T) {
}

func TestURLNotifierFilter(t *testing.T) {
InitWebhookStats(prometheus.Labels{})

s := newServer(testAddr)
require.NoError(t, s.Start())
defer s.Stop()
Expand Down
Loading