@@ -166,8 +166,10 @@ def static_checker(
166
166
for parm in src_parms :
167
167
src_dict [parm ["id" ]] = parm
168
168
169
- step_inputs_val = check_all_types (src_dict , step_inputs , "source" )
170
- workflow_outputs_val = check_all_types (src_dict , workflow_outputs , "outputSource" )
169
+ step_inputs_val = check_all_types (src_dict , step_inputs , "source" , param_to_step )
170
+ workflow_outputs_val = check_all_types (
171
+ src_dict , workflow_outputs , "outputSource" , param_to_step
172
+ )
171
173
172
174
warnings = step_inputs_val ["warning" ] + workflow_outputs_val ["warning" ]
173
175
exceptions = step_inputs_val ["exception" ] + workflow_outputs_val ["exception" ]
@@ -212,18 +214,21 @@ def static_checker(
212
214
"%s\n %s" % (msg1 , bullets ([msg3 , msg4 , msg5 ], " " ))
213
215
)
214
216
elif sink .get ("not_connected" ):
215
- msg = SourceLine (sink , "type" ).makeError (
216
- "'%s' is not an input parameter of %s, expected %s"
217
- % (
218
- shortname (sink ["id" ]),
219
- param_to_step [sink ["id" ]]["run" ],
220
- ", " .join (
221
- shortname (s ["id" ])
222
- for s in param_to_step [sink ["id" ]]["inputs" ]
223
- if not s .get ("not_connected" )
224
- ),
217
+ if not sink .get ("used_by_step" ):
218
+ msg = SourceLine (sink , "type" ).makeError (
219
+ "'%s' is not an input parameter of %s, expected %s"
220
+ % (
221
+ shortname (sink ["id" ]),
222
+ param_to_step [sink ["id" ]]["run" ],
223
+ ", " .join (
224
+ shortname (s ["id" ])
225
+ for s in param_to_step [sink ["id" ]]["inputs" ]
226
+ if not s .get ("not_connected" )
227
+ ),
228
+ )
225
229
)
226
- )
230
+ else :
231
+ msg = ""
227
232
else :
228
233
msg = (
229
234
SourceLine (src , "type" ).makeError (
@@ -241,11 +246,17 @@ def static_checker(
241
246
" source has linkMerge method %s" % linkMerge
242
247
)
243
248
244
- warning_msgs .append (msg )
249
+ if warning .message is not None :
250
+ msg += "\n " + SourceLine (sink ).makeError (" " + warning .message )
251
+
252
+ if msg :
253
+ warning_msgs .append (msg )
254
+
245
255
for exception in exceptions :
246
256
src = exception .src
247
257
sink = exception .sink
248
258
linkMerge = exception .linkMerge
259
+ extra_message = exception .message
249
260
msg = (
250
261
SourceLine (src , "type" ).makeError (
251
262
"Source '%s' of type %s is incompatible"
@@ -257,6 +268,9 @@ def static_checker(
257
268
% (shortname (sink ["id" ]), json_dumps (sink ["type" ]))
258
269
)
259
270
)
271
+ if extra_message is not None :
272
+ msg += "\n " + SourceLine (sink ).makeError (" " + extra_message )
273
+
260
274
if linkMerge is not None :
261
275
msg += "\n " + SourceLine (sink ).makeError (
262
276
" source has linkMerge method %s" % linkMerge
@@ -278,19 +292,19 @@ def static_checker(
278
292
exception_msgs .append (msg )
279
293
280
294
all_warning_msg = strip_dup_lineno ("\n " .join (warning_msgs ))
281
- all_exception_msg = strip_dup_lineno ("\n " .join (exception_msgs ))
295
+ all_exception_msg = strip_dup_lineno ("\n " + " \n " .join (exception_msgs ))
282
296
283
- if warnings :
297
+ if all_warning_msg :
284
298
_logger .warning ("Workflow checker warning:\n %s" , all_warning_msg )
285
299
if exceptions :
286
300
raise validate .ValidationException (all_exception_msg )
287
301
288
302
289
- SrcSink = namedtuple ("SrcSink" , ["src" , "sink" , "linkMerge" ])
303
+ SrcSink = namedtuple ("SrcSink" , ["src" , "sink" , "linkMerge" , "message" ])
290
304
291
305
292
- def check_all_types (src_dict , sinks , sourceField ):
293
- # type: (Dict[str, Any], List[Dict[str, Any]], str) -> Dict[str, List[SrcSink]]
306
+ def check_all_types (src_dict , sinks , sourceField , param_to_step ):
307
+ # type: (Dict[str, Any], List[Dict[str, Any]], str, Dict[str, Dict[str, Any]] ) -> Dict[str, List[SrcSink]]
294
308
"""
295
309
Given a list of sinks, check if their types match with the types of their sources.
296
310
@@ -299,21 +313,94 @@ def check_all_types(src_dict, sinks, sourceField):
299
313
validation = {"warning" : [], "exception" : []} # type: Dict[str, List[SrcSink]]
300
314
for sink in sinks :
301
315
if sourceField in sink :
316
+
302
317
valueFrom = sink .get ("valueFrom" )
318
+ pickValue = sink .get ("pickValue" )
319
+
320
+ extra_message = None
321
+ if pickValue is not None :
322
+ extra_message = "pickValue is: %s" % pickValue
323
+
303
324
if isinstance (sink [sourceField ], MutableSequence ):
304
- srcs_of_sink = [src_dict [parm_id ] for parm_id in sink [sourceField ]]
305
325
linkMerge = sink .get (
306
326
"linkMerge" ,
307
327
("merge_nested" if len (sink [sourceField ]) > 1 else None ),
308
328
)
329
+
330
+ if pickValue in ["first_non_null" , "only_non_null" ]:
331
+ linkMerge = None
332
+
333
+ srcs_of_sink = [] # type: List[Any]
334
+ for parm_id in sink [sourceField ]:
335
+ srcs_of_sink += [src_dict [parm_id ]]
336
+ if (
337
+ is_conditional_step (param_to_step , parm_id )
338
+ and pickValue is None
339
+ ):
340
+ validation ["warning" ].append (
341
+ SrcSink (
342
+ src_dict [parm_id ],
343
+ sink ,
344
+ linkMerge ,
345
+ message = "Source is from conditional step, but pickValue is not used" ,
346
+ )
347
+ )
309
348
else :
310
349
parm_id = sink [sourceField ]
311
350
srcs_of_sink = [src_dict [parm_id ]]
312
351
linkMerge = None
352
+
353
+ if pickValue is not None :
354
+ validation ["warning" ].append (
355
+ SrcSink (
356
+ src_dict [parm_id ],
357
+ sink ,
358
+ linkMerge ,
359
+ message = "pickValue is used but only a single input source is declared" ,
360
+ )
361
+ )
362
+
363
+ if is_conditional_step (param_to_step , parm_id ):
364
+ src_typ = srcs_of_sink [0 ]["type" ]
365
+ snk_typ = sink ["type" ]
366
+
367
+ if not isinstance (src_typ , list ):
368
+ src_typ = [src_typ ]
369
+ if "null" not in src_typ :
370
+ src_typ = ["null" ] + src_typ
371
+
372
+ if (
373
+ "null" not in snk_typ
374
+ ): # Given our type names this works even if not a list
375
+ validation ["warning" ].append (
376
+ SrcSink (
377
+ src_dict [parm_id ],
378
+ sink ,
379
+ linkMerge ,
380
+ message = "Source is from conditional step and may produce `null`" ,
381
+ )
382
+ )
383
+
384
+ srcs_of_sink [0 ]["type" ] = src_typ
385
+
313
386
for src in srcs_of_sink :
314
387
check_result = check_types (src , sink , linkMerge , valueFrom )
315
388
if check_result == "warning" :
316
- validation ["warning" ].append (SrcSink (src , sink , linkMerge ))
389
+ validation ["warning" ].append (
390
+ SrcSink (src , sink , linkMerge , message = extra_message )
391
+ )
317
392
elif check_result == "exception" :
318
- validation ["exception" ].append (SrcSink (src , sink , linkMerge ))
393
+ validation ["exception" ].append (
394
+ SrcSink (src , sink , linkMerge , message = extra_message )
395
+ )
396
+
319
397
return validation
398
+
399
+
400
+ def is_conditional_step (param_to_step : Dict [str , Dict [str , Any ]],
401
+ parm_id : str ) -> bool :
402
+ source_step = param_to_step .get (parm_id )
403
+ if source_step is not None :
404
+ if source_step .get ("when" ) is not None :
405
+ return True
406
+ return False
0 commit comments