Skip to content

Commit d368cc2

Browse files
committed
support queue length through policy
#238
1 parent a917fbb commit d368cc2

File tree

3 files changed

+70
-57
lines changed

3 files changed

+70
-57
lines changed

exporter_queue.go

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ var (
3434
"consumer_utilisation": newGaugeVec("queue_consumer_utilisation", "Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.", queueLabels),
3535
"memory": newGaugeVec("queue_memory", "Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.", queueLabels),
3636
"head_message_timestamp": newGaugeVec("queue_head_message_timestamp", "The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.", queueLabels), //https://github.com/rabbitmq/rabbitmq-server/pull/54
37-
"arguments.x-max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels),
38-
"arguments.x-max-length": newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels),
3937
"garbage_collection.min_heap_size": newGaugeVec("queue_gc_min_heap", "Minimum heap size in words", queueLabels),
4038
"garbage_collection.min_bin_vheap_size": newGaugeVec("queue_gc_min_vheap", "Minimum binary virtual heap size in words", queueLabels),
4139
"garbage_collection.fullsweep_after": newGaugeVec("queue_gc_collections_before_fullsweep", "Maximum generational collections before fullsweep", queueLabels),
@@ -45,6 +43,10 @@ var (
4543
"message_stats.deliver_no_ack_details.rate": newGaugeVec("queue_messages_deliver_no_ack_rate", "Rate at which messages are delivered to consumers that use automatic acknowledgements.", queueLabels),
4644
"message_stats.deliver_details.rate": newGaugeVec("queue_messages_deliver_rate", "Rate at which messages are delivered to consumers that use manual acknowledgements.", queueLabels),
4745
}
46+
limitsGaugeVec = map[string]*prometheus.GaugeVec{
47+
"max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels),
48+
"max-length": newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels),
49+
}
4850

4951
queueCounterVec = map[string]*prometheus.Desc{
5052
"disk_reads": newDesc("queue_disk_reads_total", "Total number of times messages have been read from disk by this queue since it started.", queueLabels),
@@ -64,6 +66,7 @@ var (
6466
)
6567

6668
type exporterQueue struct {
69+
limitsGauge map[string]*prometheus.GaugeVec
6770
queueMetricsGauge map[string]*prometheus.GaugeVec
6871
queueMetricsCounter map[string]*prometheus.Desc
6972
stateMetric *prometheus.GaugeVec
@@ -73,6 +76,7 @@ type exporterQueue struct {
7376
func newExporterQueue() Exporter {
7477
queueGaugeVecActual := queueGaugeVec
7578
queueCounterVecActual := queueCounterVec
79+
litmitsGaugeVecActual := limitsGaugeVec
7680

7781
if len(config.ExcludeMetrics) > 0 {
7882
for _, metric := range config.ExcludeMetrics {
@@ -82,21 +86,48 @@ func newExporterQueue() Exporter {
8286
if queueCounterVecActual[metric] != nil {
8387
delete(queueCounterVecActual, metric)
8488
}
89+
if litmitsGaugeVecActual[metric] != nil {
90+
delete(litmitsGaugeVecActual, metric)
91+
}
8592
}
8693
}
8794

8895
return exporterQueue{
96+
limitsGauge: litmitsGaugeVecActual,
8997
queueMetricsGauge: queueGaugeVecActual,
9098
queueMetricsCounter: queueCounterVecActual,
9199
stateMetric: newGaugeVec("queue_state", "A metric with a value of constant '1' if the queue is in a certain state", append(queueLabels, "state")),
92100
idleSinceMetric: newGaugeVec("queue_idle_since_seconds", "starttime where the queue switched to idle state; in seconds since epoch (1970).", queueLabels),
93101
}
94102
}
95103

104+
func collectLowerMetric(metricA, metricB string, stats StatsInfo) float64 {
105+
mA, okA := stats.metrics[metricA]
106+
mB, okB := stats.metrics[metricB]
107+
108+
if okA && okB {
109+
if mA < mB {
110+
return mA
111+
} else {
112+
return mB
113+
}
114+
}
115+
if okA {
116+
return mA
117+
}
118+
if okB {
119+
return mB
120+
}
121+
return -1.0
122+
}
123+
96124
func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) error {
97125
for _, gaugevec := range e.queueMetricsGauge {
98126
gaugevec.Reset()
99127
}
128+
for _, m := range e.limitsGauge {
129+
m.Reset()
130+
}
100131
e.stateMetric.Reset()
101132
e.idleSinceMetric.Reset()
102133

@@ -126,40 +157,14 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
126157
}
127158

128159
rabbitMqQueueData, err := getStatsInfo(config, "queues", queueLabelKeys)
129-
130160
if err != nil {
131161
return err
132162
}
133163

134164
log.WithField("queueData", rabbitMqQueueData).Debug("Queue data")
135-
for key, gaugevec := range e.queueMetricsGauge {
136-
for _, queue := range rabbitMqQueueData {
137-
qname := queue.labels["name"]
138-
vname := queue.labels["vhost"]
139-
if value, ok := queue.metrics[key]; ok {
140-
141-
if matchVhost := config.IncludeVHost.MatchString(vname); matchVhost {
142-
if skipVhost := config.SkipVHost.MatchString(vname); !skipVhost {
143-
if matchInclude := config.IncludeQueues.MatchString(qname); matchInclude {
144-
if matchSkip := config.SkipQueues.MatchString(qname); !matchSkip {
145-
self := "0"
146-
if queue.labels["node"] == selfNode {
147-
self = "1"
148-
}
149-
// log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key")
150-
gaugevec.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(value)
151-
}
152-
}
153-
}
154-
}
155-
}
156-
}
157-
}
158-
159165
for _, queue := range rabbitMqQueueData {
160166
qname := queue.labels["name"]
161167
vname := queue.labels["vhost"]
162-
163168
if vhostIncluded := config.IncludeVHost.MatchString(vname); !vhostIncluded {
164169
continue
165170
}
@@ -177,50 +182,51 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
177182
if queue.labels["node"] == selfNode {
178183
self = "1"
179184
}
185+
labelValues := []string{cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self}
186+
187+
for key, gaugevec := range e.queueMetricsGauge {
188+
if value, ok := queue.metrics[key]; ok {
189+
// log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key")
190+
gaugevec.WithLabelValues(labelValues...).Set(value)
191+
}
192+
}
193+
194+
for key, countvec := range e.queueMetricsCounter {
195+
if value, ok := queue.metrics[key]; ok {
196+
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, labelValues...)
197+
} else {
198+
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, labelValues...)
199+
}
200+
}
180201

202+
state := queue.labels["state"]
181203
idleSince, exists := queue.labels["idle_since"]
182204
if exists && idleSince != "" {
183205
if t, err := time.Parse("2006-01-02 15:04:05", idleSince); err == nil {
184206
unixSeconds := float64(t.UnixNano()) / 1e9
185-
state := queue.labels["state"]
207+
186208
if state == "running" { //replace running state with idle if idle_since time is provided. Other states (flow, etc.) are not replaced
187209
state = "idle"
188210
}
189-
e.idleSinceMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(unixSeconds)
190-
e.stateMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, state).Set(1)
211+
e.idleSinceMetric.WithLabelValues(labelValues...).Set(unixSeconds)
191212
} else {
192213
log.WithError(err).WithField("idle_since", idleSince).Warn("error parsing idle since time")
193214
}
194-
} else {
195-
e.stateMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, queue.labels["state"]).Set(1)
196215
}
197-
}
216+
e.stateMetric.WithLabelValues(append(labelValues, state)...).Set(1)
198217

