4545#define STATE_TRANSITION_FLAG_REVERT 0x1
4646#define STATE_TRANSITION_FLAG_CONDITIONAL 0x2
4747
48- typedef enum {
49- JOB_UPDATE_TYPE_STATE_TRANSITION ,
50- JOB_UPDATE_TYPE_JOBSPEC_UPDATE ,
51- JOB_UPDATE_TYPE_RESOURCE_UPDATE ,
52- } job_update_type_t ;
53-
54- struct job_update {
55- job_update_type_t type ;
56-
57- /* state transitions */
58- flux_job_state_t state ;
59- double timestamp ;
60- int flags ;
61- flux_job_state_t expected_state ;
62-
63- /* jobspec_update, resource_update */
64- json_t * update_context ;
65-
66- /* all updates */
67- bool processing ; /* indicates we are waiting for
68- * current update to complete */
69- bool finished ; /* indicates we are done, can remove
70- * from list */
71- };
72-
7348static int submit_context_parse (flux_t * h ,
7449 struct job * job ,
7550 json_t * context );
@@ -94,8 +69,6 @@ static int memo_update (flux_t *h,
9469 struct job * job ,
9570 json_t * context );
9671
97- static void process_updates (struct job_state_ctx * jsctx , struct job * job );
98-
9972static int journal_process_events (struct job_state_ctx * jsctx ,
10073 const flux_msg_t * msg );
10174
@@ -291,154 +264,56 @@ static void eventlog_inactive_complete (struct job *job)
291264 }
292265}
293266
294- static void job_update_destroy (void * data )
295- {
296- struct job_update * updt = data ;
297- if (updt ) {
298- int saved_errno = errno ;
299- json_decref (updt -> update_context );
300- free (updt );
301- errno = saved_errno ;
302- }
303- }
304-
305- static struct job_update * job_update_create (job_update_type_t type )
306- {
307- struct job_update * updt = NULL ;
308-
309- if (!(updt = calloc (1 , sizeof (* updt ))))
310- return NULL ;
311- updt -> type = type ;
312- updt -> processing = false;
313- updt -> finished = false;
314- return updt ;
315- }
316-
317- static int append_update (struct job * job , struct job_update * updt )
318- {
319- if (zlist_append (job -> updates , updt ) < 0 ) {
320- errno = ENOMEM ;
321- return -1 ;
322- }
323- zlist_freefn (job -> updates , updt , job_update_destroy , true);
324- return 0 ;
325- }
326-
327- static int add_state_transition (struct job * job ,
328- flux_job_state_t newstate ,
329- double timestamp ,
330- int flags ,
331- flux_job_state_t expected_state )
332- {
333- struct job_update * updt = NULL ;
334-
335- if (!((flags & STATE_TRANSITION_FLAG_REVERT )
336- || (flags & STATE_TRANSITION_FLAG_CONDITIONAL ))
337- && (newstate & job -> states_events_mask ))
338- return 0 ;
339-
340- if (!(updt = job_update_create (JOB_UPDATE_TYPE_STATE_TRANSITION )))
341- return -1 ;
342-
343- updt -> state = newstate ;
344- updt -> timestamp = timestamp ;
345- updt -> flags = flags ;
346- updt -> expected_state = expected_state ;
347-
348- if (append_update (job , updt ) < 0 )
349- goto cleanup ;
350-
351- job -> states_events_mask |= newstate ;
352- return 0 ;
353-
354- cleanup :
355- job_update_destroy (updt );
356- return -1 ;
357- }
358-
359- static int add_update (struct job * job , json_t * context , job_update_type_t type )
360- {
361- struct job_update * updt = NULL ;
362-
363- if (!(updt = job_update_create (type )))
364- return -1 ;
365-
366- updt -> update_context = json_incref (context );
367-
368- if (append_update (job , updt ) < 0 )
369- goto cleanup ;
370-
371- return 0 ;
372-
373- cleanup :
374- job_update_destroy (updt );
375- return -1 ;
376- }
377-
378- static int add_jobspec_update (struct job * job , json_t * context )
379- {
380- return add_update (job , context , JOB_UPDATE_TYPE_JOBSPEC_UPDATE );
381- }
382-
383- static int add_resource_update (struct job * job , json_t * context )
384- {
385- return add_update (job , context , JOB_UPDATE_TYPE_RESOURCE_UPDATE );
386- }
387-
388267static void process_state_transition_update (struct job_state_ctx * jsctx ,
389268 struct job * job ,
390- struct job_update * updt )
269+ flux_job_state_t state ,
270+ double timestamp ,
271+ int flags ,
272+ flux_job_state_t expected_state )
391273{
392- if ((updt -> flags & STATE_TRANSITION_FLAG_REVERT )) {
274+ if ((flags & STATE_TRANSITION_FLAG_REVERT )) {
393275 /* only revert if the current state is what is expected */
394- if (job -> state == updt -> expected_state ) {
276+ if (job -> state == expected_state ) {
395277 job -> states_mask &= ~job -> state ;
396- job -> states_mask &= ~updt -> state ;
397- update_job_state_and_list (jsctx , job , updt -> state , updt -> timestamp );
278+ job -> states_mask &= ~state ;
279+ update_job_state_and_list (jsctx , job , state , timestamp );
398280 }
399- else {
400- updt -> finished = true;
281+ else
401282 return ;
402- }
403283 }
404- else if ((updt -> flags & STATE_TRANSITION_FLAG_CONDITIONAL )) {
284+ else if ((flags & STATE_TRANSITION_FLAG_CONDITIONAL )) {
405285 /* if current state isn't what we expected, move on */
406- if (job -> state != updt -> expected_state ) {
407- updt -> finished = true;
286+ if (job -> state != expected_state )
408287 return ;
409- }
410288 }
411- if (updt -> state == FLUX_JOB_STATE_DEPEND ) {
289+ if (state == FLUX_JOB_STATE_DEPEND ) {
412290 // process job->jobspec which was obtained from journal
413291 if (job_parse_jobspec_cached (job , job -> jobspec_updates ) < 0 ) {
414292 flux_log_error (jsctx -> h ,
415293 "%s: error parsing jobspec" ,
416294 idf58 (job -> id ));
417295 }
418- update_job_state_and_list (jsctx , job , updt -> state , updt -> timestamp );
419- updt -> finished = true;
296+ update_job_state_and_list (jsctx , job , state , timestamp );
420297 }
421- else if (updt -> state == FLUX_JOB_STATE_RUN ) {
298+ else if (state == FLUX_JOB_STATE_RUN ) {
422299 // process job->R which was obtained from journal
423300 if (job_parse_R_cached (job , NULL ) < 0 ) {
424301 flux_log_error (jsctx -> h ,
425302 "%s: error parsing R" ,
426303 idf58 (job -> id ));
427304 }
428- update_job_state_and_list (jsctx , job , updt -> state , updt -> timestamp );
429- updt -> finished = true;
305+ update_job_state_and_list (jsctx , job , state , timestamp );
430306 }
431307 else {
432308 /* FLUX_JOB_STATE_PRIORITY */
433309 /* FLUX_JOB_STATE_SCHED */
434310 /* FLUX_JOB_STATE_CLEANUP */
435311 /* FLUX_JOB_STATE_INACTIVE */
436312
437- if (updt -> state == FLUX_JOB_STATE_INACTIVE )
313+ if (state == FLUX_JOB_STATE_INACTIVE )
438314 eventlog_inactive_complete (job );
439315
440- update_job_state_and_list (jsctx , job , updt -> state , updt -> timestamp );
441- updt -> finished = true;
316+ update_job_state_and_list (jsctx , job , state , timestamp );
442317 }
443318}
444319
@@ -474,23 +349,6 @@ static void update_jobspec (struct job_state_ctx *jsctx,
474349 job_stats_add_queue (jsctx -> statsctx , job );
475350}
476351
477- static void process_jobspec_update (struct job_state_ctx * jsctx ,
478- struct job * job ,
479- struct job_update * updt )
480- {
481- /* Generally speaking, after a job is running, jobspec-update
482- * events should have no effect. Note that in some cases,
483- * such as job duration, jobspec-updates can alter a job's
484- * behavior, but it is via an update to R. In this case, we
485- * elect to not update the job duration seen by the user in
486- * the jobspec. The effect will be seen changes in R (in this
487- * example, via the job expiration time in R).
488- */
489- if (job -> state < FLUX_JOB_STATE_RUN )
490- update_jobspec (jsctx , job , updt -> update_context , true);
491- updt -> finished = true;
492- }
493-
494352static void update_resource (struct job_state_ctx * jsctx ,
495353 struct job * job ,
496354 json_t * context )
@@ -513,40 +371,6 @@ static void update_resource (struct job_state_ctx *jsctx,
513371 job_R_update (job , context );
514372}
515373
516- static void process_resource_update (struct job_state_ctx * jsctx ,
517- struct job * job ,
518- struct job_update * updt )
519- {
520- /* Generally speaking, resource-update events only have an effect
521- * when a job is running. */
522- if (job -> state == FLUX_JOB_STATE_RUN )
523- update_resource (jsctx , job , updt -> update_context );
524- updt -> finished = true;
525- }
526-
527- static void process_updates (struct job_state_ctx * jsctx , struct job * job )
528- {
529- struct job_update * updt ;
530-
531- while ((updt = zlist_head (job -> updates ))
532- && (!updt -> processing || updt -> finished )) {
533-
534- if (updt -> finished )
535- goto next ;
536-
537- if (updt -> type == JOB_UPDATE_TYPE_STATE_TRANSITION )
538- process_state_transition_update (jsctx , job , updt );
539- else if (updt -> type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE )
540- process_jobspec_update (jsctx , job , updt );
541- else /* updt->type == JOB_UPDATE_TYPE_RESOURCE_UPDATE */
542- process_resource_update (jsctx , job , updt );
543-
544- next :
545- if (updt -> finished )
546- zlist_remove (job -> updates , updt );
547- }
548- }
549-
550374void job_state_pause_cb (flux_t * h , flux_msg_handler_t * mh ,
551375 const flux_msg_t * msg , void * arg )
552376{
@@ -602,16 +426,19 @@ static int job_transition_state (struct job_state_ctx *jsctx,
602426 int flags ,
603427 flux_job_state_t expected_state )
604428{
605- if (add_state_transition (job ,
606- newstate ,
607- timestamp ,
608- flags ,
609- expected_state ) < 0 ) {
610- flux_log_error (jsctx -> h , "%s: add_state_transition" ,
611- __FUNCTION__ );
612- return -1 ;
613- }
614- process_updates (jsctx , job );
429+ if (!((flags & STATE_TRANSITION_FLAG_REVERT )
430+ || (flags & STATE_TRANSITION_FLAG_CONDITIONAL ))
431+ && (newstate & job -> states_events_mask ))
432+ return 0 ;
433+
434+ job -> states_events_mask |= newstate ;
435+
436+ process_state_transition_update (jsctx ,
437+ job ,
438+ newstate ,
439+ timestamp ,
440+ flags ,
441+ expected_state );
615442 return 0 ;
616443}
617444
@@ -979,12 +806,16 @@ static int journal_jobspec_update_event (struct job_state_ctx *jsctx,
979806 errno = EPROTO ;
980807 return -1 ;
981808 }
982-
983- if (add_jobspec_update (job , context ) < 0 ) {
984- flux_log_error (jsctx -> h , "%s: add_jobspec_update" , __FUNCTION__ );
985- return -1 ;
986- }
987- process_updates (jsctx , job );
809+ /* Generally speaking, after a job is running, jobspec-update
810+ * events should have no effect. Note that in some cases,
811+ * such as job duration, jobspec-updates can alter a job's
812+ * behavior, but it is via an update to R. In this case, we
813+ * elect to not update the job duration seen by the user in
814+ * the jobspec. The effect will be seen changes in R (in this
815+ * example, via the job expiration time in R).
816+ */
817+ if (job -> state < FLUX_JOB_STATE_RUN )
818+ update_jobspec (jsctx , job , context , true);
988819 return 0 ;
989820}
990821
@@ -999,12 +830,10 @@ static int journal_resource_update_event (struct job_state_ctx *jsctx,
999830 errno = EPROTO ;
1000831 return -1 ;
1001832 }
1002-
1003- if (add_resource_update (job , context ) < 0 ) {
1004- flux_log_error (jsctx -> h , "%s: add_resource_update" , __FUNCTION__ );
1005- return -1 ;
1006- }
1007- process_updates (jsctx , job );
833+ /* Generally speaking, resource-update events only have an effect
834+ * when a job is running. */
835+ if (job -> state == FLUX_JOB_STATE_RUN )
836+ update_resource (jsctx , job , context );
1008837 return 0 ;
1009838}
1010839
0 commit comments