Skip to content

Commit 1e51de1

Browse files
committed
job-list: move job data into its own files
Problem: `struct job` in job_state.h contains `struct list_ctx` from job-list.h, and `struct list_ctx` in job-list.h contains `struct job_state_ctx` from job_state.h. This circular dependency isn't a problem at the moment, but it could become one as more services are added. In addition, the job_state.[ch] files are getting long. Solution: Move `struct job` and the job create/destroy functions into their own files. Also move jobspec parsing and R parsing into the new files.
1 parent 7923de5 commit 1e51de1

File tree

11 files changed

+469
-401
lines changed

11 files changed

+469
-401
lines changed

src/modules/job-list/Makefile.am

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ job_list_la_SOURCES = \
2020
job-list.h \
2121
job_state.h \
2222
job_state.c \
23+
job_data.h \
24+
job_data.c \
2325
list.h \
2426
list.c \
2527
job_util.h \

src/modules/job-list/job-list.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "job-list.h"
1919
#include "job_state.h"
20+
#include "job_data.h"
2021
#include "list.h"
2122
#include "idsync.h"
2223

src/modules/job-list/job_data.c

Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
/************************************************************\
2+
* Copyright 2022 Lawrence Livermore National Security, LLC
3+
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
*
5+
* This file is part of the Flux resource manager framework.
6+
* For details, see https://github.com/flux-framework.
7+
*
8+
* SPDX-License-Identifier: LGPL-3.0
9+
\************************************************************/
10+
11+
/* job_data.c - primary struct job helper functions */
12+
13+
#if HAVE_CONFIG_H
14+
#include "config.h"
15+
#endif
16+
#include <jansson.h>
17+
#include <flux/core.h>
18+
#include <assert.h>
19+
20+
#include "src/common/libczmqcontainers/czmq_containers.h"
21+
#include "src/common/librlist/rlist.h"
22+
23+
#include "job_data.h"
24+
25+
void job_destroy (void *data)
26+
{
27+
struct job *job = data;
28+
if (job) {
29+
free (job->ranks);
30+
free (job->nodelist);
31+
json_decref (job->annotations);
32+
grudgeset_destroy (job->dependencies);
33+
json_decref (job->jobspec);
34+
json_decref (job->R);
35+
json_decref (job->exception_context);
36+
zlist_destroy (&job->next_states);
37+
free (job);
38+
}
39+
}
40+
41+
/* Allocate and initialize a job object for job 'id'.
 *
 * The job records 'ctx' and the handle ctx->h, and all other fields are
 * set to "unknown" defaults (e.g. -1 for urgency, ntasks, nnodes,
 * expiration, wait_status and eventlog_seq).
 *
 * Returns the new job on success.  Returns NULL on failure; if the
 * next_states list cannot be created, errno is set to ENOMEM.
 * The caller releases the job with job_destroy().
 */
struct job *job_create (struct list_ctx *ctx, flux_jobid_t id)
{
    struct job *job = NULL;

    if (!(job = calloc (1, sizeof (*job))))
        return NULL;
    job->h = ctx->h;
    job->ctx = ctx;
    job->id = id;
    job->userid = FLUX_USERID_UNKNOWN;
    job->urgency = -1;
    /* pending jobs that are not yet assigned a priority shall be
     * listed after those who do, so we set the job priority to MIN */
    job->priority = FLUX_JOB_PRIORITY_MIN;
    job->state = FLUX_JOB_STATE_NEW;
    job->ntasks = -1;
    job->nnodes = -1;
    job->expiration = -1.0;
    job->wait_status = -1;
    job->result = FLUX_JOB_RESULT_FAILED;

    if (!(job->next_states = zlist_new ())) {
        /* Clean up first, then set errno, so that nothing called
         * during teardown can overwrite the error code. */
        job_destroy (job);
        errno = ENOMEM;
        return NULL;
    }

    job->states_mask = FLUX_JOB_STATE_NEW;
    job->states_events_mask = FLUX_JOB_STATE_NEW;
    job->eventlog_seq = -1;
    return job;
}
73+
74+
struct res_level {
75+
const char *type;
76+
int count;
77+
json_t *with;
78+
};
79+
80+
static int parse_res_level (struct job *job,
81+
json_t *o,
82+
struct res_level *resp)
83+
{
84+
json_error_t error;
85+
struct res_level res;
86+
87+
res.with = NULL;
88+
/* For jobspec version 1, expect exactly one array element per level.
89+
*/
90+
if (json_unpack_ex (o, &error, 0,
91+
"[{s:s s:i s?o}]",
92+
"type", &res.type,
93+
"count", &res.count,
94+
"with", &res.with) < 0) {
95+
flux_log (job->h, LOG_ERR,
96+
"%s: job %ju invalid jobspec: %s",
97+
__FUNCTION__, (uintmax_t)job->id, error.text);
98+
return -1;
99+
}
100+
*resp = res;
101+
return 0;
102+
}
103+
104+
/* Return the basename of 'path', i.e. the text after the final '/'.
 *
 * 'path' itself is returned when it contains no '/', or when the final
 * '/' is the last character (user mistake: a directory was specified
 * with a trailing '/').
 *
 * The result points into 'path'; nothing is allocated.  'path' must not
 * be NULL.
 */
