Skip to content
This repository was archived by the owner on Dec 30, 2020. It is now read-only.

Commit 9dfef1a

Browse files
authored
Merge pull request #110 from sylabs/support-job-array
Support job array
2 parents e8f8df9 + 8a6027d commit 9dfef1a

File tree

7 files changed

+295
-145
lines changed

7 files changed

+295
-145
lines changed

cmd/job-companion/main.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,30 +95,49 @@ func runBatch(c api.WorkloadManagerClient, batch string, cOps *collectOptions) e
9595
if err != nil {
9696
return err
9797
}
98-
info := infoResp.Info
98+
info := infoResp.Info[0] // response always contains at least one element
9999

100100
log.Printf("JobID: %d", jobID)
101101

102102
ctx, cancelTailLogs := context.WithCancel(context.Background())
103-
tailLogsDone := tailLogs(ctx, c, info.StdOut)
103+
var tailLogsDone chan struct{}
104+
// we are not tailing logs for JobArrays
105+
// since there is no correct way to print the outputs of multiple parallel running jobs
106+
// without affecting each other
107+
if info.ArrayId == "" {
108+
tailLogsDone = tailLogs(ctx, c, info.StdOut)
109+
}
110+
111+
stopLogs := func() {
112+
cancelTailLogs()
113+
114+
// if tailLogsDone is nil, the job is a JobArray
115+
// and we are not tailing logs for such jobs
116+
if tailLogsDone == nil {
117+
return
118+
}
119+
120+
<-tailLogsDone // need to wait until all logs are printed, so as not to ruin formatting
121+
}
104122

