Skip to content

Commit 526a2d3

Browse files
committed
job-list: allow list-id to wait for job state
Problem: It'd be convenient if the job-list.list-id service not only waited for a job id to be legal but to also waited for the job to pass a certain job state. Solution: Support an optional job state in the job-list.list-id service. Return when a job passes this job state or reaches the inactive state. Fixes #2864
1 parent c123d54 commit 526a2d3

File tree

4 files changed

+58
-17
lines changed

4 files changed

+58
-17
lines changed

src/modules/job-list/idsync.c

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ static struct idsync_data *idsync_data_create (flux_t *h,
5454
flux_jobid_t id,
5555
const flux_msg_t *msg,
5656
json_t *attrs,
57+
flux_job_state_t state,
5758
flux_future_t *f_lookup)
5859
{
5960
struct idsync_data *isd = NULL;
@@ -66,6 +67,7 @@ static struct idsync_data *idsync_data_create (flux_t *h,
6667
if (!(isd->msg = flux_msg_copy (msg, false)))
6768
goto error;
6869
isd->attrs = json_incref (attrs);
70+
isd->state = state;
6971
isd->f_lookup = f_lookup;
7072
return isd;
7173

@@ -137,7 +139,8 @@ void idsync_ctx_destroy (struct idsync_ctx *isctx)
137139
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
138140
flux_jobid_t id,
139141
const flux_msg_t *msg,
140-
json_t *attrs)
142+
json_t *attrs,
143+
flux_job_state_t state)
141144
{
142145
flux_future_t *f = NULL;
143146
struct idsync_data *isd = NULL;
@@ -153,7 +156,7 @@ struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
153156
goto error;
154157
}
155158

156-
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, f)))
159+
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, f)))
157160
goto error;
158161

159162
/* future now owned by struct idsync_data */
@@ -231,11 +234,12 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd)
231234
int idsync_wait_valid_id (struct idsync_ctx *isctx,
232235
flux_jobid_t id,
233236
const flux_msg_t *msg,
234-
json_t *attrs)
237+
json_t *attrs,
238+
flux_job_state_t state)
235239
{
236240
struct idsync_data *isd = NULL;
237241

238-
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, NULL)))
242+
if (!(isd = idsync_data_create (isctx->h, id, msg, attrs, state, NULL)))
239243
return -1;
240244

241245
return idsync_add_waiter (isctx, isd);
@@ -270,10 +274,24 @@ void idsync_check_waiting_id (struct idsync_ctx *isctx, struct job *job)
270274
struct idsync_data *isd;
271275
isd = zlistx_first (iwl->l);
272276
while (isd) {
273-
idsync_data_respond (isctx, isd, job);
277+
/* Some job states can be missed. For example a job that
278+
* is canceled before it runs will never reach the
279+
* FLUX_JOB_STATE_RUN state. To ensure jobs waiting on
280+
* states that are missed will eventually get a response, always
281+
* respond once the job has reached the inactive state.
282+
*/
283+
if (!isd->state
284+
|| (isd->state & job->states_mask)
285+
|| (isd->state && job->state == FLUX_JOB_STATE_INACTIVE)) {
286+
struct idsync_data *tmp;
287+
idsync_data_respond (isctx, isd, job);
288+
tmp = zlistx_detach_cur (iwl->l);
289+
idsync_data_destroy (tmp);
290+
}
274291
isd = zlistx_next (iwl->l);
275292
}
276-
zhashx_delete (isctx->waits, &iwl->id);
293+
if (!zlistx_size (iwl->l))
294+
zhashx_delete (isctx->waits, &job->id);
277295
}
278296
}
279297

src/modules/job-list/idsync.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ struct idsync_data {
2828
flux_jobid_t id;
2929
flux_msg_t *msg;
3030
json_t *attrs;
31+
flux_job_state_t state;
3132

3233
flux_future_t *f_lookup;
3334
};
@@ -45,7 +46,8 @@ void idsync_data_destroy (void *data);
4546
struct idsync_data *idsync_check_id_valid (struct idsync_ctx *isctx,
4647
flux_jobid_t id,
4748
const flux_msg_t *msg,
48-
json_t *attrs);
49+
json_t *attrs,
50+
flux_job_state_t state);
4951

5052

5153
/* free / cleanup 'struct idsync_data' after
@@ -65,7 +67,8 @@ int idsync_wait_valid (struct idsync_ctx *isctx, struct idsync_data *isd);
6567
int idsync_wait_valid_id (struct idsync_ctx *isctx,
6668
flux_jobid_t id,
6769
const flux_msg_t *msg,
68-
json_t *attrs);
70+
json_t *attrs,
71+
flux_job_state_t state);
6972

7073
/* check if 'job' is in waits list, if so respond to original
7174
* message */

