Skip to content

Commit 214e48a

Browse files
committed
queue: Collect jobs with user, partition and reason information
1 parent df50ddf commit 214e48a

File tree

1 file changed

+197
-67
lines changed

1 file changed

+197
-67
lines changed

queue.go

Lines changed: 197 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -20,62 +20,137 @@ import (
2020
"io/ioutil"
2121
"log"
2222
"os/exec"
23+
"strconv"
2324
"strings"
2425
)
2526

27+
type NNVal map[string]map[string]map[string]float64
28+
type NVal map[string]map[string]float64
29+
2630
type QueueMetrics struct {
27-
pending float64
28-
pending_dep float64
29-
running float64
30-
suspended float64
31-
cancelled float64
32-
completing float64
33-
completed float64
34-
configuring float64
35-
failed float64
36-
timeout float64
37-
preempted float64
38-
node_fail float64
31+
pending NNVal
32+
running NVal
33+
suspended NVal
34+
cancelled NVal
35+
completing NVal
36+
completed NVal
37+
configuring NVal
38+
failed NVal
39+
timeout NVal
40+
preempted NVal
41+
node_fail NVal
42+
c_pending NNVal
43+
c_running NVal
44+
c_suspended NVal
45+
c_cancelled NVal
46+
c_completing NVal
47+
c_completed NVal
48+
c_configuring NVal
49+
c_failed NVal
50+
c_timeout NVal
51+
c_preempted NVal
52+
c_node_fail NVal
3953
}
4054

4155
// Returns the scheduler metrics
4256
func QueueGetMetrics() *QueueMetrics {
4357
return ParseQueueMetrics(QueueData())
4458
}
4559

