88 * SPDX-License-Identifier: LGPL-3.0
99\************************************************************/
1010
11- /* pipeline.c - run jobspec through ingest pipeline: frob | validate */
11+ /* pipeline.c - run jobspec through ingest pipeline: frobnicator | validator */
1212
1313#if HAVE_CONFIG_H
1414#include "config.h"
2626struct pipeline {
2727 flux_t * h ;
2828 struct workcrew * validate ;
29+ struct workcrew * frobnicate ;
2930 int process_count ;
3031 flux_watcher_t * shutdown_timer ;
3132 bool validator_bypass ;
33+ bool frobnicate_enable ;
3234};
3335
3436static const char * cmd_validator = "job-validator" ;
37+ static const char * cmd_frobnicator = "job-frobnicator" ;
3538
3639
3740/* Timeout (seconds) to wait for workers to terminate when
@@ -59,14 +62,15 @@ static void shutdown_timeout_cb (flux_reactor_t *r,
5962
6063 flux_log (pl -> h ,
6164 LOG_ERR ,
62- "shutdown timed out with %d validators active" ,
65+ "shutdown timed out with %d workers active" ,
6366 pl -> process_count );
6467 flux_reactor_stop (r );
6568}
6669
6770void pipeline_shutdown (struct pipeline * pl )
6871{
6972 pl -> process_count = workcrew_stop_notify (pl -> validate , exit_cb , pl );
73+ pl -> process_count += workcrew_stop_notify (pl -> frobnicate , exit_cb , pl );
7074 if (pl -> process_count == 0 )
7175 flux_reactor_stop (flux_get_reactor (pl -> h ));
7276 else {
@@ -76,6 +80,13 @@ void pipeline_shutdown (struct pipeline *pl)
7680
7781}
7882
83+ static bool validator_bypass (struct pipeline * pl , struct job * job )
84+ {
85+ if ((pl -> validator_bypass || (job -> flags & FLUX_JOB_NOVALIDATE )))
86+ return true;
87+ return false;
88+ }
89+
7990static flux_future_t * validate_job (struct pipeline * pl ,
8091 struct job * job ,
8192 flux_error_t * error )
@@ -96,20 +107,103 @@ static flux_future_t *validate_job (struct pipeline *pl,
96107 return NULL ;
97108}
98109
110+ static flux_future_t * frobnicate_job (struct pipeline * pl ,
111+ struct job * job ,
112+ flux_error_t * error )
113+ {
114+ json_t * input ;
115+ flux_future_t * f ;
116+
117+ if (!(input = job_json_object (job , error )))
118+ return NULL ;
119+ if (!(f = workcrew_process_job (pl -> frobnicate , input ))) {
120+ errprintf (error , "Error passing job to frobnicator" );
121+ goto error ;
122+ }
123+ json_decref (input );
124+ return f ;
125+ error :
126+ ERRNO_SAFE_WRAP (json_decref , input );
127+ return NULL ;
128+ }
129+
130+ static void frobnicate_continuation (flux_future_t * f1 , void * arg )
131+ {
132+ struct pipeline * pl = arg ;
133+ struct job * job = flux_future_aux_get (f1 , "job" );
134+ const char * s ;
135+ json_t * jobspec ;
136+ const char * errmsg = NULL ;
137+ flux_error_t error ;
138+
139+ if (flux_future_get (f1 , (const void * * )& s ) < 0 ) {
140+ errmsg = future_strerror (f1 , errno );
141+ goto error ;
142+ }
143+
144+ if (!(jobspec = json_loads (s , 0 , NULL ))) {
145+ errmsg = "error decoding jobspec from frobnicator" ;
146+ errno = EINVAL ;
147+ goto error ;
148+ }
149+ json_decref (job -> jobspec );
150+ job -> jobspec = jobspec ;
151+
152+ if (!validator_bypass (pl , job )) {
153+ flux_future_t * f2 ;
154+
155+ if (!(f2 = validate_job (pl , job , & error ))) {
156+ errmsg = error .text ;
157+ goto error ;
158+ }
159+ if (flux_future_continue (f1 , f2 ) < 0 ) {
160+ flux_future_destroy (f2 );
161+ errmsg = "error continuing validator" ;
162+ goto error ;
163+ }
164+ }
165+ goto done ;
166+ error :
167+ flux_future_continue_error (f1 , errno , errmsg );
168+ done :
169+ flux_future_destroy (f1 );
170+ }
171+
172+ /* N.B. this function could be a little simpler if futures for the pipeline
173+ * stages were unconditionally chained; instead, minimize overhead for:
174+ * - frobnicator not configured
175+ * - frobnicator not configured AND validator bypassed
176+ */
99177int pipeline_process_job (struct pipeline * pl ,
100178 struct job * job ,
101179 flux_future_t * * fp ,
102180 flux_error_t * error )
103181{
104- if ((pl -> validator_bypass || (job -> flags & FLUX_JOB_NOVALIDATE ))) {
105- * fp = NULL ;
106- return 0 ;
182+ if (pl -> frobnicate_enable ) {
183+ flux_future_t * f1 ;
184+ flux_future_t * f_comp ;
185+
186+ if (!(f1 = frobnicate_job (pl , job , error ))
187+ || flux_future_aux_set (f1 , "job" , job , NULL ) < 0
188+ || !(f_comp = flux_future_and_then (f1 ,
189+ frobnicate_continuation ,
190+ pl ))) {
191+ flux_future_destroy (f1 );
192+ return -1 ;
193+ }
194+ * fp = f_comp ;
107195 }
196+ else {
197+ flux_future_t * f ;
108198
109- flux_future_t * f ;
110- if (!(f = validate_job (pl , job , error )))
111- return -1 ;
112- * fp = f ;
199+ if (validator_bypass (pl , job ))
200+ * fp = NULL ;
201+ else {
202+ if (!(f = validate_job (pl , job , error )))
203+ return -1 ;
204+ * fp = f ;
205+ }
206+ }
113207 return 0 ;
114208}
115209
@@ -182,6 +276,9 @@ int pipeline_configure (struct pipeline *pl,
182276 json_t * ingest = NULL ;
183277 char * validator_plugins = NULL ;
184278 char * validator_args = NULL ;
279+ char * frobnicator_plugins = NULL ;
280+ char * frobnicator_args = NULL ;
281+ int rc = -1 ;
185282
186283 /* Process toml
187284 */
@@ -201,6 +298,13 @@ int pipeline_configure (struct pipeline *pl,
201298 & pl -> validator_bypass ,
202299 error ) < 0 )
203300 return -1 ;
301+ if (unpack_ingest_subtable (ingest ,
302+ "frobnicator" ,
303+ & frobnicator_plugins ,
304+ & frobnicator_args ,
305+ NULL ,
306+ error ) < 0 )
307+ return -1 ;
204308
205309 /* Process module command line
206310 */
@@ -217,6 +321,20 @@ int pipeline_configure (struct pipeline *pl,
217321 pl -> validator_bypass = true;
218322 }
219323
324+ if (frobnicator_plugins && strlen (frobnicator_plugins ) > 0 )
325+ pl -> frobnicate_enable = true;
326+ else
327+ pl -> frobnicate_enable = false;
328+ if (workcrew_configure (pl -> frobnicate ,
329+ cmd_frobnicator ,
330+ frobnicator_plugins ,
331+ frobnicator_args ) < 0 ) {
332+ errprintf (error ,
333+ "Error (re-)configuring frobnicator workcrew: %s" ,
334+ strerror (errno ));
335+ goto error ;
336+ }
337+
220338 // Checked for by t2111-job-ingest-config.t
221339 flux_log (pl -> h ,
222340 LOG_DEBUG ,
@@ -233,23 +351,25 @@ int pipeline_configure (struct pipeline *pl,
233351 strerror (errno ));
234352 goto error ;
235353 }
236-
237- free (validator_plugins );
238- free (validator_args );
239- return 0 ;
354+ rc = 0 ;
240355error :
241356 ERRNO_SAFE_WRAP (free , validator_plugins );
242357 ERRNO_SAFE_WRAP (free , validator_args );
243- return -1 ;
358+ ERRNO_SAFE_WRAP (free , frobnicator_plugins );
359+ ERRNO_SAFE_WRAP (free , frobnicator_args );
360+ return rc ;
244361}
245362
246363json_t * pipeline_stats_get (struct pipeline * pl )
247364{
248365 json_t * o = NULL ;
249366 if (pl ) {
367+ json_t * fo = workcrew_stats_get (pl -> frobnicate );
250368 json_t * vo = workcrew_stats_get (pl -> validate );
251- o = json_pack ("{s:O}" ,
369+ o = json_pack ("{s:O s:O}" ,
370+ "frobnicator" , fo ,
252371 "validator" , vo );
372+ json_decref (fo );
253373 json_decref (vo );
254374 }
255375 return o ? o : json_null ();
@@ -260,6 +380,7 @@ void pipeline_destroy (struct pipeline *pl)
260380 if (pl ) {
261381 int saved_errno = errno ;
262382 workcrew_destroy (pl -> validate );
383+ workcrew_destroy (pl -> frobnicate );
263384 flux_watcher_destroy (pl -> shutdown_timer );
264385 free (pl );
265386 errno = saved_errno ;
@@ -283,6 +404,9 @@ struct pipeline *pipeline_create (flux_t *h)
283404 if (!(pl -> validate = workcrew_create (pl -> h ))
284405 || workcrew_configure (pl -> validate , cmd_validator , NULL , NULL ) < 0 )
285406 goto error ;
407+ if (!(pl -> frobnicate = workcrew_create (pl -> h ))
408+ || workcrew_configure (pl -> frobnicate , cmd_frobnicator , NULL , NULL ) < 0 )
409+ goto error ;
286410 return pl ;
287411error :
288412 pipeline_destroy (pl );
0 commit comments