src/modules/job-list/job_state.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,8 @@ static void update_job_state_and_list (struct job_state_ctx *jsctx,
267267
zlistx_reorder (jsctx->pending,
268268
job->list_handle,
269269
search_direction (job));
270+
271+
idsync_check_waiting_id (jsctx->isctx, job);
270272
}
271273

272274
static void state_depend_lookup_continuation (flux_future_t *f, void *arg)
@@ -291,7 +293,6 @@ static void state_depend_lookup_continuation (flux_future_t *f, void *arg)
291293
st = zlist_head (job->next_states);
292294
assert (st);
293295
update_job_state_and_list (jsctx, job, st->state, st->timestamp);
294-
idsync_check_waiting_id (jsctx->isctx, job);
295296
zlist_remove (job->next_states, st);
296297
process_next_state (jsctx, job);
297298

src/modules/job-list/list.c

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
3232
const flux_msg_t *msg,
3333
flux_jobid_t id,
3434
json_t *attrs,
35+
flux_job_state_t state,
3536
bool *stall);
3637

3738
/* Filter test to determine if job desired by caller */
@@ -294,7 +295,7 @@ void check_id_valid_continuation (flux_future_t *f, void *arg)
294295
else {
295296
json_t *o;
296297
if (!(o = get_job_by_id (jsctx, NULL, isd->msg,
297-
isd->id, isd->attrs, NULL))) {
298+
isd->id, isd->attrs, isd->state, NULL))) {
298299
flux_log_error (jsctx->h, "%s: get_job_by_id", __FUNCTION__);
299300
goto cleanup;
300301
}
@@ -314,14 +315,16 @@ void check_id_valid_continuation (flux_future_t *f, void *arg)
314315
int check_id_valid (struct job_state_ctx *jsctx,
315316
const flux_msg_t *msg,
316317
flux_jobid_t id,
317-
json_t *attrs)
318+
json_t *attrs,
319+
flux_job_state_t state)
318320
{
319321
struct idsync_data *isd = NULL;
320322

321323
if (!(isd = idsync_check_id_valid (jsctx->isctx,
322324
id,
323325
msg,
324-
attrs))
326+
attrs,
327+
state))
325328
|| flux_future_aux_set (isd->f_lookup,
326329
"job_state_ctx",
327330
jsctx,
@@ -349,13 +352,14 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
349352
const flux_msg_t *msg,
350353
flux_jobid_t id,
351354
json_t *attrs,
355+
flux_job_state_t state,
352356
bool *stall)
353357
{
354358
struct job *job;
355359

356360
if (!(job = zhashx_lookup (jsctx->index, &id))) {
357361
if (stall) {
358-
if (check_id_valid (jsctx, msg, id, attrs) < 0) {
362+
if (check_id_valid (jsctx, msg, id, attrs, state) < 0) {
359363
flux_log_error (jsctx->h, "%s: check_id_valid", __FUNCTION__);
360364
return NULL;
361365
}
@@ -367,7 +371,7 @@ json_t *get_job_by_id (struct job_state_ctx *jsctx,
367371
if (job->state == FLUX_JOB_STATE_NEW) {
368372
if (stall) {
369373
/* Must wait for job-list to see state change */
370-
if (idsync_wait_valid_id (jsctx->isctx, id, msg, attrs) < 0) {
374+
if (idsync_wait_valid_id (jsctx->isctx, id, msg, attrs, state) < 0) {
371375
flux_log_error (jsctx->h, "%s: idsync_wait_valid_id",
372376
__FUNCTION__);
373377
return NULL;
@@ -388,11 +392,14 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
388392
json_t *job;
389393
flux_jobid_t id;
390394
json_t *attrs;
395+
int state = 0;
396+
int valid_states = FLUX_JOB_STATE_ACTIVE | FLUX_JOB_STATE_INACTIVE;
391397
bool stall = false;
392398

393-
if (flux_request_unpack (msg, NULL, "{s:I s:o}",
399+
if (flux_request_unpack (msg, NULL, "{s:I s:o s?i}",
394400
"id", &id,
395-
"attrs", &attrs) < 0) {
401+
"attrs", &attrs,
402+
"state", &state) < 0) {
396403
seterror (&err, "invalid payload: %s", flux_msg_last_error (msg));
397404
errno = EPROTO;
398405
goto error;
@@ -404,7 +411,19 @@ void list_id_cb (flux_t *h, flux_msg_handler_t *mh,
404411
goto error;
405412
}
406413

407-
if (!(job = get_job_by_id (ctx->jsctx, &err, msg, id, attrs, &stall))) {
414+
if (state && (state & ~valid_states)) {
415+
seterror (&err, "invalid payload: invalid state specified");
416+
errno = EPROTO;
417+
goto error;
418+
}
419+
420+
if (!(job = get_job_by_id (ctx->jsctx,
421+
&err,
422+
msg,
423+
id,
424+
attrs,
425+
state,
426+
&stall))) {
408427
/* response handled after KVS lookup complete */
409428
if (stall)
410429
goto stall;

0 commit comments

Comments
 (0)