Skip to content

Commit dff6df3

Browse files
committed
scheduler: Collect rpc stats
1 parent 214e48a commit dff6df3

File tree

1 file changed

+169
-20
lines changed

1 file changed

+169
-20
lines changed

scheduler.go

Lines changed: 169 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ package main
1717

1818
import (
1919
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/prometheus/common/log"
2021
"io/ioutil"
21-
"log"
2222
"os/exec"
2323
"regexp"
2424
"strconv"
@@ -33,23 +33,29 @@ import (
3333

3434
// Basic metrics for the scheduler
3535
type SchedulerMetrics struct {
36-
threads float64
37-
queue_size float64
38-
dbd_queue_size float64
39-
last_cycle float64
40-
mean_cycle float64
41-
cycle_per_minute float64
42-
backfill_last_cycle float64
43-
backfill_mean_cycle float64
44-
backfill_depth_mean float64
36+
threads float64
37+
queue_size float64
38+
dbd_queue_size float64
39+
last_cycle float64
40+
mean_cycle float64
41+
cycle_per_minute float64
42+
backfill_last_cycle float64
43+
backfill_mean_cycle float64
44+
backfill_depth_mean float64
4545
total_backfilled_jobs_since_start float64
4646
total_backfilled_jobs_since_cycle float64
4747
total_backfilled_heterogeneous float64
48+
rpc_stats_count map[string]float64
49+
rpc_stats_avg_time map[string]float64
50+
rpc_stats_total_time map[string]float64
51+
user_rpc_stats_count map[string]float64
52+
user_rpc_stats_avg_time map[string]float64
53+
user_rpc_stats_total_time map[string]float64
4854
}
4955

5056
// Execute the sdiag command and return its output
5157
func SchedulerData() []byte {
52-
cmd := exec.Command("sdiag")
58+
cmd := exec.Command("/usr/bin/sdiag")
5359
stdout, err := cmd.StdoutPipe()
5460
if err != nil {
5561
log.Fatal(err)
@@ -121,9 +127,87 @@ func ParseSchedulerMetrics(input []byte) *SchedulerMetrics {
121127
}
122128
}
123129
}
130+
rpc_stats := ParseRpcStats(lines)
131+
sm.rpc_stats_count = rpc_stats[0]
132+
sm.rpc_stats_avg_time = rpc_stats[1]
133+
sm.rpc_stats_total_time = rpc_stats[2]
134+
sm.user_rpc_stats_count = rpc_stats[3]
135+
sm.user_rpc_stats_avg_time = rpc_stats[4]
136+
sm.user_rpc_stats_total_time = rpc_stats[5]
124137
return &sm
125138
}
126139

140+
// Helper function to split a single line from the sdiag output
141+
func SplitColonValueToFloat(input string) float64 {
142+
str := strings.Split(input, ":")
143+
if len(str) == 1 {
144+
return 0
145+
} else {
146+
rvalue := strings.TrimSpace(str[1])
147+
flt, _ := strconv.ParseFloat(rvalue, 64)
148+
return flt
149+
}
150+
}
151+
152+
// Helper function to return RPC stats from sdiag output
153+
func ParseRpcStats(lines []string) []map[string]float64 {
154+
var in_rpc bool
155+
var in_rpc_per_user bool
156+
var count_stats map[string]float64
157+
var avg_stats map[string]float64
158+
var total_stats map[string]float64
159+
var user_count_stats map[string]float64
160+
var user_avg_stats map[string]float64
161+
var user_total_stats map[string]float64
162+
163+
count_stats = make(map[string]float64)
164+
avg_stats = make(map[string]float64)
165+
total_stats = make(map[string]float64)
166+
user_count_stats = make(map[string]float64)
167+
user_avg_stats = make(map[string]float64)
168+
user_total_stats = make(map[string]float64)
169+
170+
in_rpc = false
171+
in_rpc_per_user = false
172+
173+
stat_line_re := regexp.MustCompile(`^\s*([A-Za-z0-9_]*).*count:([0-9]*)\s*ave_time:([0-9]*)\s\s*total_time:([0-9]*)\s*$`)
174+
175+
for _, line := range lines {
176+
if strings.Contains(line, "Remote Procedure Call statistics by message type") {
177+
in_rpc = true
178+
in_rpc_per_user = false
179+
} else if strings.Contains(line, "Remote Procedure Call statistics by user") {
180+
in_rpc = false
181+
in_rpc_per_user = true
182+
}
183+
if in_rpc || in_rpc_per_user {
184+
re_match := stat_line_re.FindAllStringSubmatch(line, -1)
185+
if re_match != nil {
186+
re_match_first := re_match[0]
187+
if in_rpc {
188+
count_stats[re_match_first[1]], _ = strconv.ParseFloat(re_match_first[2], 64)
189+
avg_stats[re_match_first[1]], _ = strconv.ParseFloat(re_match_first[3], 64)
190+
total_stats[re_match_first[1]], _ = strconv.ParseFloat(re_match_first[4], 64)
191+
} else if in_rpc_per_user {
192+
user_count_stats[re_match_first[1]], _ = strconv.ParseFloat(re_match_first[2], 64)
193+
user_avg_stats[re_match_first[1]], _ = strconv.ParseFloat(re_match_first[3], 64)
194+
user_total_stats[re_match_first[1]], _ = strconv.ParseFloat(re_match_first[4], 64)
195+
}
196+
}
197+
}
198+
}
199+
200+
rpc_stats_final := []map[string]float64{
201+
count_stats,
202+
avg_stats,
203+
total_stats,
204+
user_count_stats,
205+
user_avg_stats,
206+
user_total_stats,
207+
}
208+
return rpc_stats_final
209+
}
210+
127211
// Returns the scheduler metrics
128212
func SchedulerGetMetrics() *SchedulerMetrics {
129213
return ParseSchedulerMetrics(SchedulerData())
@@ -137,18 +221,24 @@ func SchedulerGetMetrics() *SchedulerMetrics {
137221

138222
// Collector strcture
139223
type SchedulerCollector struct {
140-
threads *prometheus.Desc
141-
queue_size *prometheus.Desc
142-
dbd_queue_size *prometheus.Desc
143-
last_cycle *prometheus.Desc
144-
mean_cycle *prometheus.Desc
145-
cycle_per_minute *prometheus.Desc
146-
backfill_last_cycle *prometheus.Desc
147-
backfill_mean_cycle *prometheus.Desc
148-
backfill_depth_mean *prometheus.Desc
224+
threads *prometheus.Desc
225+
queue_size *prometheus.Desc
226+
dbd_queue_size *prometheus.Desc
227+
last_cycle *prometheus.Desc
228+
mean_cycle *prometheus.Desc
229+
cycle_per_minute *prometheus.Desc
230+
backfill_last_cycle *prometheus.Desc
231+
backfill_mean_cycle *prometheus.Desc
232+
backfill_depth_mean *prometheus.Desc
149233
total_backfilled_jobs_since_start *prometheus.Desc
150234
total_backfilled_jobs_since_cycle *prometheus.Desc
151235
total_backfilled_heterogeneous *prometheus.Desc
236+
rpc_stats_count *prometheus.Desc
237+
rpc_stats_avg_time *prometheus.Desc
238+
rpc_stats_total_time *prometheus.Desc
239+
user_rpc_stats_count *prometheus.Desc
240+
user_rpc_stats_avg_time *prometheus.Desc
241+
user_rpc_stats_total_time *prometheus.Desc
152242
}
153243

154244
// Send all metric descriptions
@@ -165,6 +255,12 @@ func (c *SchedulerCollector) Describe(ch chan<- *prometheus.Desc) {
165255
ch <- c.total_backfilled_jobs_since_start
166256
ch <- c.total_backfilled_jobs_since_cycle
167257
ch <- c.total_backfilled_heterogeneous
258+
ch <- c.rpc_stats_count
259+
ch <- c.rpc_stats_avg_time
260+
ch <- c.rpc_stats_total_time
261+
ch <- c.user_rpc_stats_count
262+
ch <- c.user_rpc_stats_avg_time
263+
ch <- c.user_rpc_stats_total_time
168264
}
169265

170266
// Send the values of all metrics
@@ -182,10 +278,33 @@ func (sc *SchedulerCollector) Collect(ch chan<- prometheus.Metric) {
182278
ch <- prometheus.MustNewConstMetric(sc.total_backfilled_jobs_since_start, prometheus.GaugeValue, sm.total_backfilled_jobs_since_start)
183279
ch <- prometheus.MustNewConstMetric(sc.total_backfilled_jobs_since_cycle, prometheus.GaugeValue, sm.total_backfilled_jobs_since_cycle)
184280
ch <- prometheus.MustNewConstMetric(sc.total_backfilled_heterogeneous, prometheus.GaugeValue, sm.total_backfilled_heterogeneous)
281+
for rpc_type, value := range sm.rpc_stats_count {
282+
ch <- prometheus.MustNewConstMetric(sc.rpc_stats_count, prometheus.GaugeValue, value, rpc_type)
283+
}
284+
for rpc_type, value := range sm.rpc_stats_avg_time {
285+
ch <- prometheus.MustNewConstMetric(sc.rpc_stats_avg_time, prometheus.GaugeValue, value, rpc_type)
286+
}
287+
for rpc_type, value := range sm.rpc_stats_total_time {
288+
ch <- prometheus.MustNewConstMetric(sc.rpc_stats_total_time, prometheus.GaugeValue, value, rpc_type)
289+
}
290+
for user, value := range sm.user_rpc_stats_count {
291+
ch <- prometheus.MustNewConstMetric(sc.user_rpc_stats_count, prometheus.GaugeValue, value, user)
292+
}
293+
for user, value := range sm.user_rpc_stats_avg_time {
294+
ch <- prometheus.MustNewConstMetric(sc.user_rpc_stats_avg_time, prometheus.GaugeValue, value, user)
295+
}
296+
for user, value := range sm.user_rpc_stats_total_time {
297+
ch <- prometheus.MustNewConstMetric(sc.user_rpc_stats_total_time, prometheus.GaugeValue, value, user)
298+
}
299+
185300
}
186301

187302
// Returns the Slurm scheduler collector, used to register with the prometheus client
188303
func NewSchedulerCollector() *SchedulerCollector {
304+
rpc_stats_labels := make([]string, 0, 1)
305+
rpc_stats_labels = append(rpc_stats_labels, "operation")
306+
user_rpc_stats_labels := make([]string, 0, 1)
307+
user_rpc_stats_labels = append(user_rpc_stats_labels, "user")
189308
return &SchedulerCollector{
190309
threads: prometheus.NewDesc(
191310
"slurm_scheduler_threads",
@@ -247,5 +366,35 @@ func NewSchedulerCollector() *SchedulerCollector {
247366
"Information provided by the Slurm sdiag command, number of heterogeneous job components started thanks to backfilling since last Slurm start",
248367
nil,
249368
nil),
369+
rpc_stats_count: prometheus.NewDesc(
370+
"slurm_rpc_stats",
371+
"Information provided by the Slurm sdiag command, rpc count statistic",
372+
rpc_stats_labels,
373+
nil),
374+
rpc_stats_avg_time: prometheus.NewDesc(
375+
"slurm_rpc_stats_avg_time",
376+
"Information provided by the Slurm sdiag command, rpc average time statistic",
377+
rpc_stats_labels,
378+
nil),
379+
rpc_stats_total_time: prometheus.NewDesc(
380+
"slurm_rpc_stats_total_time",
381+
"Information provided by the Slurm sdiag command, rpc total time statistic",
382+
rpc_stats_labels,
383+
nil),
384+
user_rpc_stats_count: prometheus.NewDesc(
385+
"slurm_user_rpc_stats",
386+
"Information provided by the Slurm sdiag command, rpc count statistic per user",
387+
user_rpc_stats_labels,
388+
nil),
389+
user_rpc_stats_avg_time: prometheus.NewDesc(
390+
"slurm_user_rpc_stats_avg_time",
391+
"Information provided by the Slurm sdiag command, rpc average time statistic per user",
392+
user_rpc_stats_labels,
393+
nil),
394+
user_rpc_stats_total_time: prometheus.NewDesc(
395+
"slurm_user_rpc_stats_total_time",
396+
"Information provided by the Slurm sdiag command, rpc total time statistic per user",
397+
user_rpc_stats_labels,
398+
nil),
250399
}
251400
}

0 commit comments

Comments
 (0)