2828
2929#include "util.h"
3030#include "job.h"
31- #include "workcrew .h"
31+ #include "pipeline .h"
3232
3333/* job-ingest takes in signed jobspec submitted through flux_job_submit(),
3434 * performing the following tasks for each job:
7070 * Too large, and individual job submit latency will suffer.
7171 * Too small, and KVS commit overhead will increase.
7272 */
/* Review note: this hunk gives batch_timeout internal linkage (static) and
 * drops shutdown_timeout, whose only visible uses (the shutdown timer reset
 * in shutdown_cb and the timer callback) are removed elsewhere in this diff.
 */
73- const double batch_timeout = 0.01 ;
74-
75- /* Timeout (seconds) to wait for validators to terminate when
76- * stopped by closing their stdin. If the timer pops, stop the reactor
77- * and allow workcrew_destroy() to signal them.
78- */
79- const double shutdown_timeout = 5. ;
80-
73+ static const double batch_timeout = 0.01 ;
8174
8275struct job_ingest_ctx {
8376 flux_t * h ;
84- struct workcrew * validate ;
77+ struct pipeline * pipeline ;
8578#if HAVE_FLUX_SECURITY
8679 flux_security_t * sec ;
8780#else
@@ -95,9 +88,7 @@ struct job_ingest_ctx {
9588
9689 int batch_count ; // if nonzero, batch by count not timer
9790
98- bool shutdown ; // no new jobs are accepted in shutdown mode
99- int shutdown_process_count ; // number of validators executing at shutdown
100- flux_watcher_t * shutdown_timer ;
91+ bool shutdown ;
10192};
10293
10394struct batch {
@@ -528,7 +519,7 @@ static int ingest_add_job (struct job_ingest_ctx *ctx, struct job *job)
528519 return 0 ;
529520}
530521
531- void validate_continuation (flux_future_t * f , void * arg )
522+ void pipeline_continuation (flux_future_t * f , void * arg )
532523{
533524 struct job * job = arg ;
534525 struct job_ingest_ctx * ctx = flux_future_aux_get (f , "ctx" );
@@ -564,81 +555,40 @@ static void submit_cb (flux_t *h, flux_msg_handler_t *mh,
564555 const char * errmsg = NULL ;
565556 flux_error_t error ;
566557 flux_future_t * f = NULL ;
567- json_error_t json_error ;
568- json_t * o = NULL ;
569558
570559 if (ctx -> shutdown ) {
571560 errno = ENOSYS ;
572561 goto error ;
573562 }
574-
575- /* Parse request.
576- */
577563 if (!(job = job_create_from_request (msg , ctx -> sec , & error ))) {
578564 errmsg = error .text ;
579565 goto error ;
580566 }
581- if (ctx -> validate && !(job -> flags & FLUX_JOB_NOVALIDATE )) {
582- /* Validate jobspec asynchronously.
583- * Continue submission process in validate_continuation().
584- */
585- if (!(o = json_pack_ex (& json_error ,
586- 0 ,
587- "{s:O s:I s:i s:i s:i}" ,
588- "jobspec" , job -> jobspec_obj ,
589- "userid" , (json_int_t ) job -> cred .userid ,
590- "rolemask" , job -> cred .rolemask ,
591- "urgency" , job -> urgency ,
592- "flags" , job -> flags ))) {
593- errprintf (& error , "Internal error: %s" , json_error .text );
594- errmsg = error .text ;
567+ if (pipeline_process_job (ctx -> pipeline , job , & f , & error ) < 0 ) {
568+ errmsg = error .text ;
569+ goto error ;
570+ }
571+ if (f ) {
572+ if (flux_future_then (f , -1. , pipeline_continuation , job ) < 0
573+ || flux_future_aux_set (f , "ctx" , ctx , NULL ) < 0 ) {
595574 goto error ;
596575 }
597- if (!(f = workcrew_process_job (ctx -> validate , o )))
598- goto error ;
599- if (flux_future_then (f , -1. , validate_continuation , job ) < 0
600- || flux_future_aux_set (f , "ctx" , ctx , NULL ) < 0 )
576+ }
577+ else {
578+ if (ingest_add_job (ctx , job ) < 0 )
601579 goto error ;
602- json_decref (o );
603580 }
604- else if (ingest_add_job (ctx , job ) < 0 )
605- goto error ;
606581 return ;
607582error :
608- json_decref (o );
609583 if (flux_respond_error (h , msg , errno , errmsg ) < 0 )
610584 flux_log_error (h , "%s: flux_respond_error" , __FUNCTION__ );
611585 job_destroy (job );
612586 flux_future_destroy (f );
613587}
614588
/* (Removed by this diff.) Notification callback handed to
 * workcrew_stop_notify() by the old shutdown_cb, with the ingest context
 * as its argument -- presumably invoked once per validator that exits.
 * Decrements ctx->shutdown_process_count; when the count reaches zero it
 * stops the shutdown timer and then stops the reactor.
 */
615- static void exit_cb (void * arg )
616- {
617- struct job_ingest_ctx * ctx = arg ;
618-
619- if (-- ctx -> shutdown_process_count == 0 ) {
620- flux_watcher_stop (ctx -> shutdown_timer );
621- flux_reactor_stop (flux_get_reactor (ctx -> h ));
622- }
623- }
624-
/* (Removed by this diff.) Timer watcher callback for ctx->shutdown_timer.
 * Logs at LOG_ERR how many validators are still counted as active in
 * ctx->shutdown_process_count, then stops the reactor unconditionally.
 * The r, w and revents arguments are unused; arg is the ingest context.
 */
625- static void shutdown_timeout_cb (flux_reactor_t * r ,
626- flux_watcher_t * w ,
627- int revents ,
628- void * arg )
629- {
630- struct job_ingest_ctx * ctx = arg ;
631-
632- flux_log (ctx -> h ,
633- LOG_ERR ,
634- "shutdown timed out with %d validators active" ,
635- ctx -> shutdown_process_count );
636- flux_reactor_stop (flux_get_reactor (ctx -> h ));
637- }
638-
639589/* Override built-in shutdown handler that calls flux_reactor_stop().
640- * Since libsubprocess client is not able to run outside of the reactor ,
641- * take care of cleaning up validator before exiting reactor.
590+ * Since libsubprocess clients must run in reactive mode ,
591+ * take care of cleaning up the pipeline before exiting reactor.
642592 */
643593static void shutdown_cb (flux_t * h ,
644594 flux_msg_handler_t * mh ,
@@ -648,15 +598,8 @@ static void shutdown_cb (flux_t *h,
648598 struct job_ingest_ctx * ctx = arg ;
649599
650600 ctx -> shutdown = true; // fail any new submit requests
651- ctx -> shutdown_process_count = workcrew_stop_notify (ctx -> validate ,
652- exit_cb ,
653- ctx );
654- if (ctx -> shutdown_process_count == 0 )
655- flux_reactor_stop (flux_get_reactor (h ));
656- else {
657- flux_timer_watcher_reset (ctx -> shutdown_timer , shutdown_timeout , 0. );
658- flux_watcher_start (ctx -> shutdown_timer );
659- }
601+
602+ pipeline_shutdown (ctx -> pipeline );
660603}
661604
662605static void getinfo_cb (flux_t * h ,
@@ -698,105 +641,45 @@ static void getinfo_cb (flux_t *h,
/* Configure job-ingest from the config object and module arguments.
 * This comment describes the post-change ('+' and context) lines.
 *
 * ctx    - ingest context; ctx->pipeline and ctx->batch_count are used
 * conf   - config object passed to pipeline_configure() and
 *          flux_conf_unpack()
 * argc   - number of entries in argv (reload_cb passes 0)
 * argv   - module options (reload_cb passes NULL)
 * error  - receives a human readable message on failure
 *
 * Order of operations:
 *  1. pipeline_configure() is given conf/argc/argv/error first.
 *  2. The optional "batch-count" key of the optional "ingest" table is
 *     unpacked directly into ctx->batch_count.
 *  3. argv is scanned: validator-args=, validator-plugins= and
 *     disable-validator are accepted without action here; batch-count=N
 *     is parsed; any other option is rejected.
 *
 * Returns 0 on success, -1 on failure. errno is set to EINVAL explicitly
 * only for an invalid batch-count value or an unknown option.
 *
 * The change replaces flux_log() reporting with an error message returned
 * to the caller, and removes all validator/workcrew setup from this
 * function.
 */
698641static int job_ingest_configure (struct job_ingest_ctx * ctx ,
699642 const flux_conf_t * conf ,
700643 int argc ,
701- char * * argv )
644+ char * * argv ,
645+ flux_error_t * error )
702646{
703- flux_error_t error ;
704- json_t * plugins = NULL ;
705- json_t * args = NULL ;
706- char * validator_plugins = NULL ;
707- char * validator_args = NULL ;
708- int disable_validator = 0 ;
709- int rc = -1 ;
647+ flux_error_t conf_error ;
710648
/* The pipeline sees the same conf and argv before anything is parsed
 * here; on failure its result is returned as-is (presumably with error
 * already filled in by pipeline_configure -- confirm in pipeline.c).
 */
649+ if (pipeline_configure (ctx -> pipeline , conf , argc , argv , error ) < 0 )
650+ return -1 ;
711651 if (flux_conf_unpack (conf ,
712- & error ,
713- "{s?{s?{s?o s?o s?b !} s?i ! }}" ,
/* The new format string reads only ingest.batch-count; the validator
 * sub-table and the '!' markers of the old format string are gone from
 * this call. A local conf_error is used so its text can be wrapped into
 * the caller's error.
 */
652+ & conf_error ,
653+ "{s?{s?i }}" ,
714654 "ingest" ,
715- "validator" ,
716- "args" , & args ,
717- "plugins" , & plugins ,
718- "disable" , & disable_validator ,
719655 "batch-count" , & ctx -> batch_count ) < 0 ) {
720- flux_log ( ctx -> h , LOG_ERR ,
656+ errprintf ( error ,
721657 "error reading [ingest] config table: %s" ,
722- error .text );
723- goto out ;
724- }
725-
726- if (plugins && !(validator_plugins = util_join_arguments (plugins ))) {
727- flux_log_error (ctx -> h ,
728- "error in [ingest.validator] plugins array" );
729- goto out ;
730- }
731- if (args && !(validator_args = util_join_arguments (args ))) {
732- flux_log_error (ctx -> h ,
733- "error in [ingest.validator] args array" );
734- goto out ;
658+ conf_error .text );
659+ return -1 ;
735660 }
736-
737- /* Process cmdline args */
738661 for (int i = 0 ; i < argc ; i ++ ) {
739- if (!strncmp (argv [i ], "validator-args=" , 15 )) {
740- free (validator_args );
741- validator_args = strdup (argv [i ] + 15 );
742- }
743- else if (!strncmp (argv [i ], "validator-plugins=" , 18 )) {
744- free (validator_plugins );
745- validator_plugins = strdup (argv [i ] + 18 );
/* Validator options are matched here only so that they do not fall
 * through to the "Invalid option" branch below.
 */
662+ if (!strncmp (argv [i ], "validator-args=" , 15 )
663+ || !strncmp (argv [i ], "validator-plugins=" , 18 )
664+ || !strcmp (argv [i ], "disable-validator" )) {
665+ /* handled in pipeline.c */
746666 }
747667 else if (!strncmp (argv [i ], "batch-count=" , 12 )) {
748668 char * endptr ;
/* NOTE(review): strtol() returns long but the result is stored in the
 * int batch_count, and errno/ERANGE is not examined. An empty value
 * ("batch-count=") leaves endptr on the terminating NUL and yields 0, so
 * it passes the check below. Base 0 also accepts hex/octal prefixes.
 */
749669 ctx -> batch_count = strtol (argv [i ]+ 12 , & endptr , 0 );
750670 if (* endptr != '\0' || ctx -> batch_count < 0 ) {
751- flux_log (ctx -> h , LOG_ERR , "Invalid batch-count: %s" , argv [i ]);
752- goto out ;
671+ errprintf (error , "Invalid batch-count: %s" , argv [i ]);
672+ errno = EINVAL ;
673+ return -1 ;
753674 }
754675 }
755- else if (!strcmp (argv [i ], "disable-validator" )) {
756- disable_validator = 1 ;
757- }
758676 else {
759- flux_log ( ctx -> h , LOG_ERR , "invalid option %s" , argv [i ]);
677+ errprintf ( error , "Invalid option: %s" , argv [i ]);
760678 errno = EINVAL ;
761- goto out ;
762- }
763- }
764- if (disable_validator ) {
765- if (ctx -> validate )
766- flux_log (ctx -> h ,
767- LOG_ERR ,
768- "Unable to disable validator at runtime" );
769- else {
770- flux_log (ctx -> h ,
771- LOG_DEBUG ,
772- "Disabling job validator" );
773- rc = 0 ;
679+ return -1 ;
774680 }
775- goto out ;
776- }
777- if (!ctx -> validate && !(ctx -> validate = workcrew_create (ctx -> h ))) {
778- flux_log_error (ctx -> h , "workcrew_create" );
779- goto out ;
780- }
781-
782- flux_log (ctx -> h ,
783- LOG_DEBUG ,
784- "configuring with plugins=%s, args=%s" ,
785- validator_plugins ,
786- validator_args );
787-
788- if (workcrew_configure (ctx -> validate ,
789- "job-validator" ,
790- validator_plugins ,
791- validator_args ) < 0 ) {
792- flux_log_error (ctx -> h , "failed to configure validator workcrew" );
793- goto out ;
794681 }
795- rc = 0 ;
796- out :
797- free (validator_plugins );
798- free (validator_args );
799- return rc ;
683+ return 0 ;
800684}
801684
802685static void reload_cb (flux_t * h ,
@@ -806,11 +689,17 @@ static void reload_cb (flux_t *h,
806689{
807690 struct job_ingest_ctx * ctx = arg ;
808691 const flux_conf_t * conf ;
809- const char * errstr = "Failed to reconfigure job-ingest" ;
810- if (flux_conf_reload_decode (msg , & conf ) < 0 )
692+ flux_error_t error ;
693+ const char * errstr = NULL ;
694+
695+ if (flux_conf_reload_decode (msg , & conf ) < 0 ) {
696+ errstr = "Failed to parse config-reload request" ;
811697 goto error ;
812- if (job_ingest_configure (ctx , conf , 0 , NULL ) < 0 )
698+ }
699+ if (job_ingest_configure (ctx , conf , 0 , NULL , & error ) < 0 ) {
700+ errstr = error .text ;
813701 goto error ;
702+ }
814703 if (flux_respond (h , msg , NULL ) < 0 )
815704 flux_log_error (h , "error responding to config-reload request" );
816705 return ;
@@ -835,10 +724,16 @@ int job_ingest_ctx_init (struct job_ingest_ctx *ctx,
835724 flux_reactor_t * r = flux_get_reactor (h );
836725 memset (ctx , 0 , sizeof (* ctx ));
837726 ctx -> h = h ;
727+ flux_error_t error ;
838728
839- if (job_ingest_configure (ctx , flux_get_conf (h ), argc , argv ) < 0 )
729+ if (!(ctx -> pipeline = pipeline_create (h ))) {
730+ flux_log_error (h , "error initializing job preprocessing pipeline" );
840731 return -1 ;
841-
732+ }
733+ if (job_ingest_configure (ctx , flux_get_conf (h ), argc , argv , & error ) < 0 ) {
734+ flux_log (h , LOG_ERR , "%s" , error .text );
735+ return -1 ;
736+ }
842737#if HAVE_FLUX_SECURITY
843738 if (!(ctx -> sec = flux_security_create (0 ))) {
844739 flux_log_error (h , "flux_security_create" );
@@ -860,14 +755,6 @@ int job_ingest_ctx_init (struct job_ingest_ctx *ctx,
860755 flux_log_error (h , "flux_timer_watcher_create" );
861756 return -1 ;
862757 }
863- if (!(ctx -> shutdown_timer = flux_timer_watcher_create (r ,
864- 0. ,
865- 0. ,
866- shutdown_timeout_cb ,
867- ctx ))) {
868- flux_log_error (h , "flux_timer_watcher_create" );
869- return -1 ;
870- }
871758 return 0 ;
872759}
873760
@@ -947,11 +834,10 @@ int mod_main (flux_t *h, int argc, char **argv)
947834done :
948835 flux_msg_handler_delvec (ctx .handlers );
949836 flux_watcher_destroy (ctx .timer );
950- flux_watcher_destroy (ctx .shutdown_timer );
951837#if HAVE_FLUX_SECURITY
952838 flux_security_destroy (ctx .sec );
953839#endif
954- workcrew_destroy (ctx .validate );
840+ pipeline_destroy (ctx .pipeline );
955841 return rc ;
956842}
957843
0 commit comments