Skip to content

Commit 75a8f09

Browse files
committed
refactor(collector): 🔧 Centralize command execution and add timeout
- Centralizes all external command executions into a new `collector.Execute` function. This function handles logging, errors, and context-based timeouts. - Replaces all `exec.Command` calls in the collectors with the new centralized function, removing hardcoded binary paths and fixing latent bugs in argument passing. - Makes the execution logic mockable for improved testing. - Adds a `--command.timeout` flag (default 5s) to prevent the exporter from hanging on unresponsive Slurm commands. - Adds a `--log.level` flag to control logging verbosity. - Cleans up verbose debug logging for successful commands.
1 parent be8705c commit 75a8f09

File tree

14 files changed

+97
-227
lines changed

14 files changed

+97
-227
lines changed

collector/accounts.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */
1616
package collector
1717

1818
import (
19-
"io"
20-
"os/exec"
2119
"regexp"
2220
"strconv"
2321
"strings"
@@ -28,22 +26,7 @@ import (
2826
)
2927

3028
func AccountsData(logger log.Logger) ([]byte, error) {
31-
cmd := exec.Command("squeue", "-a", "-r", "-h", "-o %A|%a|%T|%C")
32-
stdout, err := cmd.StdoutPipe()
33-
if err != nil {
34-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
35-
return nil, err
36-
}
37-
if err := cmd.Start(); err != nil {
38-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
39-
return nil, err
40-
}
41-
out, _ := io.ReadAll(stdout)
42-
if err := cmd.Wait(); err != nil {
43-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
44-
return nil, err
45-
}
46-
return out, nil
29+
return Execute(logger, "squeue", []string{"-a", "-r", "-h", "-o", "%A|%a|%T|%C"})
4730
}
4831

4932
type JobMetrics struct {

collector/cpus.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */
1616
package collector
1717

1818
import (
19-
"io"
20-
"os/exec"
2119
"strconv"
2220
"strings"
2321

@@ -55,22 +53,7 @@ func ParseCPUsMetrics(input []byte) *CPUsMetrics {
5553

5654
// Execute the sinfo command and return its output
5755
func CPUsData(logger log.Logger) ([]byte, error) {
58-
cmd := exec.Command("sinfo", "-h", "-o %C")
59-
stdout, err := cmd.StdoutPipe()
60-
if err != nil {
61-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
62-
return nil, err
63-
}
64-
if err := cmd.Start(); err != nil {
65-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
66-
return nil, err
67-
}
68-
out, _ := io.ReadAll(stdout)
69-
if err := cmd.Wait(); err != nil {
70-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
71-
return nil, err
72-
}
73-
return out, nil
56+
return Execute(logger, "sinfo", []string{"-h", "-o", "%C"})
7457
}
7558

7659
/*

collector/execute.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package collector
2+
3+
import (
4+
"context"
5+
"os/exec"
6+
"strings"
7+
"time"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
)
12+
13+
var (
14+
commandTimeout time.Duration
15+
)
16+
17+
// SetCommandTimeout sets the timeout for external commands.
18+
func SetCommandTimeout(t time.Duration) {
19+
commandTimeout = t
20+
}
21+
22+
// Execute is a wrapper around exec.CommandContext to provide logging and a timeout.
23+
var Execute = func(logger log.Logger, command string, args []string) ([]byte, error) {
24+
level.Debug(logger).Log("msg", "Executing command", "command", command, "args", strings.Join(args, " "))
25+
26+
ctx, cancel := context.WithTimeout(context.Background(), commandTimeout)
27+
defer cancel()
28+
29+
cmd := exec.CommandContext(ctx, command, args...)
30+
out, err := cmd.CombinedOutput()
31+
if err != nil {
32+
// Check if the error is due to the context deadline exceeding.
33+
if ctx.Err() == context.DeadlineExceeded {
34+
level.Error(logger).Log("msg", "Command timed out", "command", command, "args", strings.Join(args, " "), "timeout", commandTimeout)
35+
return nil, ctx.Err()
36+
}
37+
level.Error(logger).Log("msg", "Failed to execute command", "command", command, "args", strings.Join(args, " "), "output", string(out), "err", err)
38+
return nil, err
39+
}
40+
41+
level.Debug(logger).Log("msg", "Command executed successfully", "command", command)
42+
return out, nil
43+
}

collector/gpus.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package collector
22

33
import (
4-
"os/exec"
54
"regexp"
65
"strconv"
76
"strings"
@@ -211,33 +210,21 @@ func ParseGPUsMetrics(logger log.Logger) (*GPUsMetrics, error) {
211210
return &gm, nil
212211
}
213212

214-
var executeCommand = Execute
215-
216213
func AllocatedGPUsData(logger log.Logger) ([]byte, error) {
217214
args := []string{"-a", "-h", "--Format=Nodes: ,GresUsed:", "--state=allocated"}
218-
return executeCommand(logger, "sinfo", args)
215+
return Execute(logger, "sinfo", args)
219216
}
220217

221218
func IdleGPUsData(logger log.Logger) ([]byte, error) {
222219
args := []string{"-a", "-h", "--Format=Nodes: ,Gres: ,GresUsed:", "--state=idle,allocated"}
223-
return executeCommand(logger, "sinfo", args)
220+
return Execute(logger, "sinfo", args)
224221
}
225222

226223
func TotalGPUsData(logger log.Logger) ([]byte, error) {
227224
args := []string{"-a", "-h", "--Format=Nodes: ,Gres:"}
228-
return executeCommand(logger, "sinfo", args)
225+
return Execute(logger, "sinfo", args)
229226
}
230227

231-
// Execute the sinfo command and return its output
232-
func Execute(logger log.Logger, command string, arguments []string) ([]byte, error) {
233-
cmd := exec.Command(command, arguments...)
234-
out, err := cmd.CombinedOutput()
235-
if err != nil {
236-
level.Error(logger).Log("msg", "Failed to execute command", "command", command, "args", strings.Join(arguments, " "), "err", err)
237-
return nil, err
238-
}
239-
return out, nil
240-
}
241228

242229
/*
243230
* Implement the Prometheus Collector interface and feed the

collector/gpus_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ func TestGPUsMetrics(t *testing.T) {
6161
}
6262

6363
func TestGPUsGetMetrics(t *testing.T) {
64-
oldExecuteCommand := executeCommand
65-
defer func() { executeCommand = oldExecuteCommand }()
64+
oldExecute := Execute
65+
defer func() { Execute = oldExecute }()
6666

6767
test_data_paths, _ := filepath.Glob("../test_data/slurm-*")
6868
for _, test_data_path := range test_data_paths {
6969
slurm_version := strings.TrimPrefix(test_data_path, "../test_data/slurm-")
7070
t.Run(slurm_version, func(t *testing.T) {
71-
executeCommand = func(logger log.Logger, command string, arguments []string) ([]byte, error) {
71+
Execute = func(logger log.Logger, command string, arguments []string) ([]byte, error) {
7272
var file string
7373
if strings.Contains(arguments[2], "GresUsed:") && strings.Contains(arguments[2], "Gres:") {
7474
file = filepath.Join(test_data_path, "sinfo_gpus_idle.txt")

collector/node.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package collector
22

33
import (
4-
"os/exec"
54
"sort"
65
"strconv"
76
"strings"
@@ -81,13 +80,8 @@ func ParseNodeMetrics(input []byte) map[string]*NodeMetrics {
8180
// NodeData executes the sinfo command to get data for each node
8281
// It returns the output of the sinfo command
8382
func NodeData(logger log.Logger) ([]byte, error) {
84-
cmd := exec.Command("sinfo", "-h", "-N", "-O", "NodeList,AllocMem,Memory,CPUsState,StateLong,Partition")
85-
out, err := cmd.Output()
86-
if err != nil {
87-
level.Error(logger).Log("msg", "Failed to execute sinfo command", "err", err)
88-
return nil, err
89-
}
90-
return out, nil
83+
args := []string{"-h", "-N", "-O", "NodeList,AllocMem,Memory,CPUsState,StateLong,Partition"}
84+
return Execute(logger, "sinfo", args)
9185
}
9286

9387
type NodeCollector struct {

collector/nodes.go

Lines changed: 21 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package collector
22

33
import (
4-
"io"
5-
"os/exec"
64
"regexp"
75
"sort"
86
"strconv"
@@ -135,69 +133,41 @@ func ParseNodesMetrics(input []byte) *NodesMetrics {
135133

136134
// Execute the sinfo command and return its output
137135
func NodesData(logger log.Logger, part string) ([]byte, error) {
138-
cmd := exec.Command("sinfo", "-h", "-o %D|%T|%b", "-p", part, "| sort", "| uniq")
139-
stdout, err := cmd.StdoutPipe()
140-
if err != nil {
141-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
142-
return nil, err
143-
}
144-
if err := cmd.Start(); err != nil {
145-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
146-
return nil, err
147-
}
148-
out, _ := io.ReadAll(stdout)
149-
if err := cmd.Wait(); err != nil {
150-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
151-
return nil, err
152-
}
153-
return out, nil
136+
return Execute(logger, "sinfo", []string{"-h", "-o", "%D|%T|%b", "-p", part})
154137
}
155138

156139
func SlurmGetTotal(logger log.Logger) (float64, error) {
157-
cmd := exec.Command("bash", "-c", "scontrol show nodes -o | grep -c NodeName=[a-z]*[0-9]*")
158-
stdout, err := cmd.StdoutPipe()
159-
if err != nil {
160-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
161-
return 0, err
162-
}
163-
stderr, err := cmd.StderrPipe()
140+
out, err := Execute(logger, "scontrol", []string{"show", "nodes", "-o"})
164141
if err != nil {
165-
level.Error(logger).Log("msg", "Failed to create stderr pipe", "err", err)
166142
return 0, err
167143
}
168-
if err := cmd.Start(); err != nil {
169-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
170-
return 0, err
171-
}
172-
out, _ := io.ReadAll(stdout)
173-
err_out, _ := io.ReadAll(stderr)
174-
if err := cmd.Wait(); err != nil {
175-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err, "stdout", string(out), "stderr", string(err_out))
176-
return 0, err
144+
// Filter out empty lines before counting
145+
lines := strings.Split(string(out), "\n")
146+
count := 0
147+
for _, line := range lines {
148+
if strings.TrimSpace(line) != "" {
149+
count++
150+
}
177151
}
178-
data := strings.Split(string(out), "\n")
179-
total, _ := strconv.ParseFloat(data[0], 64)
180-
return total, nil
152+
return float64(count), nil
181153
}
182154

183155
func SlurmGetPartitions(logger log.Logger) ([]string, error) {
184-
cmd := exec.Command("sinfo", "-h", "-o %R", "| sort", "| uniq")
185-
stdout, err := cmd.StdoutPipe()
156+
out, err := Execute(logger, "sinfo", []string{"-h", "-o", "%R"})
186157
if err != nil {
187-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
188-
return nil, err
189-
}
190-
if err := cmd.Start(); err != nil {
191-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
192-
return nil, err
193-
}
194-
out, _ := io.ReadAll(stdout)
195-
if err := cmd.Wait(); err != nil {
196-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
197158
return nil, err
198159
}
199160
partitions := strings.Split(string(out), "\n")
200-
return partitions, nil
161+
// Trim whitespace and remove empty strings
162+
var cleanedPartitions []string
163+
for _, p := range partitions {
164+
p = strings.TrimSpace(p)
165+
if p != "" {
166+
cleanedPartitions = append(cleanedPartitions, p)
167+
}
168+
}
169+
sort.Strings(cleanedPartitions)
170+
return RemoveDuplicates(cleanedPartitions), nil
201171
}
202172

203173
/*

collector/partitions.go

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package collector
22

33
import (
4-
"io"
5-
"os/exec"
64
"strconv"
75
"strings"
86

@@ -12,41 +10,11 @@ import (
1210
)
1311

1412
func PartitionsData(logger log.Logger) ([]byte, error) {
15-
cmd := exec.Command("sinfo", "-h", "-o%R,%C")
16-
stdout, err := cmd.StdoutPipe()
17-
if err != nil {
18-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
19-
return nil, err
20-
}
21-
if err := cmd.Start(); err != nil {
22-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
23-
return nil, err
24-
}
25-
out, _ := io.ReadAll(stdout)
26-
if err := cmd.Wait(); err != nil {
27-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
28-
return nil, err
29-
}
30-
return out, nil
13+
return Execute(logger, "sinfo", []string{"-h", "-o", "%R,%C"})
3114
}
3215

3316
func PartitionsPendingJobsData(logger log.Logger) ([]byte, error) {
34-
cmd := exec.Command("squeue", "-a", "-r", "-h", "-o%P", "--states=PENDING")
35-
stdout, err := cmd.StdoutPipe()
36-
if err != nil {
37-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
38-
return nil, err
39-
}
40-
if err := cmd.Start(); err != nil {
41-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
42-
return nil, err
43-
}
44-
out, _ := io.ReadAll(stdout)
45-
if err := cmd.Wait(); err != nil {
46-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
47-
return nil, err
48-
}
49-
return out, nil
17+
return Execute(logger, "squeue", []string{"-a", "-r", "-h", "-o", "%P", "--states=PENDING"})
5018
}
5119

5220
type PartitionMetrics struct {

collector/queue.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package collector
22

33
import (
4-
"io"
5-
"os/exec"
64
"strconv"
75
"strings"
86

@@ -150,22 +148,7 @@ func ParseQueueMetrics(input []byte) *QueueMetrics {
150148

151149
// Execute the squeue command and return its output
152150
func QueueData(logger log.Logger) ([]byte, error) {
153-
cmd := exec.Command("/usr/bin/squeue", "-h", "-o %P,%T,%C,%r,%u")
154-
stdout, err := cmd.StdoutPipe()
155-
if err != nil {
156-
level.Error(logger).Log("msg", "Failed to create stdout pipe", "err", err)
157-
return nil, err
158-
}
159-
if err := cmd.Start(); err != nil {
160-
level.Error(logger).Log("msg", "Failed to start command", "err", err)
161-
return nil, err
162-
}
163-
out, _ := io.ReadAll(stdout)
164-
if err := cmd.Wait(); err != nil {
165-
level.Error(logger).Log("msg", "Failed to wait for command", "err", err)
166-
return nil, err
167-
}
168-
return out, nil
151+
return Execute(logger, "squeue", []string{"-h", "-o", "%P,%T,%C,%r,%u"})
169152
}
170153

171154
/*

0 commit comments

Comments
 (0)