199-
for key, countvec := range e.queueMetricsCounter {
200-
for _, queue := range rabbitMqQueueData {
201-
qname := queue.labels["name"]
202-
vname := queue.labels["vhost"]
203-
204-
if matchVhost := config.IncludeVHost.MatchString(vname); matchVhost {
205-
if skipVhost := config.SkipVHost.MatchString(vname); !skipVhost {
206-
if matchInclude := config.IncludeQueues.MatchString(qname); matchInclude {
207-
if matchSkip := config.SkipQueues.MatchString(qname); !matchSkip {
208-
self := "0"
209-
if queue.labels["node"] == selfNode {
210-
self = "1"
211-
}
212-
if value, ok := queue.metrics[key]; ok {
213-
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
214-
} else {
215-
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
216-
}
217-
}
218-
}
219-
}
220-
}
218+
if f := collectLowerMetric("arguments.x-max-length", "effective_policy_definition.max-length", queue); f >= 0 {
219+
limitsGaugeVec["max-length"].WithLabelValues(labelValues...).Set(f)
221220
}
221+
if f := collectLowerMetric("arguments.x-max-length-bytes", "effective_policy_definition.max-length-bytes", queue); f >= 0 {
222+
limitsGaugeVec["max-length-bytes"].WithLabelValues(labelValues...).Set(f)
223+
}
224+
222225
}
223226

227+
for _, metric := range e.limitsGauge {
228+
metric.Collect(ch)
229+
}
224230
for _, gaugevec := range e.queueMetricsGauge {
225231
gaugevec.Collect(ch)
226232
}
@@ -231,6 +237,9 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
231237
}
232238

233239
func (e exporterQueue) Describe(ch chan<- *prometheus.Desc) {
240+
for _, metric := range e.limitsGauge {
241+
metric.Describe(ch)
242+
}
234243
for _, gaugevec := range e.queueMetricsGauge {
235244
gaugevec.Describe(ch)
236245
}

exporter_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ func TestWholeApp(t *testing.T) {
8787
}
8888
body := w.Body.String()
8989
t.Log(body)
90+
lines := strings.Split(body, "\n")
91+
if lc := len(lines); lc != 372 {
92+
t.Errorf("expected 372 lines, got %d", lc)
93+
}
9094
expectSubstring(t, body, `rabbitmq_up{cluster="my-rabbit@ae74c041248b",node="my-rabbit@ae74c041248b"} 1`)
9195

9296
// overview

testenv/testenv.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func (tenv *TestEnvironment) MustSetPolicy(name string, pattern string) {
137137
client := &http.Client{}
138138
request, err := http.NewRequest("PUT", url, strings.NewReader(policy))
139139
if err != nil {
140-
log.Fatal(fmt.Errorf("could not create NewRequst: %w", err))
140+
log.Fatal(fmt.Errorf("could not create NewRequest: %w", err))
141141
}
142142
request.Header.Add("Content-Type", "application/json")
143143
request.ContentLength = int64(len(policy))

0 commit comments

Comments (0)