@@ -20,62 +20,137 @@ import (
20
20
"io/ioutil"
21
21
"log"
22
22
"os/exec"
23
+ "strconv"
23
24
"strings"
24
25
)
25
26
27
+ type NNVal map [string ]map [string ]map [string ]float64
28
+ type NVal map [string ]map [string ]float64
29
+
26
30
type QueueMetrics struct {
27
- pending float64
28
- pending_dep float64
29
- running float64
30
- suspended float64
31
- cancelled float64
32
- completing float64
33
- completed float64
34
- configuring float64
35
- failed float64
36
- timeout float64
37
- preempted float64
38
- node_fail float64
31
+ pending NNVal
32
+ running NVal
33
+ suspended NVal
34
+ cancelled NVal
35
+ completing NVal
36
+ completed NVal
37
+ configuring NVal
38
+ failed NVal
39
+ timeout NVal
40
+ preempted NVal
41
+ node_fail NVal
42
+ c_pending NNVal
43
+ c_running NVal
44
+ c_suspended NVal
45
+ c_cancelled NVal
46
+ c_completing NVal
47
+ c_completed NVal
48
+ c_configuring NVal
49
+ c_failed NVal
50
+ c_timeout NVal
51
+ c_preempted NVal
52
+ c_node_fail NVal
39
53
}
40
54
41
55
// Returns the scheduler metrics
42
56
func QueueGetMetrics () * QueueMetrics {
43
57
return ParseQueueMetrics (QueueData ())
44
58
}
45
59
60
+ func (s * NVal ) Incr (user string , part string , count float64 ) {
61
+ child , ok := (* s )[user ]
62
+ if ! ok {
63
+ child = map [string ]float64 {}
64
+ (* s )[user ] = child
65
+ child [part ] = 0
66
+ }
67
+ child [part ] += count
68
+ }
69
+
70
+ func (s * NNVal ) Incr2 (reason string , user string , part string , count float64 ) {
71
+ child , ok := (* s )[reason ]
72
+ if ! ok {
73
+ child = map [string ]map [string ]float64 {}
74
+ (* s )[reason ] = child
75
+ }
76
+ child2 , ok := (* s )[reason ][user ]
77
+ if ! ok {
78
+ child2 = map [string ]float64 {}
79
+ (* s )[reason ][user ] = child2
80
+ }
81
+ child2 [part ] += count
82
+ }
83
+
46
84
func ParseQueueMetrics (input []byte ) * QueueMetrics {
47
- var qm QueueMetrics
85
+ qm := QueueMetrics {
86
+ pending : make (NNVal ),
87
+ running : make (NVal ),
88
+ suspended : make (NVal ),
89
+ cancelled : make (NVal ),
90
+ completing : make (NVal ),
91
+ completed : make (NVal ),
92
+ configuring : make (NVal ),
93
+ failed : make (NVal ),
94
+ timeout : make (NVal ),
95
+ preempted : make (NVal ),
96
+ node_fail : make (NVal ),
97
+ c_pending : make (NNVal ),
98
+ c_running : make (NVal ),
99
+ c_suspended : make (NVal ),
100
+ c_cancelled : make (NVal ),
101
+ c_completing : make (NVal ),
102
+ c_completed : make (NVal ),
103
+ c_configuring : make (NVal ),
104
+ c_failed : make (NVal ),
105
+ c_timeout : make (NVal ),
106
+ c_preempted : make (NVal ),
107
+ c_node_fail : make (NVal ),
108
+ }
48
109
lines := strings .Split (string (input ), "\n " )
49
110
for _ , line := range lines {
50
111
if strings .Contains (line , "," ) {
51
- splitted := strings .Split (line , "," )
52
- state := splitted [1 ]
112
+ part := strings .Split (line , "," )[0 ]
113
+ part = strings .TrimSpace (part )
114
+ state := strings .Split (line , "," )[1 ]
115
+ cores_i , _ := strconv .Atoi (strings .Split (line , "," )[2 ])
116
+ cores := float64 (cores_i )
117
+ user := strings .Split (line , "," )[4 ]
118
+ user = strings .TrimSpace (user )
119
+ reason := strings .Split (line , "," )[3 ]
53
120
switch state {
54
121
case "PENDING" :
55
- qm .pending ++
56
- if len (splitted ) > 2 && splitted [2 ] == "Dependency" {
57
- qm .pending_dep ++
58
- }
122
+ qm .pending .Incr2 (reason , user , part , 1 )
123
+ qm .c_pending .Incr2 (reason , user , part , cores )
59
124
case "RUNNING" :
60
- qm .running ++
125
+ qm .running .Incr (user , part , 1 )
126
+ qm .c_running .Incr (user , part , cores )
61
127
case "SUSPENDED" :
62
- qm .suspended ++
128
+ qm .suspended .Incr (user , part , 1 )
129
+ qm .suspended .Incr (user , part , cores )
63
130
case "CANCELLED" :
64
- qm .cancelled ++
131
+ qm .cancelled .Incr (user , part , 1 )
132
+ qm .c_cancelled .Incr (user , part , cores )
65
133
case "COMPLETING" :
66
- qm .completing ++
134
+ qm .completing .Incr (user , part , 1 )
135
+ qm .c_completing .Incr (user , part , cores )
67
136
case "COMPLETED" :
68
- qm .completed ++
137
+ qm .completed .Incr (user , part , 1 )
138
+ qm .c_completed .Incr (user , part , cores )
69
139
case "CONFIGURING" :
70
- qm .configuring ++
140
+ qm .configuring .Incr (user , part , 1 )
141
+ qm .c_configuring .Incr (user , part , cores )
71
142
case "FAILED" :
72
- qm .failed ++
143
+ qm .failed .Incr (user , part , 1 )
144
+ qm .c_failed .Incr (user , part , cores )
73
145
case "TIMEOUT" :
74
- qm .timeout ++
146
+ qm .timeout .Incr (user , part , 1 )
147
+ qm .c_timeout .Incr (user , part , cores )
75
148
case "PREEMPTED" :
76
- qm .preempted ++
149
+ qm .preempted .Incr (user , part , 1 )
150
+ qm .c_preempted .Incr (user , part , cores )
77
151
case "NODE_FAIL" :
78
- qm .node_fail ++
152
+ qm .node_fail .Incr (user , part , 1 )
153
+ qm .c_node_fail .Incr (user , part , cores )
79
154
}
80
155
}
81
156
}
@@ -84,7 +159,7 @@ func ParseQueueMetrics(input []byte) *QueueMetrics {
84
159
85
160
// Execute the squeue command and return its output
86
161
func QueueData () []byte {
87
- cmd := exec .Command ("squeue" , "-a" , "-r" , "- h" , "-o %A ,%T,%r" , "--states=all " )
162
+ cmd := exec .Command ("/usr/bin/ squeue" , "-h" , "-o %P ,%T,%C,%r,%u " )
88
163
stdout , err := cmd .StdoutPipe ()
89
164
if err != nil {
90
165
log .Fatal (err )
@@ -107,39 +182,58 @@ func QueueData() []byte {
107
182
108
183
func NewQueueCollector () * QueueCollector {
109
184
return & QueueCollector {
110
- pending : prometheus .NewDesc ("slurm_queue_pending" , "Pending jobs in queue" , nil , nil ),
111
- pending_dep : prometheus .NewDesc ("slurm_queue_pending_dependency" , "Pending jobs because of dependency in queue" , nil , nil ),
112
- running : prometheus .NewDesc ("slurm_queue_running" , "Running jobs in the cluster" , nil , nil ),
113
- suspended : prometheus .NewDesc ("slurm_queue_suspended" , "Suspended jobs in the cluster" , nil , nil ),
114
- cancelled : prometheus .NewDesc ("slurm_queue_cancelled" , "Cancelled jobs in the cluster" , nil , nil ),
115
- completing : prometheus .NewDesc ("slurm_queue_completing" , "Completing jobs in the cluster" , nil , nil ),
116
- completed : prometheus .NewDesc ("slurm_queue_completed" , "Completed jobs in the cluster" , nil , nil ),
117
- configuring : prometheus .NewDesc ("slurm_queue_configuring" , "Configuring jobs in the cluster" , nil , nil ),
118
- failed : prometheus .NewDesc ("slurm_queue_failed" , "Number of failed jobs" , nil , nil ),
119
- timeout : prometheus .NewDesc ("slurm_queue_timeout" , "Jobs stopped by timeout" , nil , nil ),
120
- preempted : prometheus .NewDesc ("slurm_queue_preempted" , "Number of preempted jobs" , nil , nil ),
121
- node_fail : prometheus .NewDesc ("slurm_queue_node_fail" , "Number of jobs stopped due to node fail" , nil , nil ),
185
+ pending : prometheus .NewDesc ("slurm_queue_pending" , "Pending jobs in queue" , []string {"user" , "partition" , "reason" }, nil ),
186
+ running : prometheus .NewDesc ("slurm_queue_running" , "Running jobs in the cluster" , []string {"user" , "partition" }, nil ),
187
+ suspended : prometheus .NewDesc ("slurm_queue_suspended" , "Suspended jobs in the cluster" , []string {"user" , "partition" }, nil ),
188
+ cancelled : prometheus .NewDesc ("slurm_queue_cancelled" , "Cancelled jobs in the cluster" , []string {"user" , "partition" }, nil ),
189
+ completing : prometheus .NewDesc ("slurm_queue_completing" , "Completing jobs in the cluster" , []string {"user" , "partition" }, nil ),
190
+ completed : prometheus .NewDesc ("slurm_queue_completed" , "Completed jobs in the cluster" , []string {"user" , "partition" }, nil ),
191
+ configuring : prometheus .NewDesc ("slurm_queue_configuring" , "Configuring jobs in the cluster" , []string {"user" , "partition" }, nil ),
192
+ failed : prometheus .NewDesc ("slurm_queue_failed" , "Number of failed jobs" , []string {"user" , "partition" }, nil ),
193
+ timeout : prometheus .NewDesc ("slurm_queue_timeout" , "Jobs stopped by timeout" , []string {"user" , "partition" }, nil ),
194
+ preempted : prometheus .NewDesc ("slurm_queue_preempted" , "Number of preempted jobs" , []string {"user" , "partition" }, nil ),
195
+ node_fail : prometheus .NewDesc ("slurm_queue_node_fail" , "Number of jobs stopped due to node fail" , []string {"user" , "partition" }, nil ),
196
+ cores_pending : prometheus .NewDesc ("slurm_cores_pending" , "Pending cores in queue" , []string {"user" , "partition" , "reason" }, nil ),
197
+ cores_running : prometheus .NewDesc ("slurm_cores_running" , "Running cores in the cluster" , []string {"user" , "partition" }, nil ),
198
+ cores_suspended : prometheus .NewDesc ("slurm_cores_suspended" , "Suspended cores in the cluster" , []string {"user" , "partition" }, nil ),
199
+ cores_cancelled : prometheus .NewDesc ("slurm_cores_cancelled" , "Cancelled cores in the cluster" , []string {"user" , "partition" }, nil ),
200
+ cores_completing : prometheus .NewDesc ("slurm_cores_completing" , "Completing cores in the cluster" , []string {"user" , "partition" }, nil ),
201
+ cores_completed : prometheus .NewDesc ("slurm_cores_completed" , "Completed cores in the cluster" , []string {"user" , "partition" }, nil ),
202
+ cores_configuring : prometheus .NewDesc ("slurm_cores_configuring" , "Configuring cores in the cluster" , []string {"user" , "partition" }, nil ),
203
+ cores_failed : prometheus .NewDesc ("slurm_cores_failed" , "Number of failed cores" , []string {"user" , "partition" }, nil ),
204
+ cores_timeout : prometheus .NewDesc ("slurm_cores_timeout" , "Cores stopped by timeout" , []string {"user" , "partition" }, nil ),
205
+ cores_preempted : prometheus .NewDesc ("slurm_cores_preempted" , "Number of preempted cores" , []string {"user" , "partition" }, nil ),
206
+ cores_node_fail : prometheus .NewDesc ("slurm_cores_node_fail" , "Number of cores stopped due to node fail" , []string {"user" , "partition" }, nil ),
122
207
}
123
208
}
124
209
125
210
type QueueCollector struct {
126
- pending * prometheus.Desc
127
- pending_dep * prometheus.Desc
128
- running * prometheus.Desc
129
- suspended * prometheus.Desc
130
- cancelled * prometheus.Desc
131
- completing * prometheus.Desc
132
- completed * prometheus.Desc
133
- configuring * prometheus.Desc
134
- failed * prometheus.Desc
135
- timeout * prometheus.Desc
136
- preempted * prometheus.Desc
137
- node_fail * prometheus.Desc
211
+ pending * prometheus.Desc
212
+ running * prometheus.Desc
213
+ suspended * prometheus.Desc
214
+ cancelled * prometheus.Desc
215
+ completing * prometheus.Desc
216
+ completed * prometheus.Desc
217
+ configuring * prometheus.Desc
218
+ failed * prometheus.Desc
219
+ timeout * prometheus.Desc
220
+ preempted * prometheus.Desc
221
+ node_fail * prometheus.Desc
222
+ cores_pending * prometheus.Desc
223
+ cores_running * prometheus.Desc
224
+ cores_suspended * prometheus.Desc
225
+ cores_cancelled * prometheus.Desc
226
+ cores_completing * prometheus.Desc
227
+ cores_completed * prometheus.Desc
228
+ cores_configuring * prometheus.Desc
229
+ cores_failed * prometheus.Desc
230
+ cores_timeout * prometheus.Desc
231
+ cores_preempted * prometheus.Desc
232
+ cores_node_fail * prometheus.Desc
138
233
}
139
234
140
235
func (qc * QueueCollector ) Describe (ch chan <- * prometheus.Desc ) {
141
236
ch <- qc .pending
142
- ch <- qc .pending_dep
143
237
ch <- qc .running
144
238
ch <- qc .suspended
145
239
ch <- qc .cancelled
@@ -150,20 +244,56 @@ func (qc *QueueCollector) Describe(ch chan<- *prometheus.Desc) {
150
244
ch <- qc .timeout
151
245
ch <- qc .preempted
152
246
ch <- qc .node_fail
247
+ ch <- qc .cores_pending
248
+ ch <- qc .cores_running
249
+ ch <- qc .cores_suspended
250
+ ch <- qc .cores_cancelled
251
+ ch <- qc .cores_completing
252
+ ch <- qc .cores_completed
253
+ ch <- qc .cores_configuring
254
+ ch <- qc .cores_failed
255
+ ch <- qc .cores_timeout
256
+ ch <- qc .cores_preempted
257
+ ch <- qc .cores_node_fail
153
258
}
154
259
155
260
func (qc * QueueCollector ) Collect (ch chan <- prometheus.Metric ) {
156
261
qm := QueueGetMetrics ()
157
- ch <- prometheus .MustNewConstMetric (qc .pending , prometheus .GaugeValue , qm .pending )
158
- ch <- prometheus .MustNewConstMetric (qc .pending_dep , prometheus .GaugeValue , qm .pending_dep )
159
- ch <- prometheus .MustNewConstMetric (qc .running , prometheus .GaugeValue , qm .running )
160
- ch <- prometheus .MustNewConstMetric (qc .suspended , prometheus .GaugeValue , qm .suspended )
161
- ch <- prometheus .MustNewConstMetric (qc .cancelled , prometheus .GaugeValue , qm .cancelled )
162
- ch <- prometheus .MustNewConstMetric (qc .completing , prometheus .GaugeValue , qm .completing )
163
- ch <- prometheus .MustNewConstMetric (qc .completed , prometheus .GaugeValue , qm .completed )
164
- ch <- prometheus .MustNewConstMetric (qc .configuring , prometheus .GaugeValue , qm .configuring )
165
- ch <- prometheus .MustNewConstMetric (qc .failed , prometheus .GaugeValue , qm .failed )
166
- ch <- prometheus .MustNewConstMetric (qc .timeout , prometheus .GaugeValue , qm .timeout )
167
- ch <- prometheus .MustNewConstMetric (qc .preempted , prometheus .GaugeValue , qm .preempted )
168
- ch <- prometheus .MustNewConstMetric (qc .node_fail , prometheus .GaugeValue , qm .node_fail )
262
+ for reason , values := range qm .pending {
263
+ PushMetric (values , ch , qc .pending , reason )
264
+ }
265
+
266
+ PushMetric (qm .running , ch , qc .running , "" )
267
+ PushMetric (qm .cancelled , ch , qc .cancelled , "" )
268
+ PushMetric (qm .completing , ch , qc .completing , "" )
269
+ PushMetric (qm .completed , ch , qc .completed , "" )
270
+ PushMetric (qm .configuring , ch , qc .configuring , "" )
271
+ PushMetric (qm .failed , ch , qc .failed , "" )
272
+ PushMetric (qm .timeout , ch , qc .timeout , "" )
273
+ PushMetric (qm .preempted , ch , qc .preempted , "" )
274
+ PushMetric (qm .node_fail , ch , qc .node_fail , "" )
275
+ for reason , value := range qm .c_pending {
276
+ PushMetric (value , ch , qc .cores_pending , reason )
277
+ }
278
+ PushMetric (qm .c_running , ch , qc .cores_running , "" )
279
+ PushMetric (qm .c_cancelled , ch , qc .cores_cancelled , "" )
280
+ PushMetric (qm .c_completing , ch , qc .cores_completing , "" )
281
+ PushMetric (qm .c_completed , ch , qc .cores_completed , "" )
282
+ PushMetric (qm .c_configuring , ch , qc .cores_configuring , "" )
283
+ PushMetric (qm .c_failed , ch , qc .cores_failed , "" )
284
+ PushMetric (qm .c_timeout , ch , qc .cores_timeout , "" )
285
+ PushMetric (qm .c_preempted , ch , qc .cores_preempted , "" )
286
+ PushMetric (qm .c_node_fail , ch , qc .cores_node_fail , "" )
287
+ }
288
+
289
+ func PushMetric (m map [string ]map [string ]float64 , ch chan <- prometheus.Metric , coll * prometheus.Desc , a_label string ) {
290
+ for label1 , vals1 := range m {
291
+ for label2 , val := range vals1 {
292
+ if a_label != "" {
293
+ ch <- prometheus .MustNewConstMetric (coll , prometheus .GaugeValue , val , label1 , label2 , a_label )
294
+ } else {
295
+ ch <- prometheus .MustNewConstMetric (coll , prometheus .GaugeValue , val , label1 , label2 )
296
+ }
297
+ }
298
+ }
169
299
}
0 commit comments