60+
func (s *NVal) Incr(user string, part string, count float64) {
61+
child, ok := (*s)[user]
62+
if !ok {
63+
child = map[string]float64{}
64+
(*s)[user] = child
65+
child[part] = 0
66+
}
67+
child[part] += count
68+
}
69+
70+
func (s *NNVal) Incr2(reason string, user string, part string, count float64) {
71+
child, ok := (*s)[reason]
72+
if !ok {
73+
child = map[string]map[string]float64{}
74+
(*s)[reason] = child
75+
}
76+
child2, ok := (*s)[reason][user]
77+
if !ok {
78+
child2 = map[string]float64{}
79+
(*s)[reason][user] = child2
80+
}
81+
child2[part] += count
82+
}
83+
4684
func ParseQueueMetrics(input []byte) *QueueMetrics {
47-
var qm QueueMetrics
85+
qm := QueueMetrics{
86+
pending: make(NNVal),
87+
running: make(NVal),
88+
suspended: make(NVal),
89+
cancelled: make(NVal),
90+
completing: make(NVal),
91+
completed: make(NVal),
92+
configuring: make(NVal),
93+
failed: make(NVal),
94+
timeout: make(NVal),
95+
preempted: make(NVal),
96+
node_fail: make(NVal),
97+
c_pending: make(NNVal),
98+
c_running: make(NVal),
99+
c_suspended: make(NVal),
100+
c_cancelled: make(NVal),
101+
c_completing: make(NVal),
102+
c_completed: make(NVal),
103+
c_configuring: make(NVal),
104+
c_failed: make(NVal),
105+
c_timeout: make(NVal),
106+
c_preempted: make(NVal),
107+
c_node_fail: make(NVal),
108+
}
48109
lines := strings.Split(string(input), "\n")
49110
for _, line := range lines {
50111
if strings.Contains(line, ",") {
51-
splitted := strings.Split(line, ",")
52-
state := splitted[1]
112+
part := strings.Split(line, ",")[0]
113+
part = strings.TrimSpace(part)
114+
state := strings.Split(line, ",")[1]
115+
cores_i, _ := strconv.Atoi(strings.Split(line, ",")[2])
116+
cores := float64(cores_i)
117+
user := strings.Split(line, ",")[4]
118+
user = strings.TrimSpace(user)
119+
reason := strings.Split(line, ",")[3]
53120
switch state {
54121
case "PENDING":
55-
qm.pending++
56-
if len(splitted) > 2 && splitted[2] == "Dependency" {
57-
qm.pending_dep++
58-
}
122+
qm.pending.Incr2(reason, user, part, 1)
123+
qm.c_pending.Incr2(reason, user, part, cores)
59124
case "RUNNING":
60-
qm.running++
125+
qm.running.Incr(user, part, 1)
126+
qm.c_running.Incr(user, part, cores)
61127
case "SUSPENDED":
62-
qm.suspended++
128+
qm.suspended.Incr(user, part, 1)
129+
qm.suspended.Incr(user, part, cores)
63130
case "CANCELLED":
64-
qm.cancelled++
131+
qm.cancelled.Incr(user, part, 1)
132+
qm.c_cancelled.Incr(user, part, cores)
65133
case "COMPLETING":
66-
qm.completing++
134+
qm.completing.Incr(user, part, 1)
135+
qm.c_completing.Incr(user, part, cores)
67136
case "COMPLETED":
68-
qm.completed++
137+
qm.completed.Incr(user, part, 1)
138+
qm.c_completed.Incr(user, part, cores)
69139
case "CONFIGURING":
70-
qm.configuring++
140+
qm.configuring.Incr(user, part, 1)
141+
qm.c_configuring.Incr(user, part, cores)
71142
case "FAILED":
72-
qm.failed++
143+
qm.failed.Incr(user, part, 1)
144+
qm.c_failed.Incr(user, part, cores)
73145
case "TIMEOUT":
74-
qm.timeout++
146+
qm.timeout.Incr(user, part, 1)
147+
qm.c_timeout.Incr(user, part, cores)
75148
case "PREEMPTED":
76-
qm.preempted++
149+
qm.preempted.Incr(user, part, 1)
150+
qm.c_preempted.Incr(user, part, cores)
77151
case "NODE_FAIL":
78-
qm.node_fail++
152+
qm.node_fail.Incr(user, part, 1)
153+
qm.c_node_fail.Incr(user, part, cores)
79154
}
80155
}
81156
}
@@ -84,7 +159,7 @@ func ParseQueueMetrics(input []byte) *QueueMetrics {
84159

85160
// Execute the squeue command and return its output
86161
func QueueData() []byte {
87-
cmd := exec.Command("squeue", "-a", "-r", "-h", "-o %A,%T,%r", "--states=all")
162+
cmd := exec.Command("/usr/bin/squeue", "-h", "-o %P,%T,%C,%r,%u")
88163
stdout, err := cmd.StdoutPipe()
89164
if err != nil {
90165
log.Fatal(err)
@@ -107,39 +182,58 @@ func QueueData() []byte {
107182

108183
func NewQueueCollector() *QueueCollector {
109184
return &QueueCollector{
110-
pending: prometheus.NewDesc("slurm_queue_pending", "Pending jobs in queue", nil, nil),
111-
pending_dep: prometheus.NewDesc("slurm_queue_pending_dependency", "Pending jobs because of dependency in queue", nil, nil),
112-
running: prometheus.NewDesc("slurm_queue_running", "Running jobs in the cluster", nil, nil),
113-
suspended: prometheus.NewDesc("slurm_queue_suspended", "Suspended jobs in the cluster", nil, nil),
114-
cancelled: prometheus.NewDesc("slurm_queue_cancelled", "Cancelled jobs in the cluster", nil, nil),
115-
completing: prometheus.NewDesc("slurm_queue_completing", "Completing jobs in the cluster", nil, nil),
116-
completed: prometheus.NewDesc("slurm_queue_completed", "Completed jobs in the cluster", nil, nil),
117-
configuring: prometheus.NewDesc("slurm_queue_configuring", "Configuring jobs in the cluster", nil, nil),
118-
failed: prometheus.NewDesc("slurm_queue_failed", "Number of failed jobs", nil, nil),
119-
timeout: prometheus.NewDesc("slurm_queue_timeout", "Jobs stopped by timeout", nil, nil),
120-
preempted: prometheus.NewDesc("slurm_queue_preempted", "Number of preempted jobs", nil, nil),
121-
node_fail: prometheus.NewDesc("slurm_queue_node_fail", "Number of jobs stopped due to node fail", nil, nil),
185+
pending: prometheus.NewDesc("slurm_queue_pending", "Pending jobs in queue", []string{"user", "partition", "reason"}, nil),
186+
running: prometheus.NewDesc("slurm_queue_running", "Running jobs in the cluster", []string{"user", "partition"}, nil),
187+
suspended: prometheus.NewDesc("slurm_queue_suspended", "Suspended jobs in the cluster", []string{"user", "partition"}, nil),
188+
cancelled: prometheus.NewDesc("slurm_queue_cancelled", "Cancelled jobs in the cluster", []string{"user", "partition"}, nil),
189+
completing: prometheus.NewDesc("slurm_queue_completing", "Completing jobs in the cluster", []string{"user", "partition"}, nil),
190+
completed: prometheus.NewDesc("slurm_queue_completed", "Completed jobs in the cluster", []string{"user", "partition"}, nil),
191+
configuring: prometheus.NewDesc("slurm_queue_configuring", "Configuring jobs in the cluster", []string{"user", "partition"}, nil),
192+
failed: prometheus.NewDesc("slurm_queue_failed", "Number of failed jobs", []string{"user", "partition"}, nil),
193+
timeout: prometheus.NewDesc("slurm_queue_timeout", "Jobs stopped by timeout", []string{"user", "partition"}, nil),
194+
preempted: prometheus.NewDesc("slurm_queue_preempted", "Number of preempted jobs", []string{"user", "partition"}, nil),
195+
node_fail: prometheus.NewDesc("slurm_queue_node_fail", "Number of jobs stopped due to node fail", []string{"user", "partition"}, nil),
196+
cores_pending: prometheus.NewDesc("slurm_cores_pending", "Pending cores in queue", []string{"user", "partition", "reason"}, nil),
197+
cores_running: prometheus.NewDesc("slurm_cores_running", "Running cores in the cluster", []string{"user", "partition"}, nil),
198+
cores_suspended: prometheus.NewDesc("slurm_cores_suspended", "Suspended cores in the cluster", []string{"user", "partition"}, nil),
199+
cores_cancelled: prometheus.NewDesc("slurm_cores_cancelled", "Cancelled cores in the cluster", []string{"user", "partition"}, nil),
200+
cores_completing: prometheus.NewDesc("slurm_cores_completing", "Completing cores in the cluster", []string{"user", "partition"}, nil),
201+
cores_completed: prometheus.NewDesc("slurm_cores_completed", "Completed cores in the cluster", []string{"user", "partition"}, nil),
202+
cores_configuring: prometheus.NewDesc("slurm_cores_configuring", "Configuring cores in the cluster", []string{"user", "partition"}, nil),
203+
cores_failed: prometheus.NewDesc("slurm_cores_failed", "Number of failed cores", []string{"user", "partition"}, nil),
204+
cores_timeout: prometheus.NewDesc("slurm_cores_timeout", "Cores stopped by timeout", []string{"user", "partition"}, nil),
205+
cores_preempted: prometheus.NewDesc("slurm_cores_preempted", "Number of preempted cores", []string{"user", "partition"}, nil),
206+
cores_node_fail: prometheus.NewDesc("slurm_cores_node_fail", "Number of cores stopped due to node fail", []string{"user", "partition"}, nil),
122207
}
123208
}
124209

125210
type QueueCollector struct {
126-
pending *prometheus.Desc
127-
pending_dep *prometheus.Desc
128-
running *prometheus.Desc
129-
suspended *prometheus.Desc
130-
cancelled *prometheus.Desc
131-
completing *prometheus.Desc
132-
completed *prometheus.Desc
133-
configuring *prometheus.Desc
134-
failed *prometheus.Desc
135-
timeout *prometheus.Desc
136-
preempted *prometheus.Desc
137-
node_fail *prometheus.Desc
211+
pending *prometheus.Desc
212+
running *prometheus.Desc
213+
suspended *prometheus.Desc
214+
cancelled *prometheus.Desc
215+
completing *prometheus.Desc
216+
completed *prometheus.Desc
217+
configuring *prometheus.Desc
218+
failed *prometheus.Desc
219+
timeout *prometheus.Desc
220+
preempted *prometheus.Desc
221+
node_fail *prometheus.Desc
222+
cores_pending *prometheus.Desc
223+
cores_running *prometheus.Desc
224+
cores_suspended *prometheus.Desc
225+
cores_cancelled *prometheus.Desc
226+
cores_completing *prometheus.Desc
227+
cores_completed *prometheus.Desc
228+
cores_configuring *prometheus.Desc
229+
cores_failed *prometheus.Desc
230+
cores_timeout *prometheus.Desc
231+
cores_preempted *prometheus.Desc
232+
cores_node_fail *prometheus.Desc
138233
}
139234

140235
func (qc *QueueCollector) Describe(ch chan<- *prometheus.Desc) {
141236
ch <- qc.pending
142-
ch <- qc.pending_dep
143237
ch <- qc.running
144238
ch <- qc.suspended
145239
ch <- qc.cancelled
@@ -150,20 +244,56 @@ func (qc *QueueCollector) Describe(ch chan<- *prometheus.Desc) {
150244
ch <- qc.timeout
151245
ch <- qc.preempted
152246
ch <- qc.node_fail
247+
ch <- qc.cores_pending
248+
ch <- qc.cores_running
249+
ch <- qc.cores_suspended
250+
ch <- qc.cores_cancelled
251+
ch <- qc.cores_completing
252+
ch <- qc.cores_completed
253+
ch <- qc.cores_configuring
254+
ch <- qc.cores_failed
255+
ch <- qc.cores_timeout
256+
ch <- qc.cores_preempted
257+
ch <- qc.cores_node_fail
153258
}
154259

155260
func (qc *QueueCollector) Collect(ch chan<- prometheus.Metric) {
156261
qm := QueueGetMetrics()
157-
ch <- prometheus.MustNewConstMetric(qc.pending, prometheus.GaugeValue, qm.pending)
158-
ch <- prometheus.MustNewConstMetric(qc.pending_dep, prometheus.GaugeValue, qm.pending_dep)
159-
ch <- prometheus.MustNewConstMetric(qc.running, prometheus.GaugeValue, qm.running)
160-
ch <- prometheus.MustNewConstMetric(qc.suspended, prometheus.GaugeValue, qm.suspended)
161-
ch <- prometheus.MustNewConstMetric(qc.cancelled, prometheus.GaugeValue, qm.cancelled)
162-
ch <- prometheus.MustNewConstMetric(qc.completing, prometheus.GaugeValue, qm.completing)
163-
ch <- prometheus.MustNewConstMetric(qc.completed, prometheus.GaugeValue, qm.completed)
164-
ch <- prometheus.MustNewConstMetric(qc.configuring, prometheus.GaugeValue, qm.configuring)
165-
ch <- prometheus.MustNewConstMetric(qc.failed, prometheus.GaugeValue, qm.failed)
166-
ch <- prometheus.MustNewConstMetric(qc.timeout, prometheus.GaugeValue, qm.timeout)
167-
ch <- prometheus.MustNewConstMetric(qc.preempted, prometheus.GaugeValue, qm.preempted)
168-
ch <- prometheus.MustNewConstMetric(qc.node_fail, prometheus.GaugeValue, qm.node_fail)
262+
for reason, values := range qm.pending {
263+
PushMetric(values, ch, qc.pending, reason)
264+
}
265+
266+
PushMetric(qm.running, ch, qc.running, "")
267+
PushMetric(qm.cancelled, ch, qc.cancelled, "")
268+
PushMetric(qm.completing, ch, qc.completing, "")
269+
PushMetric(qm.completed, ch, qc.completed, "")
270+
PushMetric(qm.configuring, ch, qc.configuring, "")
271+
PushMetric(qm.failed, ch, qc.failed, "")
272+
PushMetric(qm.timeout, ch, qc.timeout, "")
273+
PushMetric(qm.preempted, ch, qc.preempted, "")
274+
PushMetric(qm.node_fail, ch, qc.node_fail, "")
275+
for reason, value := range qm.c_pending {
276+
PushMetric(value, ch, qc.cores_pending, reason)
277+
}
278+
PushMetric(qm.c_running, ch, qc.cores_running, "")
279+
PushMetric(qm.c_cancelled, ch, qc.cores_cancelled, "")
280+
PushMetric(qm.c_completing, ch, qc.cores_completing, "")
281+
PushMetric(qm.c_completed, ch, qc.cores_completed, "")
282+
PushMetric(qm.c_configuring, ch, qc.cores_configuring, "")
283+
PushMetric(qm.c_failed, ch, qc.cores_failed, "")
284+
PushMetric(qm.c_timeout, ch, qc.cores_timeout, "")
285+
PushMetric(qm.c_preempted, ch, qc.cores_preempted, "")
286+
PushMetric(qm.c_node_fail, ch, qc.cores_node_fail, "")
287+
}
288+
289+
func PushMetric(m map[string]map[string]float64, ch chan<- prometheus.Metric, coll *prometheus.Desc, a_label string) {
290+
for label1, vals1 := range m {
291+
for label2, val := range vals1 {
292+
if a_label != "" {
293+
ch <- prometheus.MustNewConstMetric(coll, prometheus.GaugeValue, val, label1, label2, a_label)
294+
} else {
295+
ch <- prometheus.MustNewConstMetric(coll, prometheus.GaugeValue, val, label1, label2)
296+
}
297+
}
298+
}
169299
}

0 commit comments

Comments
 (0)