Skip to content

Commit 5b2fc27

Browse files
committed
Emit metrics with the exact message priority
Not just high/low
1 parent 06bd51d commit 5b2fc27

File tree

5 files changed

+50
-63
lines changed

5 files changed

+50
-63
lines changed

dashboard/OMQ-Grafana.json

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -464,34 +464,16 @@
464464
},
465465
"disableTextWrap": false,
466466
"editorMode": "code",
467-
"expr": "sum by (instance, priority) (irate(omq_messages_consumed_total{priority=\"normal\"}[$__rate_interval]))",
467+
"expr": "sum by (instance, priority) (irate(omq_messages_consumed_total[$__rate_interval]))",
468468
"fullMetaSearch": false,
469469
"hide": false,
470470
"includeNullMetadata": true,
471471
"instant": false,
472472
"interval": "1s",
473-
"legendFormat": "Consumed normal priority ({{instance}})",
473+
"legendFormat": "Consumed priority {{priority}} ({{instance}})",
474474
"range": true,
475475
"refId": "B",
476476
"useBackend": false
477-
},
478-
{
479-
"datasource": {
480-
"type": "prometheus",
481-
"uid": "${DS_PROMETHEUS}"
482-
},
483-
"disableTextWrap": false,
484-
"editorMode": "code",
485-
"expr": "sum by (instance, priority) (irate(omq_messages_consumed_total{priority=\"high\"}[$__rate_interval]))",
486-
"fullMetaSearch": false,
487-
"hide": false,
488-
"includeNullMetadata": true,
489-
"instant": false,
490-
"interval": "1s",
491-
"legendFormat": "Consumed high priority ({{instance}})",
492-
"range": true,
493-
"refId": "A",
494-
"useBackend": false
495477
}
496478
],
497479
"title": "Message Consumption by Priority (msgs/s)",

