@@ -28,8 +28,12 @@ struct pipeline {
2828 struct workcrew * validate ;
2929 int process_count ;
3030 flux_watcher_t * shutdown_timer ;
31+ bool validator_bypass ;
3132};
3233
34+ static const char * cmd_validator = "job-validator" ;
35+
36+
3337/* Timeout (seconds) to wait for workers to terminate when
3438 * stopped by closing their stdin. If the timer expires, stop the reactor
3539 * and allow workcrew_destroy() to signal them.
@@ -97,15 +101,15 @@ int pipeline_process_job (struct pipeline *pl,
97101 flux_future_t * * fp ,
98102 flux_error_t * error )
99103{
100- if ((! pl -> validate || (job -> flags & FLUX_JOB_NOVALIDATE ))) {
104+ if ((pl -> validator_bypass || (job -> flags & FLUX_JOB_NOVALIDATE ))) {
101105 * fp = NULL ;
106+ return 0 ;
102107 }
103- else {
104- flux_future_t * f ;
105- if (!(f = validate_job (pl , job , error )))
106- return -1 ;
107- * fp = f ;
108- }
108+
109+ flux_future_t * f ;
110+ if (!(f = validate_job (pl , job , error )))
111+ return -1 ;
112+ * fp = f ;
109113 return 0 ;
110114}
111115
@@ -139,6 +143,11 @@ static int unpack_ingest_subtable (json_t *o,
139143 errno = EINVAL ;
140144 return -1 ;
141145 }
146+ if (!disablep && disable ) {
147+ errprintf (error , "[ingest.%s]: 'disable' key is unknown" , name );
148+ errno = EINVAL ;
149+ return -1 ;
150+ }
142151 if (op ) {
143152 if (!(plugins = util_join_arguments (op ))) {
144153 errprintf (error , "error in [ingest.%s] plugins array" , name );
@@ -154,7 +163,8 @@ static int unpack_ingest_subtable (json_t *o,
154163 }
155164 * pluginsp = plugins ;
156165 * argsp = args ;
157- * disablep = disable ? true : false;
166+ if (disablep )
167+ * disablep = disable ? true : false;
158168 return 0 ;
159169error :
160170 ERRNO_SAFE_WRAP (free , args );
@@ -172,7 +182,6 @@ int pipeline_configure (struct pipeline *pl,
172182 json_t * ingest = NULL ;
173183 char * validator_plugins = NULL ;
174184 char * validator_args = NULL ;
175- bool disable_validator = false;
176185
177186 /* Process toml
178187 */
@@ -189,7 +198,7 @@ int pipeline_configure (struct pipeline *pl,
189198 "validator" ,
190199 & validator_plugins ,
191200 & validator_args ,
192- & disable_validator ,
201+ & pl -> validator_bypass ,
193202 error ) < 0 )
194203 return -1 ;
195204
@@ -205,44 +214,24 @@ int pipeline_configure (struct pipeline *pl,
205214 validator_plugins = strdup (argv [i ] + 18 );
206215 }
207216 else if (!strcmp (argv [i ], "disable-validator" ))
208- disable_validator = 1 ;
217+ pl -> validator_bypass = true ;
209218 }
210219
211- /* Take action on configuration update.
212- */
213- if (disable_validator ) {
214- if (pl -> validate ) {
215- errprintf (error , "Unable to disable validator at runtime" );
216- errno = EINVAL ;
217- goto error ;
218- }
219- // Checked for by t2111-job-ingest-config.t
220- flux_log (pl -> h , LOG_DEBUG , "Disabling job validator" );
221- }
222- else {
223- if (!pl -> validate ) {
224- if (!(pl -> validate = workcrew_create (pl -> h ))) {
225- errprintf (error ,
226- "Error creating validator workcrew: %s" ,
227- strerror (errno ));
228- goto error ;
229- }
230- }
231- // Checked for by t2111-job-ingest-config.t
232- flux_log (pl -> h ,
233- LOG_DEBUG ,
234- "configuring with plugins=%s, args=%s" ,
235- validator_plugins ,
236- validator_args );
237- if (workcrew_configure (pl -> validate ,
238- "job-validator" ,
239- validator_plugins ,
240- validator_args ) < 0 ) {
241- errprintf (error ,
242- "Error (re-)configuring validator workcrew: %s" ,
243- strerror (errno ));
244- goto error ;
245- }
220+ // Checked for by t2111-job-ingest-config.t
221+ flux_log (pl -> h ,
222+ LOG_DEBUG ,
223+ "configuring validator with plugins=%s, args=%s (%s)" ,
224+ validator_plugins ,
225+ validator_args ,
226+ pl -> validator_bypass ? "disabled" : "enabled" );
227+ if (workcrew_configure (pl -> validate ,
228+ cmd_validator ,
229+ validator_plugins ,
230+ validator_args ) < 0 ) {
231+ errprintf (error ,
232+ "Error (re-)configuring validator workcrew: %s" ,
233+ strerror (errno ));
234+ goto error ;
246235 }
247236
248237 free (validator_plugins );
@@ -291,6 +280,9 @@ struct pipeline *pipeline_create (flux_t *h)
291280 shutdown_timeout_cb ,
292281 pl )))
293282 goto error ;
283+ if (!(pl -> validate = workcrew_create (pl -> h ))
284+ || workcrew_configure (pl -> validate , cmd_validator , NULL , NULL ) < 0 )
285+ goto error ;
294286 return pl ;
295287error :
296288 pipeline_destroy (pl );
0 commit comments