10
10
import json
11
11
import copy
12
12
from typing import (Any , Callable , Dict , Iterable , List , Mapping , Optional ,
13
- Text , Tuple , Union , cast )
13
+ Text , Tuple , Union , cast )
14
14
15
15
import requests .sessions
16
- from six import itervalues , string_types
16
+ from six import iteritems , itervalues , string_types
17
17
from six .moves import urllib
18
18
19
- import schema_salad .schema as schema
20
- from avro .schema import Names
21
19
from ruamel .yaml .comments import CommentedMap , CommentedSeq
20
+ import schema_salad .schema as schema
22
21
from schema_salad .ref_resolver import ContextType , Fetcher , Loader , file_uri
23
22
from schema_salad .sourceline import cmap , SourceLine
24
23
from schema_salad .validate import ValidationException
55
54
56
55
# Type of the optional factory a caller may supply to customize document
# fetching: given a cache mapping and a requests Session, it returns a
# schema-salad Fetcher.
FetcherConstructorType = Callable[[Dict[Text, Union[Text, bool]],
                                   requests.sessions.Session], Fetcher]

# Type of a tool-URI resolver callback: given a Loader and a workflow
# reference, it returns the resolved URI text.
ResolverType = Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]

# Module-level cache of Loader objects keyed by fetcher constructor, so
# default_loader() builds at most one Loader per constructor.
loaders = {}  # type: Dict[FetcherConstructorType, Loader]
60
60
61
61
def default_loader(fetcher_constructor):
    # type: (Optional[FetcherConstructorType]) -> Loader
    """Return a memoized Loader for the given fetcher constructor.

    On first use with a given fetcher_constructor (which may be None),
    a Loader is built from the module-level jobloaderctx context and
    stored in the module-level ``loaders`` cache; later calls with the
    same key return the cached instance.
    """
    try:
        return loaders[fetcher_constructor]
    except KeyError:
        new_loader = Loader(jobloaderctx,
                            fetcher_constructor=fetcher_constructor)
        loaders[fetcher_constructor] = new_loader
        return new_loader