main_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ var _ = Describe("OMQ CLI", func() {
157157

158158
Eventually(session.Err).Should(gbytes.Say(`TOTAL PUBLISHED messages=1`))
159159
Eventually(session.Err).Should(gbytes.Say(`TOTAL CONSUMED messages=1`))
160-
Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="high"} 1`))
160+
Eventually(session).Should(gbytes.Say(`omq_messages_consumed_total{priority="13"} 1`))
161161
},
162162
Entry("amqp -> amqp", "amqp", "/queues/", "amqp", "/queues/"),
163163
Entry("stomp -> amqp", "stomp", "/topic/", "amqp", "/queues/"),
@@ -351,7 +351,7 @@ var _ = Describe("OMQ CLI", func() {
351351
Eventually(session).WithTimeout(5 * time.Second).Should(gexec.Exit(0))
352352
output, _ := io.ReadAll(session.Out)
353353
buf := bytes.NewReader(output)
354-
Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should(Equal(1.0))
354+
Expect(metricValue(buf, `omq_messages_consumed_total{priority="0"}`)).Should(Equal(1.0))
355355
buf.Reset(output)
356356
Expect(metricValue(buf, `omq_end_to_end_latency_seconds{quantile="0.99"}`)).Should(BeNumerically(">", 2))
357357
})
@@ -428,7 +428,7 @@ var _ = Describe("OMQ CLI", func() {
428428

429429
output, _ := io.ReadAll(session.Out)
430430
buf := bytes.NewReader(output)
431-
Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should((BeNumerically(">", 0.0)))
431+
Expect(metricValue(buf, `omq_messages_consumed_total{priority="0"}`)).Should((BeNumerically(">", 0.0)))
432432
buf.Reset(output)
433433
Expect(metricValue(buf, `omq_messages_published_total`)).Should((BeNumerically(">", 0.0)))
434434
},
@@ -613,7 +613,7 @@ var _ = Describe("OMQ CLI", func() {
613613
buf := bytes.NewReader(output)
614614
Expect(metricValue(buf, `omq_messages_published_total`)).Should(Equal(1.0))
615615
buf.Reset(output)
616-
Expect(metricValue(buf, `omq_messages_consumed_total{priority="normal"}`)).Should(Equal(1.0))
616+
Expect(metricValue(buf, `omq_messages_consumed_total{priority="0"}`)).Should(Equal(1.0))
617617
buf.Reset(output)
618618
Expect(metricValue(buf, `omq_messages_returned_total`)).Should(Equal(0.0))
619619
})

pkg/metrics/metrics.go

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,16 @@ var once sync.Once
3636
var metricsServer *MetricsServer
3737

3838
var (
39-
CommandLineArgs *vmetrics.Gauge
40-
MessagesPublished *vmetrics.Counter
41-
MessagesConfirmed *vmetrics.Counter
42-
MessagesReturned *vmetrics.Counter
43-
MessagesConsumedNormalPriority *vmetrics.Counter
44-
MessagesConsumedHighPriority *vmetrics.Counter
45-
MessagesConsumedOutOfOrderNormalPriority *vmetrics.Counter
46-
MessagesConsumedOutOfOrderHighPriority *vmetrics.Counter
47-
MessagesDeliveredTooEarly *vmetrics.Counter
48-
PublishingLatency *vmetrics.Summary
49-
EndToEndLatency *vmetrics.Summary
50-
DelayAccuracy *vmetrics.Summary
39+
CommandLineArgs *vmetrics.Gauge
40+
MessagesPublished *vmetrics.Counter
41+
MessagesConfirmed *vmetrics.Counter
42+
MessagesReturned *vmetrics.Counter
43+
messagesConsumedCounters sync.Map
44+
MessagesDeliveredTooEarly *vmetrics.Counter
45+
PublishingLatency *vmetrics.Summary
46+
EndToEndLatency *vmetrics.Summary
47+
DelayAccuracy *vmetrics.Summary
48+
globalLabels map[string]string
5149
)
5250

5351
func startServer() {
@@ -96,21 +94,12 @@ func Start(ctx context.Context, cfg config.Config) *MetricsServer {
9694
return metricsServer
9795
}
9896

99-
func registerMetrics(globalLabels map[string]string) {
100-
normal := map[string]string{"priority": "normal"}
101-
maps.Copy(normal, globalLabels)
102-
normalPriorityLabels := labelsToString(normal)
103-
high := map[string]string{"priority": "high"}
104-
maps.Copy(high, globalLabels)
105-
highPriorityLabels := labelsToString(high)
97+
func registerMetrics(labels map[string]string) {
98+
globalLabels = labels
10699

107100
MessagesPublished = vmetrics.GetOrCreateCounter("omq_messages_published_total" + labelsToString(globalLabels))
108101
MessagesConfirmed = vmetrics.GetOrCreateCounter("omq_messages_confirmed_total" + labelsToString(globalLabels))
109102
MessagesReturned = vmetrics.GetOrCreateCounter("omq_messages_returned_total" + labelsToString(globalLabels))
110-
MessagesConsumedNormalPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_total` + normalPriorityLabels)
111-
MessagesConsumedHighPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_total` + highPriorityLabels)
112-
MessagesConsumedOutOfOrderNormalPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_out_of_order` + normalPriorityLabels)
113-
MessagesConsumedOutOfOrderHighPriority = vmetrics.GetOrCreateCounter(`omq_messages_consumed_out_of_order` + highPriorityLabels)
114103
MessagesDeliveredTooEarly = vmetrics.GetOrCreateCounter("omq_early_messages_total" + labelsToString(globalLabels))
115104
PublishingLatency = vmetrics.GetOrCreateSummaryExt(`omq_publishing_latency_seconds`+labelsToString(globalLabels), 1*time.Second, []float64{0.5, 0.9, 0.95, 0.99})
116105
EndToEndLatency = vmetrics.GetOrCreateSummaryExt(`omq_end_to_end_latency_seconds`+labelsToString(globalLabels), 1*time.Second, []float64{0.5, 0.9, 0.95, 0.99})
@@ -146,19 +135,22 @@ func registerCommandLineMetric(cfg config.Config, globalLabels map[string]string
146135
}
147136

