Skip to content

Commit 11efdcd

Browse files
committed
Update collectors for latest mqmetric function
1 parent e617c0c commit 11efdcd

File tree

8 files changed

+164
-50
lines changed

8 files changed

+164
-50
lines changed

cmd/mq_aws/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ func Collect() error {
511511

512512
collectStopTime := time.Now()
513513
elapsedSecs := int64(collectStopTime.Sub(collectStartTime).Seconds())
514-
log.Infof("Collection time = %d secs", elapsedSecs)
514+
log.Debugf("Collection time = %d secs", elapsedSecs)
515515

516516
return err
517517

cmd/mq_coll/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ func Collect() error {
395395

396396
collectStopTime := time.Now()
397397
elapsedSecs := int64(collectStopTime.Sub(collectStartTime).Seconds())
398-
log.Infof("Collection time = %d secs", elapsedSecs)
398+
log.Debugf("Collection time = %d secs", elapsedSecs)
399399

400400
return err
401401

cmd/mq_influx/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ func Collect(c client.Client) error {
489489

490490
collectStopTime := time.Now()
491491
elapsedSecs := int64(collectStopTime.Sub(collectStartTime).Seconds())
492-
log.Infof("Collection time = %d secs", elapsedSecs)
492+
log.Debugf("Collection time = %d secs", elapsedSecs)
493493

494494
return err
495495

cmd/mq_json/exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ func Collect() error {
484484

485485
collectStopTime := time.Now()
486486
elapsedSecs := int64(collectStopTime.Sub(collectStartTime).Seconds())
487-
log.Infof("Collection time = %d secs", elapsedSecs)
487+
log.Debugf("Collection time = %d secs", elapsedSecs)
488488

489489
return err
490490

cmd/mq_opentsdb/exporter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package main
22

33
/*
4-
Copyright (c) IBM Corporation 2016,2020
4+
Copyright (c) IBM Corporation 2016,2021
55
66
Licensed under the Apache License, Version 2.0 (the "License");
77
you may not use this file except in compliance with the License.
@@ -496,7 +496,7 @@ func Collect() error {
496496

497497
collectStopTime := time.Now()
498498
elapsedSecs := int64(collectStopTime.Sub(collectStartTime).Seconds())
499-
log.Infof("Collection time = %d secs", elapsedSecs)
499+
log.Debugf("Collection time = %d secs", elapsedSecs)
500500

501501
return err
502502
}

cmd/mq_prometheus/exporter.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ and update the various Gauges.
2727

2828
import (
2929
"strings"
30-
"sync/atomic"
3130
"time"
3231

3332
"github.com/ibm-messaging/mq-golang/v5/ibmmq"
@@ -96,7 +95,7 @@ func (e *exporter) Describe(ch chan<- *prometheus.Desc) {
9695

9796
// Seeing this twice in succession in the logs is a bit odd. So we'll set things up that
9897
// the invocation is not reported during an Unregister operation.
99-
if atomic.LoadInt32(&st.collectorSilent) == 0 {
98+
if !isCollectorSilent() {
10099
log.Infof("IBMMQ Describe started")
101100
log.Infof("Platform is %s", platformString)
102101
}
@@ -159,7 +158,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
159158
// If we're not connected, then continue to report a single metric about the qmgr status
160159
// This value is created by the mqmetric package even on z/OS which doesn't support
161160
// the DIS QMSTATUS command.
162-
if atomic.LoadInt32(&st.connectedQMgr) == 0 {
161+
if !isConnectedQMgr() {
163162
log.Infof("Reporting status as disconnected")
164163
if g, ok := qMgrStatusGaugeMap[mqmetric.ATTR_QMGR_STATUS]; ok {
165164
// There's no MQQMSTA_STOPPED value defined .All the regular qmgr status
@@ -197,7 +196,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
197196
pollStatus = false
198197
thisPoll := time.Now()
199198
elapsed = thisPoll.Sub(lastPoll)
200-
if elapsed >= config.cf.PollIntervalDuration || atomic.LoadInt32(&st.firstCollection) == 0 {
199+
if elapsed >= config.cf.PollIntervalDuration || !isFirstCollection() {
201200
log.Debugf("Polling for object status")
202201
lastPoll = thisPoll
203202
pollStatus = true
@@ -331,17 +330,16 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
331330
// quit out of the collector.
332331
switch mqrc {
333332
case ibmmq.MQRC_NONE:
334-
atomic.StoreInt32(&st.connectedQMgr, 1)
333+
setConnectedQMgr(true)
335334
case ibmmq.MQRC_UNEXPECTED_ERROR |
336335
ibmmq.MQRC_STANDBY_Q_MGR |
337336
ibmmq.MQRC_RECONNECT_FAILED:
338-
atomic.StoreInt32(&st.collectorEnd, 1)
339-
atomic.StoreInt32(&st.connectedQMgr, 0)
337+
setCollectorEnd(true)
338+
setConnectedQMgr(false)
340339
default:
341-
atomic.StoreInt32(&st.connectedQMgr, 0)
340+
setConnectedQMgr(false)
342341
}
343342
return
344-
345343
}
346344

347345
thisDiscovery := time.Now()
@@ -361,11 +359,11 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
361359
// value fields and maps have been updated.
362360
//
363361
// Now need to set all of the real Gauges with the correct values
364-
if atomic.LoadInt32(&st.firstCollection) == 1 {
362+
if isFirstCollection() {
365363
// Always ignore the first loop through as there might
366364
// be accumulated stuff from a while ago, and lead to
367365
// a misleading range on graphs.
368-
atomic.StoreInt32(&st.firstCollection, 0)
366+
setFirstCollection(false)
369367
} else {
370368

371369
for _, cl := range e.metrics.Classes {
@@ -660,7 +658,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
660658

661659
collectStopTime := time.Now()
662660
elapsedSecs := int64(collectStopTime.Sub(collectStartTime).Seconds())
663-
log.Infof("Collection time = %d secs", elapsedSecs)
661+
log.Debugf("Collection time = %d secs", elapsedSecs)
664662
if elapsedSecs > defaultScrapeTimeout && !warnedScrapeTimeout {
665663
log.Warnf("Collection time has exceeded Prometheus default scrape_timeout value of %d seconds. Ensure you have set a larger value for this job", defaultScrapeTimeout)
666664
warnedScrapeTimeout = true

cmd/mq_prometheus/main.go

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626

2727
"strings"
2828
"sync"
29-
"sync/atomic"
3029
"time"
3130

3231
"github.com/ibm-messaging/mq-golang/v5/ibmmq"
@@ -37,17 +36,6 @@ import (
3736
log "github.com/sirupsen/logrus"
3837
)
3938

40-
// These are really booleans but I'm using atomic updates
41-
// to avoid potential races (though they'd likely be harmless) between
42-
// the main code and the callbacks
43-
type status struct {
44-
connectedOnce int32
45-
connectedQMgr int32
46-
collectorEnd int32
47-
collectorSilent int32
48-
firstCollection int32
49-
}
50-
5139
var (
5240
BuildStamp string
5341
GitCommit string
@@ -59,7 +47,6 @@ var (
5947
collector prometheus.Collector
6048
mutex sync.RWMutex
6149
retryCount = 0 // Might use this with a maxRetry to force a quit out of collector
62-
st status
6350
)
6451

6552
func main() {
@@ -76,28 +63,34 @@ func main() {
7663
if err != nil {
7764
log.Error(err)
7865
} else {
79-
st.connectedOnce = 0
80-
st.connectedQMgr = 0
81-
st.collectorEnd = 0
82-
st.firstCollection = 0
83-
st.collectorSilent = 0
66+
setConnectedOnce(false)
67+
setConnectedQMgr(false)
68+
setCollectorEnd(false)
69+
setFirstCollection(false)
70+
setCollectorSilent(false)
8471

8572
// Start the webserver in a separate thread
8673
go startServer()
8774

8875
// This is the main loop that tries to keep the collector connected to a queue manager
8976
// even after a failure.
90-
for atomic.LoadInt32(&st.collectorEnd) == 0 {
91-
log.Debugf("In main loop: qMgrConnected=%d", atomic.LoadInt32(&st.connectedQMgr))
77+
for !isCollectorEnd() {
78+
log.Debugf("In main loop: qMgrConnected=%v", isConnectedQMgr())
9279
err = nil // Start clean on each loop
9380

9481
// The callback will set this flag to false if there's an error while
9582
// processing the messages.
96-
if atomic.LoadInt32(&st.connectedQMgr) == 0 {
83+
if !isConnectedQMgr() {
9784
mutex.Lock()
9885
if err == nil {
9986
mqmetric.EndConnection()
100-
// Connect and open standard queues
87+
// Connect and open standard queues. If we're going to manage reconnection from
88+
// this collector, then turn off the MQ client automatic option
89+
if config.keepRunning {
90+
config.cf.CC.SingleConnect = true
91+
} else {
92+
config.cf.CC.SingleConnect = false
93+
}
10194
err = mqmetric.InitConnection(config.cf.QMgrName, config.cf.ReplyQ, &config.cf.CC)
10295
if err == nil {
10396
log.Infoln("Connected to queue manager " + config.cf.QMgrName)
@@ -157,26 +150,26 @@ func main() {
157150
allocateAllGauges()
158151

159152
if collector != nil {
160-
atomic.StoreInt32(&st.collectorSilent, 1)
153+
setCollectorSilent(true)
161154
prometheus.Unregister(collector)
162-
atomic.StoreInt32(&st.collectorSilent, 0)
155+
setCollectorSilent(false)
163156
}
164157

165158
collector = newExporter()
166-
atomic.StoreInt32(&st.firstCollection, 1)
159+
setFirstCollection(true)
167160
prometheus.MustRegister(collector)
168-
atomic.StoreInt32(&st.connectedQMgr, 1)
161+
setConnectedQMgr(true)
169162

170-
if atomic.LoadInt32(&st.connectedOnce) == 0 {
163+
if !isConnectedOnce() {
171164
startChannel <- true
172-
atomic.StoreInt32(&st.connectedOnce, 1)
165+
setConnectedOnce(true)
173166
}
174167
} else {
175-
if atomic.LoadInt32(&st.connectedOnce) == 0 || !config.keepRunning {
168+
if !isConnectedOnce() || !config.keepRunning {
176169
// If we've never successfully connected, then exit instead
177170
// of retrying as it probably means a config error
178171
log.Errorf("Connection to %s has failed. %v", config.cf.QMgrName, err)
179-
atomic.StoreInt32(&st.collectorEnd, 1)
172+
setCollectorEnd(true)
180173
} else {
181174
log.Debug("Sleeping a bit after a failure")
182175
retryCount++
@@ -197,7 +190,6 @@ func main() {
197190
} else {
198191
os.Exit(0)
199192
}
200-
201193
}
202194

203195
func startServer() {
@@ -268,7 +260,7 @@ func stopServer() {
268260
if err != nil {
269261
log.Errorf("Failed to shutdown metrics server: %v", err)
270262
}
271-
atomic.StoreInt32(&st.collectorEnd, 1)
263+
setCollectorEnd(true)
272264
}
273265

274266
/*

cmd/mq_prometheus/status.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package main
2+
3+
/*
4+
Copyright (c) IBM Corporation 2021
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific
16+
17+
Contributors:
18+
Mark Taylor - Initial Contribution
19+
*/
20+
21+
/*
22+
This file manages the state between different threads of the collector. It exposes
23+
the state via getter/setter functions which use atomic read/write operations. We also
24+
map between integer values internally and boolean state which is all we really care about.
25+
*/
26+
27+
import (
28+
"sync/atomic"
29+
)
30+
31+
// These are really booleans but I'm using atomic updates
32+
// to avoid potential races (though they'd likely be harmless) between
33+
// the main code and the callbacks
34+
type status struct {
35+
connectedOnce int32
36+
connectedQMgr int32
37+
collectorEnd int32
38+
collectorSilent int32
39+
firstCollection int32
40+
}
41+
42+
var (
43+
st status
44+
)
45+
46+
func isConnectedQMgr() bool {
47+
b := atomic.LoadInt32(&st.connectedQMgr)
48+
if b == 0 {
49+
return false
50+
}
51+
return true
52+
}
53+
54+
func setConnectedQMgr(b bool) {
55+
if b {
56+
atomic.StoreInt32(&st.connectedQMgr, 1)
57+
} else {
58+
atomic.StoreInt32(&st.connectedQMgr, 0)
59+
}
60+
}
61+
62+
func isCollectorEnd() bool {
63+
b := atomic.LoadInt32(&st.collectorEnd)
64+
if b == 0 {
65+
return false
66+
}
67+
return true
68+
}
69+
70+
func setCollectorEnd(b bool) {
71+
if b {
72+
atomic.StoreInt32(&st.collectorEnd, 1)
73+
} else {
74+
atomic.StoreInt32(&st.collectorEnd, 0)
75+
}
76+
}
77+
78+
func isFirstCollection() bool {
79+
b := atomic.LoadInt32(&st.firstCollection)
80+
if b == 0 {
81+
return false
82+
}
83+
return true
84+
}
85+
86+
func setFirstCollection(b bool) {
87+
if b {
88+
atomic.StoreInt32(&st.firstCollection, 1)
89+
} else {
90+
atomic.StoreInt32(&st.firstCollection, 0)
91+
}
92+
}
93+
94+
func isCollectorSilent() bool {
95+
b := atomic.LoadInt32(&st.collectorSilent)
96+
if b == 0 {
97+
return false
98+
}
99+
return true
100+
}
101+
102+
func setCollectorSilent(b bool) {
103+
if b {
104+
atomic.StoreInt32(&st.collectorSilent, 1)
105+
} else {
106+
atomic.StoreInt32(&st.collectorSilent, 0)
107+
}
108+
}
109+
110+
func setConnectedOnce(b bool) {
111+
if b {
112+
atomic.StoreInt32(&st.connectedOnce, 1)
113+
} else {
114+
atomic.StoreInt32(&st.connectedOnce, 0)
115+
}
116+
}
117+
118+
func isConnectedOnce() bool {
119+
b := atomic.LoadInt32(&st.connectedOnce)
120+
if b == 0 {
121+
return false
122+
}
123+
return true
124+
}

0 commit comments

Comments
 (0)