69
68
70
69
def resolve_tool_uri (argsworkflow , # type: Text
71
- resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
70
+ resolver = None , # type: ResolverType
72
71
fetcher_constructor = None , # type: FetcherConstructorType
73
72
document_loader = None # type: Loader
74
- ): # type: (...) -> Tuple[Text, Text]
73
+ ): # type: (...) -> Tuple[Text, Text]
75
74
76
75
uri = None # type: Text
77
76
split = urllib .parse .urlsplit (argsworkflow )
@@ -107,7 +106,7 @@ def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
107
106
workflowobj = None # type: CommentedMap
108
107
if isinstance (argsworkflow , string_types ):
109
108
uri , fileuri = resolve_tool_uri (argsworkflow , resolver = resolver ,
110
- document_loader = document_loader )
109
+ document_loader = document_loader )
111
110
workflowobj = document_loader .fetch (fileuri )
112
111
elif isinstance (argsworkflow , dict ):
113
112
uri = "#" + Text (id (argsworkflow ))
@@ -128,11 +127,12 @@ def _convert_stdstreams_to_files(workflowobj):
128
127
outputs = workflowobj .get ('outputs' , [])
129
128
if not isinstance (outputs , CommentedSeq ):
130
129
raise ValidationException ('"outputs" section is not '
131
- 'valid.' )
130
+ 'valid.' )
132
131
for out in workflowobj .get ('outputs' , []):
133
- if type (out ) is not CommentedMap :
134
- raise ValidationException ("Output '{}' is not a "
135
- "valid OutputParameter." .format (out ))
132
+ if not isinstance (out , CommentedMap ):
133
+ raise ValidationException (
134
+ "Output '{}' is not a valid "
135
+ "OutputParameter." .format (out ))
136
136
for streamtype in ['stdout' , 'stderr' ]:
137
137
if out .get ('type' ) == streamtype :
138
138
if 'outputBinding' in out :
@@ -142,8 +142,11 @@ def _convert_stdstreams_to_files(workflowobj):
142
142
if streamtype in workflowobj :
143
143
filename = workflowobj [streamtype ]
144
144
else :
145
- filename = Text (hashlib .sha1 (json .dumps (workflowobj ,
146
- sort_keys = True ).encode ('utf-8' )).hexdigest ())
145
+ filename = Text (
146
+ hashlib .sha1 (json .dumps (workflowobj ,
147
+ sort_keys = True
148
+ ).encode ('utf-8' )
149
+ ).hexdigest ())
147
150
workflowobj [streamtype ] = filename
148
151
out ['type' ] = 'File'
149
152
out ['outputBinding' ] = cmap ({'glob' : filename })
@@ -174,9 +177,9 @@ def _add_blank_ids(workflowobj):
174
177
175
178
if isinstance (workflowobj , dict ):
176
179
if ("run" in workflowobj and
177
- isinstance (workflowobj ["run" ], dict ) and
178
- "id" not in workflowobj ["run" ] and
179
- "$import" not in workflowobj ["run" ]):
180
+ isinstance (workflowobj ["run" ], dict ) and
181
+ "id" not in workflowobj ["run" ] and
182
+ "$import" not in workflowobj ["run" ]):
180
183
workflowobj ["run" ]["id" ] = Text (uuid .uuid4 ())
181
184
for entry in itervalues (workflowobj ):
182
185
_add_blank_ids (entry )
@@ -195,8 +198,8 @@ def validate_document(document_loader, # type: Loader
195
198
overrides = None , # type: List[Dict]
196
199
metadata = None , # type: Optional[Dict]
197
200
do_validate = True
198
- ):
199
- # type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
201
+ ):
202
+ # type: (...) -> Tuple[Loader, schema. Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
200
203
"""Validate a CWL document."""
201
204
202
205
if isinstance (workflowobj , list ):
@@ -205,7 +208,8 @@ def validate_document(document_loader, # type: Loader
205
208
}, fn = uri )
206
209
207
210
if not isinstance (workflowobj , dict ):
208
- raise ValueError ("workflowjobj must be a dict, got '%s': %s" % (type (workflowobj ), workflowobj ))
211
+ raise ValueError ("workflowjobj must be a dict, got '{}': {}" .format (
212
+ type (workflowobj ), workflowobj ))
209
213
210
214
jobobj = None
211
215
if "cwl:tool" in workflowobj :
@@ -220,6 +224,33 @@ def validate_document(document_loader, # type: Loader
220
224
221
225
workflowobj = fetch_document (uri , fetcher_constructor = fetcher_constructor )[1 ]
222
226
227
+ def var_spool_cwl_detector (obj , # type: Union[Mapping, Iterable, Text]
228
+ item = None , # type: Optional[Any]
229
+ obj_key = None , # type: Optional[Any]
230
+ ): # type: (...)->None
231
+ """ Detects any textual reference to /var/spool/cwl. """
232
+ if isinstance (obj , string_types ):
233
+ if "var/spool/cwl" in obj :
234
+ message = SourceLine (
235
+ item = item , key = obj_key , raise_type = Text ,
236
+ include_traceback = _logger .isEnabledFor (logging .DEBUG )).makeError (
237
+ "Non-portable reference to /var/spool/cwl found: "
238
+ "'{}'.\n Replace with /var/spool/cwl/ with "
239
+ "$(runtime.outdir)." .format (obj ))
240
+ if not strict :
241
+ _logger .warning (message )
242
+ else :
243
+ raise ValidationException (message )
244
+ else :
245
+ return
246
+ elif isinstance (obj , Mapping ):
247
+ for key , value in iteritems (obj ):
248
+ var_spool_cwl_detector (value , obj , key )
249
+ elif isinstance (obj , Iterable ):
250
+ for element in obj :
251
+ var_spool_cwl_detector (element , obj , None )
252
+ var_spool_cwl_detector (workflowobj )
253
+
223
254
fileuri = urllib .parse .urldefrag (uri )[0 ]
224
255
if "cwlVersion" not in workflowobj :
225
256
if metadata and 'cwlVersion' in metadata :
@@ -233,7 +264,10 @@ def validate_document(document_loader, # type: Loader
233
264
"will need to be upgraded first." )
234
265
235
266
if not isinstance (workflowobj ["cwlVersion" ], (str , Text )):
236
- raise Exception ("'cwlVersion' must be a string, got %s" % type (workflowobj ["cwlVersion" ]))
267
+ with SourceLine (workflowobj , "cwlVersion" , ValidationException ):
268
+ raise ValidationException ("'cwlVersion' must be a string, "
269
+ "got {}" .format (
270
+ type (workflowobj ["cwlVersion" ])))
237
271
# strip out version
238
272
workflowobj ["cwlVersion" ] = re .sub (
239
273
r"^(?:cwl:|https://w3id.org/cwl/cwl#)" , "" ,
@@ -246,8 +280,10 @@ def validate_document(document_loader, # type: Loader
246
280
version += " (with --enable-dev flag only)"
247
281
versions .append (version )
248
282
versions .sort ()
249
- raise ValidationException ("The CWL reference runner no longer supports pre CWL v1.0 documents. "
250
- "Supported versions are: \n {}" .format ("\n " .join (versions )))
283
+ raise ValidationException (
284
+ "The CWL reference runner no longer supports pre CWL v1.0 "
285
+ "documents. Supported versions are: "
286
+ "\n {}" .format ("\n " .join (versions )))
251
287
252
288
(sch_document_loader , avsc_names ) = \
253
289
process .get_schema (workflowobj ["cwlVersion" ])[:2 ]
@@ -263,7 +299,8 @@ def validate_document(document_loader, # type: Loader
263
299
_add_blank_ids (workflowobj )
264
300
265
301
workflowobj ["id" ] = fileuri
266
- processobj , new_metadata = document_loader .resolve_all (workflowobj , fileuri , checklinks = do_validate )
302
+ processobj , new_metadata = document_loader .resolve_all (
303
+ workflowobj , fileuri , checklinks = do_validate )
267
304
if not isinstance (processobj , (CommentedMap , CommentedSeq )):
268
305
raise ValidationException ("Workflow must be a dict or list." )
269
306
@@ -295,13 +332,12 @@ def validate_document(document_loader, # type: Loader
295
332
296
333
297
334
def make_tool (document_loader , # type: Loader
298
- avsc_names , # type: Names
299
- metadata , # type: Dict[Text, Any]
300
- uri , # type: Text
301
- makeTool , # type: Callable[..., Process]
302
- kwargs # type: dict
303
- ):
304
- # type: (...) -> Process
335
+ avsc_names , # type: schema.Names
336
+ metadata , # type: Dict[Text, Any]
337
+ uri , # type: Text
338
+ makeTool , # type: Callable[..., Process]
339
+ kwargs # type: Dict
340
+ ): # type: (...) -> Process
305
341
"""Make a Python CWL object."""
306
342
resolveduri = document_loader .resolve_ref (uri )[0 ]
307
343
@@ -340,28 +376,30 @@ def make_tool(document_loader, # type: Loader
340
376
return tool
341
377
342
378
343
def load_tool(argsworkflow,               # type: Union[Text, Dict[Text, Any]]
              makeTool,                   # type: Callable[..., Process]
              kwargs=None,                # type: Dict
              enable_dev=False,           # type: bool
              strict=True,                # type: bool
              resolver=None,              # type: ResolverType
              fetcher_constructor=None,   # type: FetcherConstructorType
              overrides=None
              ):  # type: (...) -> Process
    """Fetch, validate, and construct a CWL Process object.

    ``argsworkflow`` is either a path/URI to a CWL document or an
    already-loaded document dict.  The document is retrieved with
    fetch_document(), checked with validate_document() (honoring
    enable_dev/strict and any overrides), and finally turned into a
    Process via make_tool() using the supplied makeTool factory.
    """
    document_loader, workflowobj, uri = fetch_document(
        argsworkflow, resolver=resolver,
        fetcher_constructor=fetcher_constructor)
    # The resolved process object from validate_document is unused here;
    # make_tool re-resolves from the returned loader and URI.
    document_loader, avsc_names, _, metadata, uri = validate_document(
        document_loader, workflowobj, uri, enable_dev=enable_dev,
        strict=strict, fetcher_constructor=fetcher_constructor,
        overrides=overrides,
        metadata=kwargs.get('metadata', None) if kwargs else None)
    return make_tool(document_loader, avsc_names, metadata, uri,
                     makeTool, kwargs if kwargs else {})
363
398
364
- def resolve_overrides (ov , ov_uri , baseurl ): # type: (CommentedMap, Text, Text) -> List[Dict[Text, Any]]
399
+ def resolve_overrides (ov , # Type: CommentedMap
400
+ ov_uri , # Type: Text
401
+ baseurl # type: Text
402
+ ): # type: (...) -> List[Dict[Text, Any]]
365
403
ovloader = Loader (overrides_ctx )
366
404
ret , _ = ovloader .resolve_all (ov , baseurl )
367
405
if not isinstance (ret , CommentedMap ):
0 commit comments