148137
func MessagesConsumedMetric(priority int) *vmetrics.Counter {
149-
// we assume AMQP-1.0 priority definition
150-
if priority > 4 {
151-
return MessagesConsumedHighPriority
138+
if counter, ok := messagesConsumedCounters.Load(priority); ok {
139+
return counter.(*vmetrics.Counter)
152140
}
153-
return MessagesConsumedNormalPriority
141+
142+
labels := map[string]string{"priority": strconv.Itoa(priority)}
143+
maps.Copy(labels, globalLabels)
144+
counter := vmetrics.GetOrCreateCounter(`omq_messages_consumed_total` + labelsToString(labels))
145+
146+
actual, _ := messagesConsumedCounters.LoadOrStore(priority, counter)
147+
return actual.(*vmetrics.Counter)
154148
}
155149

156150
func MessagesConsumedOutOfOrderMetric(priority int) *vmetrics.Counter {
157-
// we assume AMQP-1.0 priority definition
158-
if priority > 4 {
159-
return MessagesConsumedOutOfOrderHighPriority
160-
}
161-
return MessagesConsumedOutOfOrderNormalPriority
151+
labels := map[string]string{"priority": strconv.Itoa(priority)}
152+
maps.Copy(labels, globalLabels)
153+
return vmetrics.GetOrCreateCounter(`omq_messages_consumed_out_of_order` + labelsToString(labels))
162154
}
163155

164156
var (
@@ -174,7 +166,7 @@ func (m *MetricsServer) printMessageRates(ctx context.Context) {
174166
return
175167
case <-time.After(1 * time.Second):
176168
published := MessagesPublished.Get()
177-
consumed := MessagesConsumedNormalPriority.Get() + MessagesConsumedHighPriority.Get()
169+
consumed := getTotalConsumed()
178170

179171
if published > 0 || consumed > 0 {
180172
log.Print("",
@@ -206,9 +198,22 @@ func (m *MetricsServer) PrintSummary() {
206198
"confirmed", MessagesConfirmed.Get(),
207199
"returned", MessagesReturned.Get(),
208200
"rate", fmt.Sprintf("%.2f/s", float64(MessagesPublished.Get())/time.Since(m.started).Seconds()))
201+
consumed := getTotalConsumed()
209202
log.Print("TOTAL CONSUMED",
210-
"messages", MessagesConsumedNormalPriority.Get()+MessagesConsumedHighPriority.Get(),
211-
"rate", fmt.Sprintf("%.2f/s", float64(MessagesConsumedNormalPriority.Get()+MessagesConsumedHighPriority.Get())/time.Since(m.started).Seconds()))
203+
"messages", consumed,
204+
"rate", fmt.Sprintf("%.2f/s", float64(consumed)/time.Since(m.started).Seconds()))
205+
}
206+
207+
func getTotalConsumed() uint64 {
208+
var total uint64
209+
210+
messagesConsumedCounters.Range(func(key, value interface{}) bool {
211+
counter := value.(*vmetrics.Counter)
212+
total += counter.Get()
213+
return true
214+
})
215+
216+
return total
212217
}
213218

214219
func (m MetricsServer) PrintAll() {

pkg/mqtt/consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ func (c MqttConsumer) Start(cosumerReady chan bool) {
3838
handler := func(client mqtt.Client, msg mqtt.Message) {
3939
payload := msg.Payload()
4040
handleMessage(payload)
41-
metrics.MessagesConsumedNormalPriority.Inc()
41+
metrics.MessagesConsumedMetric(0).Inc()
4242
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
4343
metrics.EndToEndLatency.UpdateDuration(timeSent)
4444

4545
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
46-
metrics.MessagesConsumedOutOfOrderNormalPriority.Inc()
46+
metrics.MessagesConsumedOutOfOrderMetric(0).Inc()
4747
log.Info("out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
4848
}
4949
previousMessageTimeSent = timeSent

pkg/mqtt/consumer_v5.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ func (c Mqtt5Consumer) Start(consumerReady chan bool) {
3737
previousMessageTimeSent := time.Unix(0, 0)
3838

3939
handler := func(rcv paho.PublishReceived) (bool, error) {
40-
metrics.MessagesConsumedNormalPriority.Inc()
40+
metrics.MessagesConsumedMetric(0).Inc()
4141
payload := rcv.Packet.Payload
4242
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
4343
metrics.EndToEndLatency.UpdateDuration(timeSent)
4444

4545
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
46-
metrics.MessagesConsumedOutOfOrderNormalPriority.Inc()
46+
metrics.MessagesConsumedOutOfOrderMetric(0).Inc()
4747
log.Info("out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
4848
}
4949
previousMessageTimeSent = timeSent

0 commit comments

Comments
 (0)