Skip to content

Commit 828825b

Browse files
author
Armin
committed
add metrics tracking for RabbitMQ publishing and processing
1 parent 4f3f873 commit 828825b

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

pkg/metrics/worker.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ func WorkerObserver(queue string, handler func(context.Context) error) func(cont
4141
return err
4242
}
4343
}
44+
45+
func WorkerProcessed(queue, result string) {
46+
workerProcessed.WithLabelValues(queue, result).Inc()
47+
}

pkg/queue/rabbitmq.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"log/slog"
77
"time"
88

9+
"sms-gateway/pkg/metrics"
10+
911
"github.com/google/uuid"
1012
"github.com/rabbitmq/amqp091-go"
1113
)
@@ -47,10 +49,14 @@ func (rp *RabbitConnection) PublishContext(ctx context.Context, req PublishReque
4749
}
4850
}()
4951

50-
if err = ch.PublishWithContext(ctx, req.Exchange, req.Key, false, false, amqp091.Publishing{
51-
Timestamp: time.Now(),
52-
Body: req.Msg,
53-
}); err != nil {
52+
publishFn := metrics.OperatorObserver("rabbit_publish", func(c context.Context) error {
53+
return ch.PublishWithContext(c, req.Exchange, req.Key, false, false, amqp091.Publishing{
54+
Timestamp: time.Now(),
55+
Body: req.Msg,
56+
})
57+
})
58+
59+
if err = publishFn(ctx); err != nil {
5460
return err
5561
}
5662
return nil
@@ -99,7 +105,16 @@ func (rp *RabbitConnection) ConsumeContext(ctx context.Context, appName string,
99105
return nil, err
100106
}
101107

102-
return delivery, nil
108+
wrapped := make(chan amqp091.Delivery)
109+
go func() {
110+
defer close(wrapped)
111+
for d := range delivery {
112+
metrics.WorkerProcessed(queueName, "received")
113+
wrapped <- d
114+
}
115+
}()
116+
117+
return wrapped, nil
103118
}
104119

105120
func (rp *RabbitConnection) Close() error {

0 commit comments

Comments
 (0)