Skip to content

Commit fe3b799

Browse files
authored
Merge pull request #4332 from chu11/job_list_cleanup
job-list: misc cleanup
2 parents c48ba00 + 30db1a6 commit fe3b799

File tree

10 files changed

+113
-174
lines changed

10 files changed

+113
-174
lines changed

src/modules/job-archive/job-archive.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ int job_info_lookup (struct job_archive_ctx *ctx, json_t *job)
412412
double t_run = 0.0;
413413

414414
if (json_unpack (job, "{s:I s?:f}", "id", &id, "t_run", &t_run) < 0) {
415-
flux_log (ctx->h, LOG_ERR, "%s: parse t_inactive", __FUNCTION__);
415+
flux_log (ctx->h, LOG_ERR, "%s: parse t_run", __FUNCTION__);
416416
goto error;
417417
}
418418

src/modules/job-list/idsync.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717
#include <flux/core.h>
1818

1919
#include "src/common/libczmqcontainers/czmq_containers.h"
20-
#include "src/common/libutil/errno_safe.h"
2120
#include "src/common/libjob/job_hash.h"
2221

2322
#include "idsync.h"
24-
#include "job_state.h"
2523

2624
void idsync_data_destroy (void *data)
2725
{

src/modules/job-list/job_state.c

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ static void job_destroy (void *data)
118118
{
119119
struct job *job = data;
120120
if (job) {
121-
json_decref (job->exception_context);
121+
free (job->ranks);
122+
free (job->nodelist);
122123
json_decref (job->annotations);
124+
grudgeset_destroy (job->dependencies);
123125
json_decref (job->jobspec_job);
124126
json_decref (job->jobspec_cmd);
125127
json_decref (job->R);
126-
grudgeset_destroy (job->dependencies);
127-
free (job->ranks);
128-
free (job->nodelist);
128+
json_decref (job->exception_context);
129129
zlist_destroy (&job->next_states);
130130
free (job);
131131
}
@@ -145,25 +145,27 @@ static struct job *job_create (struct list_ctx *ctx, flux_jobid_t id)
145145
return NULL;
146146
job->ctx = ctx;
147147
job->id = id;
148-
job->state = FLUX_JOB_STATE_NEW;
149148
job->userid = FLUX_USERID_UNKNOWN;
150-
job->ntasks = -1;
151-
job->nnodes = -1;
152149
job->urgency = -1;
153-
job->expiration = -1.0;
154-
job->wait_status = -1;
155150
/* pending jobs that are not yet assigned a priority shall be
156151
* listed after those who do, so we set the job priority to MIN */
157152
job->priority = FLUX_JOB_PRIORITY_MIN;
153+
job->state = FLUX_JOB_STATE_NEW;
154+
job->ntasks = -1;
155+
job->nnodes = -1;
156+
job->expiration = -1.0;
157+
job->wait_status = -1;
158158
job->result = FLUX_JOB_RESULT_FAILED;
159-
job->eventlog_seq = -1;
160159

161160
if (!(job->next_states = zlist_new ())) {
162161
errno = ENOMEM;
163162
job_destroy (job);
164163
return NULL;
165164
}
166165

166+
job->states_mask = FLUX_JOB_STATE_NEW;
167+
job->states_events_mask = FLUX_JOB_STATE_NEW;
168+
job->eventlog_seq = -1;
167169
return job;
168170
}
169171

@@ -824,14 +826,16 @@ static void process_next_state (struct list_ctx *ctx, struct job *job)
824826
if (st->state == FLUX_JOB_STATE_DEPEND) {
825827
/* get initial jobspec */
826828
if (!(f = state_depend_lookup (jsctx, job))) {
827-
flux_log_error (jsctx->h, "%s: state_depend_lookup", __FUNCTION__);
829+
flux_log_error (jsctx->h, "%s: state_depend_lookup",
830+
__FUNCTION__);
828831
return;
829832
}
830833
}
831834
else { /* st->state == FLUX_JOB_STATE_RUN */
832835
/* get R to get node count, etc. */
833836
if (!(f = state_run_lookup (jsctx, job))) {
834-
flux_log_error (jsctx->h, "%s: state_run_lookup", __FUNCTION__);
837+
flux_log_error (jsctx->h, "%s: state_run_lookup",
838+
__FUNCTION__);
835839
return;
836840
}
837841
}
@@ -1231,14 +1235,12 @@ static int submit_context_parse (flux_t *h,
12311235
{
12321236
int urgency;
12331237
int userid;
1234-
int flags;
12351238

12361239
if (!context
12371240
|| json_unpack (context,
1238-
"{ s:i s:i s:i }",
1241+
"{ s:i s:i }",
12391242
"urgency", &urgency,
1240-
"userid", &userid,
1241-
"flags", &flags) < 0) {
1243+
"userid", &userid) < 0) {
12421244
flux_log (h, LOG_ERR, "%s: submit context invalid: %ju",
12431245
__FUNCTION__, (uintmax_t)job->id);
12441246
errno = EPROTO;

src/modules/job-list/job_state.h

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ struct job_state_ctx {
5959
flux_future_t *events;
6060
};
6161

62+
/* timestamp of when we enter the state
63+
*
64+
* associated eventlog entries when restarting
65+
*
66+
* t_depend - "submit"
67+
* t_priority - "priority" (not saved, can be entered multiple times)
68+
* t_sched - "depend" (not saved, can be entered multiple times)
69+
* t_run - "alloc"
70+
* t_cleanup - "finish" or "exception" w/ severity == 0
71+
* t_inactive - "clean"
72+
*/
6273
struct job {
6374
struct list_ctx *ctx;
6475

@@ -67,7 +78,11 @@ struct job {
6778
int urgency;
6879
int64_t priority;
6980
double t_submit;
70-
int flags;
81+
// t_depend is identical to t_submit
82+
// double t_depend;
83+
double t_run;
84+
double t_cleanup;
85+
double t_inactive;
7186
flux_job_state_t state;
7287
const char *name;
7388
int ntasks;
@@ -78,19 +93,18 @@ struct job {
7893
int wait_status;
7994
bool success;
8095
bool exception_occurred;
81-
json_t *exception_context;
8296
int exception_severity;
8397
const char *exception_type;
8498
const char *exception_note;
8599
flux_job_result_t result;
86100
json_t *annotations;
87101
struct grudgeset *dependencies;
88-
int eventlog_seq; /* last event seq read */
89102

90103
/* cache of job information */
91104
json_t *jobspec_job;
92105
json_t *jobspec_cmd;
93106
json_t *R;
107+
json_t *exception_context;
94108

95109
/* Track which states we have seen and have completed transition
96110
* to. We do not immediately update to the new state and place
@@ -109,22 +123,7 @@ struct job {
109123
unsigned int states_events_mask;
110124
void *list_handle;
111125

112-
/* timestamp of when we enter the state
113-
*
114-
* associated eventlog entries when restarting
115-
*
116-
* depend - "submit"
117-
* priority - "priority" (not saved, can be entered multiple times)
118-
* sched - "depend" (not saved, can be entered multiple times)
119-
* run - "alloc"
120-
* cleanup - "finish" or "exception" w/ severity == 0
121-
* inactive - "clean"
122-
*/
123-
// t_depend is identical to t_submit above, use that
124-
// double t_depend;
125-
double t_run;
126-
double t_cleanup;
127-
double t_inactive;
126+
int eventlog_seq; /* last event seq read */
128127
};
129128

130129
struct job_state_ctx *job_state_create (struct list_ctx *ctx);

src/modules/job-list/list.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "src/common/libutil/errno_safe.h"
2121
#include "src/common/libczmqcontainers/czmq_containers.h"
2222

23+
#include "job-list.h"
2324
#include "idsync.h"
2425
#include "list.h"
2526
#include "job_util.h"

src/modules/job-list/list.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
#include <flux/core.h>
1515

16-
#include "job-list.h"
17-
1816
void list_cb (flux_t *h, flux_msg_handler_t *mh,
1917
const flux_msg_t *msg, void *arg);
2018

t/Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ dist_check_SCRIPTS = \
323323
job-exec/imp-fail.sh \
324324
job-list/list-id.py \
325325
job-list/list-rpc.py \
326+
job-list/job-list-helper.sh \
326327
job-list/jobspec-permissive.jsonschema \
327328
ingest/bad-validate.py
328329

t/job-list/job-list-helper.sh

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/bin/sh
2+
#
3+
4+
# job-list test helper functions
5+
6+
JOB_LIST_WAIT_ITERS=50
7+
8+
# Return the expected jobids list in a given state:
9+
# "all", "pending", "running", "inactive", "active",
10+
# "completed", "canceled", "failed"
11+
#
12+
job_list_state_ids() {
13+
for f in "$@"; do
14+
cat ${f}.ids
15+
done
16+
}
17+
18+
# Return the expected count of jobs in a given state (See above for list)
19+
#
20+
job_list_state_count() {
21+
job_list_state_ids "$@" | wc -l
22+
}
23+
24+
# the job-list module has eventual consistency with the jobs stored in
25+
# the job-manager's queue. To ensure no raciness in tests, we spin
26+
# until all of the pending jobs have reached SCHED state, running jobs
27+
# have reached RUN state, and inactive jobs have reached INACTIVE
28+
# state.
29+
#
30+
# job ids for jobs in these states are expected to be in pending.ids,
31+
# running.ids, and inactive.ids respectively.
32+
33+
job_list_wait_states() {
34+
pending=$(job_list_state_count pending)
35+
running=$(job_list_state_count running)
36+
inactive=$(job_list_state_count inactive)
37+
local i=0
38+
while ( [ "$(flux job list --states=sched | wc -l)" != "$pending" ] \
39+
|| [ "$(flux job list --states=run | wc -l)" != "$running" ] \
40+
|| [ "$(flux job list --states=inactive | wc -l)" != "$inactive" ]) \
41+
&& [ $i -lt ${JOB_LIST_WAIT_ITERS} ]
42+
do
43+
sleep 0.1
44+
i=$((i + 1))
45+
done
46+
if [ "$i" -eq "${JOB_LIST_WAIT_ITERS}" ]
47+
then
48+
return 1
49+
fi
50+
return 0
51+
}

0 commit comments

Comments
 (0)