105123
for {
106124
time.Sleep(1 * time.Second)
107125

108126
infoResp, err = c.JobInfo(context.Background(), &api.JobInfoRequest{JobId: jobID})
109127
if err != nil {
110-
cancelTailLogs()
128+
// waits till logs read reaches EOF
129+
stopLogs()
111130
return err
112131
}
113-
info = infoResp.Info
132+
info = infoResp.Info[0]
114133

115134
state := info.Status
116135
if state == api.JobStatus_COMPLETED ||
117136
state == api.JobStatus_FAILED ||
118137
state == api.JobStatus_CANCELLED {
119138

120-
cancelTailLogs()
121-
<-tailLogsDone // need to wait until all logs are printed, so as not to ruin formatting
139+
// waits till logs read reaches EOF
140+
stopLogs()
122141

123142
switch state {
124143
case api.JobStatus_FAILED:

internal/red-box/api/slurm.go

Lines changed: 55 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ func (a *Slurm) JobInfo(ctx context.Context, r *api.JobInfoRequest) (*api.JobInf
7575
return nil, errors.Wrap(err, "can't convert slurm info into proto info")
7676
}
7777

78+
if len(pInfo) == 0 {
79+
return nil, errors.New("job info slice is empty, probably invalid scontrol output")
80+
}
81+
7882
return &api.JobInfoResponse{Info: pInfo}, nil
7983
}
8084

@@ -223,60 +227,65 @@ func mapSStepsToProtoSteps(ss []*slurm.JobStepInfo) ([]*api.JobStepInfo, error)
223227
return pSteps, nil
224228
}
225229

226-
func mapSInfoToProtoInfo(si *slurm.JobInfo) (*api.JobInfo, error) {
227-
var submitTime *timestamp.Timestamp
228-
if si.SubmitTime != nil {
229-
pt, err := ptypes.TimestampProto(*si.SubmitTime)
230-
if err != nil {
231-
return nil, errors.Wrap(err, "can't convert submit go time to proto time")
230+
func mapSInfoToProtoInfo(si []*slurm.JobInfo) ([]*api.JobInfo, error) {
231+
pInfs := make([]*api.JobInfo, len(si))
232+
for i, inf := range si {
233+
var submitTime *timestamp.Timestamp
234+
if inf.SubmitTime != nil {
235+
pt, err := ptypes.TimestampProto(*inf.SubmitTime)
236+
if err != nil {
237+
return nil, errors.Wrap(err, "can't convert submit go time to proto time")
238+
}
239+
240+
submitTime = pt
232241
}
233242

234-
submitTime = pt
235-
}
243+
var startTime *timestamp.Timestamp
244+
if inf.StartTime != nil {
245+
pt, err := ptypes.TimestampProto(*inf.StartTime)
246+
if err != nil {
247+
return nil, errors.Wrap(err, "can't convert start go time to proto time")
248+
}
236249

237-
var startTime *timestamp.Timestamp
238-
if si.StartTime != nil {
239-
pt, err := ptypes.TimestampProto(*si.StartTime)
240-
if err != nil {
241-
return nil, errors.Wrap(err, "can't convert start go time to proto time")
250+
startTime = pt
242251
}
243252

244-
startTime = pt
245-
}
246-
247-
var runTime *duration.Duration
248-
if si.RunTime != nil {
249-
runTime = ptypes.DurationProto(*si.RunTime)
250-
}
253+
var runTime *duration.Duration
254+
if inf.RunTime != nil {
255+
runTime = ptypes.DurationProto(*inf.RunTime)
256+
}
251257

252-
var timeLimit *duration.Duration
253-
if si.TimeLimit != nil {
254-
timeLimit = ptypes.DurationProto(*si.TimeLimit)
255-
}
258+
var timeLimit *duration.Duration
259+
if inf.TimeLimit != nil {
260+
timeLimit = ptypes.DurationProto(*inf.TimeLimit)
261+
}
256262

257-
status, ok := api.JobStatus_value[si.State]
258-
if !ok {
259-
status = int32(api.JobStatus_UNKNOWN)
260-
}
263+
status, ok := api.JobStatus_value[inf.State]
264+
if !ok {
265+
status = int32(api.JobStatus_UNKNOWN)
266+
}
261267

262-
pi := api.JobInfo{
263-
Id: si.ID,
264-
UserId: si.UserID,
265-
Name: si.Name,
266-
ExitCode: si.ExitCode,
267-
Status: api.JobStatus(status),
268-
SubmitTime: submitTime,
269-
StartTime: startTime,
270-
RunTime: runTime,
271-
TimeLimit: timeLimit,
272-
WorkingDir: si.WorkDir,
273-
StdOut: si.StdOut,
274-
StdErr: si.StdErr,
275-
Partition: si.Partition,
276-
NodeList: si.NodeList,
277-
BatchHost: si.BatchHost,
278-
NumNodes: si.NumNodes,
268+
pi := api.JobInfo{
269+
Id: inf.ID,
270+
UserId: inf.UserID,
271+
Name: inf.Name,
272+
ExitCode: inf.ExitCode,
273+
Status: api.JobStatus(status),
274+
SubmitTime: submitTime,
275+
StartTime: startTime,
276+
RunTime: runTime,
277+
TimeLimit: timeLimit,
278+
WorkingDir: inf.WorkDir,
279+
StdOut: inf.StdOut,
280+
StdErr: inf.StdErr,
281+
Partition: inf.Partition,
282+
NodeList: inf.NodeList,
283+
BatchHost: inf.BatchHost,
284+
NumNodes: inf.NumNodes,
285+
ArrayId: inf.ArrayJobID,
286+
}
287+
pInfs[i] = &pi
279288
}
280289

281-
return &pi, nil
290+
return pInfs, nil
282291
}

internal/red-box/api/slurm_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ func Test_mapSInfoToProtoInfo(t *testing.T) {
4141
NodeList: "node1",
4242
BatchHost: "host1",
4343
NumNodes: "2",
44+
ArrayJobID: "111",
4445
}
45-
pi, err := mapSInfoToProtoInfo(&testInfo)
46+
pinfs, err := mapSInfoToProtoInfo([]*slurm.JobInfo{&testInfo})
4647
require.NoError(t, err)
48+
require.Len(t, pinfs, 1)
49+
pi := pinfs[0]
4750

4851
require.EqualValues(t, testInfo.ID, pi.Id)
4952
require.EqualValues(t, testInfo.UserID, pi.UserId)
@@ -63,6 +66,7 @@ func Test_mapSInfoToProtoInfo(t *testing.T) {
6366
require.EqualValues(t, testInfo.NodeList, pi.NodeList)
6467
require.EqualValues(t, testInfo.BatchHost, pi.BatchHost)
6568
require.EqualValues(t, testInfo.NumNodes, pi.NumNodes)
69+
require.EqualValues(t, testInfo.ArrayJobID, pi.ArrayId)
6670
}
6771

6872
func Test_mapSStepsToProtoSteps(t *testing.T) {

pkg/slurm/slurm.go

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ func NewClient() (*Client, error) {
6565
return &Client{}, nil
6666
}
6767

68-
// JobInfo contains information about a single Slurm job.
68+
// JobInfo contains information about a Slurm job.
6969
type JobInfo struct {
7070
ID string `json:"id" slurm:"JobId"`
7171
UserID string `json:"user_id" slurm:"UserId"`
72+
ArrayJobID string `json:"array_job_id" slurm:"ArrayJobId"`
7273
Name string `json:"name" slurm:"JobName"`
7374
ExitCode string `json:"exit_code" slurm:"ExitCode"`
7475
State string `json:"state" slurm:"JobState"`
@@ -145,7 +146,7 @@ func (*Client) Tail(path string) (io.ReadCloser, error) {
145146
return tr, nil
146147
}
147148

148-
func (*Client) SJobInfo(jobID int64) (*JobInfo, error) {
149+
func (*Client) SJobInfo(jobID int64) ([]*JobInfo, error) {
149150
cmd := exec.Command(scontrolBinaryName, "show", "jobid", strconv.FormatInt(jobID, 10))
150151

151152
out, err := cmd.Output()
@@ -188,24 +189,32 @@ func (*Client) SJobSteps(jobID int64) ([]*JobStepInfo, error) {
188189
return jInfo, nil
189190
}
190191

191-
func JobInfoFromScontrolResponse(r string) (*JobInfo, error) {
192-
rFields := strings.Fields(r)
193-
slurmFields := make(map[string]string)
194-
for _, f := range rFields {
195-
s := strings.Split(f, "=")
196-
if len(s) != 2 {
197-
// just skipping empty fields
198-
continue
192+
func JobInfoFromScontrolResponse(r string) ([]*JobInfo, error) {
193+
r = strings.TrimSpace(r)
194+
rawInfos := strings.Split(r, "\n\n")
195+
196+
infos := make([]*JobInfo, len(rawInfos))
197+
198+
for i, raw := range rawInfos {
199+
rFields := strings.Fields(raw)
200+
slurmFields := make(map[string]string)
201+
for _, f := range rFields {
202+
s := strings.Split(f, "=")
203+
if len(s) != 2 {
204+
// just skipping empty fields
205+
continue
206+
}
207+
slurmFields[s[0]] = s[1]
199208
}
200-
slurmFields[s[0]] = s[1]
201-
}
202209

203-
ji := &JobInfo{}
204-
if err := ji.fillFromSlurmFields(slurmFields); err != nil {
205-
return nil, err
210+
ji := &JobInfo{}
211+
if err := ji.fillFromSlurmFields(slurmFields); err != nil {
212+
return nil, err
213+
}
214+
infos[i] = ji
206215
}
207216

208-
return ji, nil
217+
return infos, nil
209218
}
210219

211220
// ParseSacctResponse is a helper that parses sacct output and

0 commit comments

Comments
 (0)