static const char *parse_job_name (const char *path)
{
    const char *slash = strrchr (path, '/');

    if (!slash || slash[1] == '\0')
        return path;
    return slash + 1;
}
119+
120+
int job_parse_jobspec (struct job *job, const char *s)
121+
{
122+
json_error_t error;
123+
json_t *jobspec_job = NULL;
124+
json_t *command = NULL;
125+
json_t *tasks, *resources;
126+
struct res_level res[3];
127+
int rc = -1;
128+
129+
if (!(job->jobspec = json_loads (s, 0, &error))) {
130+
flux_log (job->h, LOG_ERR,
131+
"%s: job %ju invalid jobspec: %s",
132+
__FUNCTION__, (uintmax_t)job->id, error.text);
133+
goto error;
134+
}
135+
136+
if (json_unpack_ex (job->jobspec, &error, 0,
137+
"{s:{s:{s?:o}}}",
138+
"attributes",
139+
"system",
140+
"job",
141+
&jobspec_job) < 0) {
142+
flux_log (job->h, LOG_ERR,
143+
"%s: job %ju invalid jobspec: %s",
144+
__FUNCTION__, (uintmax_t)job->id, error.text);
145+
goto nonfatal_error;
146+
}
147+
148+
if (jobspec_job) {
149+
if (!json_is_object (jobspec_job)) {
150+
flux_log (job->h, LOG_ERR,
151+
"%s: job %ju invalid jobspec",
152+
__FUNCTION__, (uintmax_t)job->id);
153+
goto nonfatal_error;
154+
}
155+
}
156+
157+
if (json_unpack_ex (job->jobspec, &error, 0,
158+
"{s:o}",
159+
"tasks", &tasks) < 0) {
160+
flux_log (job->h, LOG_ERR,
161+
"%s: job %ju invalid jobspec: %s",
162+
__FUNCTION__, (uintmax_t)job->id, error.text);
163+
goto nonfatal_error;
164+
}
165+
if (json_unpack_ex (tasks, &error, 0,
166+
"[{s:o}]",
167+
"command", &command) < 0) {
168+
flux_log (job->h, LOG_ERR,
169+
"%s: job %ju invalid jobspec: %s",
170+
__FUNCTION__, (uintmax_t)job->id, error.text);
171+
goto nonfatal_error;
172+
}
173+
174+
if (!json_is_array (command)) {
175+
flux_log (job->h, LOG_ERR,
176+
"%s: job %ju invalid jobspec",
177+
__FUNCTION__, (uintmax_t)job->id);
178+
goto nonfatal_error;
179+
}
180+
181+
if (jobspec_job) {
182+
if (json_unpack_ex (jobspec_job, &error, 0,
183+
"{s?:s}",
184+
"name", &job->name) < 0) {
185+
flux_log (job->h, LOG_ERR,
186+
"%s: job %ju invalid job dictionary: %s",
187+
__FUNCTION__, (uintmax_t)job->id, error.text);
188+
goto nonfatal_error;
189+
}
190+
}
191+
192+
/* If user did not specify job.name, we treat arg 0 of the command
193+
* as the job name */
194+
if (!job->name) {
195+
json_t *arg0 = json_array_get (command, 0);
196+
if (!arg0 || !json_is_string (arg0)) {
197+
flux_log (job->h, LOG_ERR,
198+
"%s: job %ju invalid job command",
199+
__FUNCTION__, (uintmax_t)job->id);
200+
goto nonfatal_error;
201+
}
202+
job->name = parse_job_name (json_string_value (arg0));
203+
assert (job->name);
204+
}
205+
206+
if (json_unpack_ex (job->jobspec, &error, 0,
207+
"{s:o}",
208+
"resources", &resources) < 0) {
209+
flux_log (job->h, LOG_ERR,
210+
"%s: job %ju invalid jobspec: %s",
211+
__FUNCTION__, (uintmax_t)job->id, error.text);
212+
goto nonfatal_error;
213+
}
214+
215+
/* For jobspec version 1, expect either:
216+
* - node->slot->core->NIL
217+
* - slot->core->NIL
218+
*/
219+
memset (res, 0, sizeof (res));
220+
if (parse_res_level (job, resources, &res[0]) < 0)
221+
goto nonfatal_error;
222+
if (res[0].with && parse_res_level (job, res[0].with, &res[1]) < 0)
223+
goto nonfatal_error;
224+
if (res[1].with && parse_res_level (job, res[1].with, &res[2]) < 0)
225+
goto nonfatal_error;
226+
227+
/* Set job->nnodes if available. In jobspec version 1, only if
228+
* resources listed as node->slot->core->NIL
229+
*/
230+
if (res[0].type != NULL && !strcmp (res[0].type, "node")
231+
&& res[1].type != NULL && !strcmp (res[1].type, "slot")
232+
&& res[2].type != NULL && !strcmp (res[2].type, "core")
233+
&& res[2].with == NULL)
234+
job->nnodes = res[0].count;
235+
236+
/* Set job->ntasks
237+
*/
238+
if (json_unpack_ex (tasks, NULL, 0,
239+
"[{s:{s:i}}]",
240+
"count", "total", &job->ntasks) < 0) {
241+
int per_slot, slot_count = 0;
242+
243+
if (json_unpack_ex (tasks, &error, 0,
244+
"[{s:{s:i}}]",
245+
"count", "per_slot", &per_slot) < 0) {
246+
flux_log (job->h, LOG_ERR,
247+
"%s: job %ju invalid jobspec: %s",
248+
__FUNCTION__, (uintmax_t)job->id, error.text);
249+
goto nonfatal_error;
250+
}
251+
if (per_slot != 1) {
252+
flux_log (job->h, LOG_ERR,
253+
"%s: job %ju: per_slot count: expected 1 got %d",
254+
__FUNCTION__, (uintmax_t)job->id, per_slot);
255+
goto nonfatal_error;
256+
}
257+
if (res[0].type != NULL && !strcmp (res[0].type, "slot")
258+
&& res[1].type != NULL && !strcmp (res[1].type, "core")
259+
&& res[1].with == NULL) {
260+
slot_count = res[0].count;
261+
}
262+
else if (res[0].type != NULL && !strcmp (res[0].type, "node")
263+
&& res[1].type != NULL && !strcmp (res[1].type, "slot")
264+
&& res[2].type != NULL && !strcmp (res[2].type, "core")
265+
&& res[2].with == NULL) {
266+
slot_count = res[0].count * res[1].count;
267+
}
268+
else {
269+
flux_log (job->h, LOG_WARNING,
270+
"%s: job %ju: Unexpected resources: %s->%s->%s%s",
271+
__FUNCTION__,
272+
(uintmax_t)job->id,
273+
res[0].type ? res[0].type : "NULL",
274+
res[1].type ? res[1].type : "NULL",
275+
res[2].type ? res[2].type : "NULL",
276+
res[2].with ? "->..." : NULL);
277+
slot_count = -1;
278+
}
279+
job->ntasks = slot_count;
280+
}
281+
282+
/* nonfatal error - jobspec illegal, but we'll continue on. job
283+
* listing will return whatever data is available */
284+
nonfatal_error:
285+
rc = 0;
286+
error:
287+
return rc;
288+
}
289+
290+
int job_parse_R (struct job *job, const char *s)
291+
{
292+
struct rlist *rl = NULL;
293+
struct idset *idset = NULL;
294+
struct hostlist *hl = NULL;
295+
json_error_t error;
296+
int flags = IDSET_FLAG_BRACKETS | IDSET_FLAG_RANGE;
297+
int saved_errno, rc = -1;
298+
299+
if (!(job->R = json_loads (s, 0, &error))) {
300+
flux_log (job->h, LOG_ERR,
301+
"%s: job %ju invalid R: %s",
302+
__FUNCTION__, (uintmax_t)job->id, error.text);
303+
goto nonfatal_error;
304+
}
305+
306+
if (!(rl = rlist_from_json (job->R, &error))) {
307+
flux_log_error (job->h, "rlist_from_json: %s", error.text);
308+
goto nonfatal_error;
309+
}
310+
311+
job->expiration = rl->expiration;
312+
313+
if (!(idset = rlist_ranks (rl)))
314+
goto nonfatal_error;
315+
316+
job->nnodes = idset_count (idset);
317+
if (!(job->ranks = idset_encode (idset, flags)))
318+
goto nonfatal_error;
319+
320+
/* reading nodelist from R directly would avoid the creation /
321+
* destruction of a hostlist. However, we get a hostlist to
322+
* ensure that the nodelist we return to users is consistently
323+
* formatted.
324+
*/
325+
if (!(hl = rlist_nodelist (rl)))
326+
goto nonfatal_error;
327+
328+
if (!(job->nodelist = hostlist_encode (hl)))
329+
goto nonfatal_error;
330+
331+
/* nonfatal error - invalid R, but we'll continue on. job listing
332+
* will get initialized data */
333+
nonfatal_error:
334+
rc = 0;
335+
saved_errno = errno;
336+
hostlist_destroy (hl);
337+
idset_destroy (idset);
338+
rlist_destroy (rl);
339+
errno = saved_errno;
340+
return rc;
341+
}
342+
343+
/*
344+
* vi:tabstop=4 shiftwidth=4 expandtab
345+
*/

0 commit comments

Comments
 (0)