10
10
import json
11
11
import copy
12
12
from typing import (Any , Callable , Dict , Iterable , List , Mapping , Optional ,
13
- Text , Tuple , Union , cast )
13
+ Text , Tuple , Union , cast )
14
14
15
15
import requests .sessions
16
16
from six import iteritems , itervalues , string_types
17
17
from six .moves import urllib
18
18
19
- import schema_salad .schema as schema
20
19
from avro .schema import Names
21
20
from ruamel .yaml .comments import CommentedMap , CommentedSeq
21
+ import schema_salad .schema as schema
22
22
from schema_salad .ref_resolver import ContextType , Fetcher , Loader , file_uri
23
23
from schema_salad .sourceline import cmap , SourceLine
24
24
from schema_salad .validate import ValidationException
# Factory signature for building a Fetcher: takes the document cache mapping
# and a requests Session, returns a Fetcher instance.
FetcherConstructorType = Callable[[Dict[Text, Union[Text, bool]],
                                   requests.sessions.Session], Fetcher]

# Resolver signature: maps a Loader plus a workflow reference (URI text or an
# already-parsed document dict) to the resolved URI text.
ResolverType = Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]


# Memoisation cache used by default_loader(): one Loader per distinct
# fetcher constructor (including None for the default fetcher).
loaders = {}  # type: Dict[FetcherConstructorType, Loader]
60
61
61
62
def default_loader(fetcher_constructor):
    # type: (Optional[FetcherConstructorType]) -> Loader
    """Return the shared Loader for *fetcher_constructor*, creating it on first use.

    Results are memoised in the module-level ``loaders`` dict, so every call
    with the same fetcher constructor (including ``None``) yields the same
    Loader instance.
    """
    cached = loaders.get(fetcher_constructor)
    if cached is not None:
        return cached
    new_loader = Loader(jobloaderctx, fetcher_constructor=fetcher_constructor)
    loaders[fetcher_constructor] = new_loader
    return new_loader
69
69
70
70
def resolve_tool_uri (argsworkflow , # type: Text
71
- resolver = None , # type: Callable[[Loader, Union[Text, Dict[Text, Any]]], Text]
71
+ resolver = None , # type: ResolverType
72
72
fetcher_constructor = None , # type: FetcherConstructorType
73
73
document_loader = None # type: Loader
74
- ): # type: (...) -> Tuple[Text, Text]
74
+ ): # type: (...) -> Tuple[Text, Text]
75
75
76
76
uri = None # type: Text
77
77
split = urllib .parse .urlsplit (argsworkflow )
@@ -107,7 +107,7 @@ def fetch_document(argsworkflow, # type: Union[Text, Dict[Text, Any]]
107
107
workflowobj = None # type: CommentedMap
108
108
if isinstance (argsworkflow , string_types ):
109
109
uri , fileuri = resolve_tool_uri (argsworkflow , resolver = resolver ,
110
- document_loader = document_loader )
110
+ document_loader = document_loader )
111
111
workflowobj = document_loader .fetch (fileuri )
112
112
elif isinstance (argsworkflow , dict ):
113
113
uri = "#" + Text (id (argsworkflow ))
@@ -128,11 +128,12 @@ def _convert_stdstreams_to_files(workflowobj):
128
128
outputs = workflowobj .get ('outputs' , [])
129
129
if not isinstance (outputs , CommentedSeq ):
130
130
raise ValidationException ('"outputs" section is not '
131
- 'valid.' )
131
+ 'valid.' )
132
132
for out in workflowobj .get ('outputs' , []):
133
- if type (out ) is not CommentedMap :
134
- raise ValidationException ("Output '{}' is not a "
135
- "valid OutputParameter." .format (out ))
133
+ if not isinstance (out , CommentedMap ):
134
+ raise ValidationException (
135
+ "Output '{}' is not a valid "
136
+ "OutputParameter." .format (out ))
136
137
for streamtype in ['stdout' , 'stderr' ]:
137
138
if out .get ('type' ) == streamtype :
138
139
if 'outputBinding' in out :
@@ -142,8 +143,11 @@ def _convert_stdstreams_to_files(workflowobj):
142
143
if streamtype in workflowobj :
143
144
filename = workflowobj [streamtype ]
144
145
else :
145
- filename = Text (hashlib .sha1 (json .dumps (workflowobj ,
146
- sort_keys = True ).encode ('utf-8' )).hexdigest ())
146
+ filename = Text (
147
+ hashlib .sha1 (json .dumps (workflowobj ,
148
+ sort_keys = True
149
+ ).encode ('utf-8' )
150
+ ).hexdigest ())
147
151
workflowobj [streamtype ] = filename
148
152
out ['type' ] = 'File'
149
153
out ['outputBinding' ] = cmap ({'glob' : filename })
@@ -174,9 +178,9 @@ def _add_blank_ids(workflowobj):
174
178
175
179
if isinstance (workflowobj , dict ):
176
180
if ("run" in workflowobj and
177
- isinstance (workflowobj ["run" ], dict ) and
178
- "id" not in workflowobj ["run" ] and
179
- "$import" not in workflowobj ["run" ]):
181
+ isinstance (workflowobj ["run" ], dict ) and
182
+ "id" not in workflowobj ["run" ] and
183
+ "$import" not in workflowobj ["run" ]):
180
184
workflowobj ["run" ]["id" ] = Text (uuid .uuid4 ())
181
185
for entry in itervalues (workflowobj ):
182
186
_add_blank_ids (entry )
@@ -195,7 +199,7 @@ def validate_document(document_loader, # type: Loader
195
199
overrides = None , # type: List[Dict]
196
200
metadata = None , # type: Optional[Dict]
197
201
do_validate = True
198
- ):
202
+ ):
199
203
# type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
200
204
"""Validate a CWL document."""
201
205
@@ -205,7 +209,8 @@ def validate_document(document_loader, # type: Loader
205
209
}, fn = uri )
206
210
207
211
if not isinstance (workflowobj , dict ):
208
- raise ValueError ("workflowjobj must be a dict, got '%s': %s" % (type (workflowobj ), workflowobj ))
212
+ raise ValueError ("workflowjobj must be a dict, got '{}': {}" .format (
213
+ type (workflowobj ), workflowobj ))
209
214
210
215
jobobj = None
211
216
if "cwl:tool" in workflowobj :
@@ -260,7 +265,10 @@ def var_spool_cwl_detector(obj, # type: Union[Mapping, Iterable, Text]
260
265
"will need to be upgraded first." )
261
266
262
267
if not isinstance (workflowobj ["cwlVersion" ], (str , Text )):
263
- raise Exception ("'cwlVersion' must be a string, got %s" % type (workflowobj ["cwlVersion" ]))
268
+ with SourceLine (workflowobj , "cwlVersion" , ValidationException ):
269
+ raise ValidationException ("'cwlVersion' must be a string, "
270
+ "got {}" .format (
271
+ type (workflowobj ["cwlVersion" ])))
264
272
# strip out version
265
273
workflowobj ["cwlVersion" ] = re .sub (
266
274
r"^(?:cwl:|https://w3id.org/cwl/cwl#)" , "" ,
@@ -273,8 +281,10 @@ def var_spool_cwl_detector(obj, # type: Union[Mapping, Iterable, Text]
273
281
version += " (with --enable-dev flag only)"
274
282
versions .append (version )
275
283
versions .sort ()
276
- raise ValidationException ("The CWL reference runner no longer supports pre CWL v1.0 documents. "
277
- "Supported versions are: \n {}" .format ("\n " .join (versions )))
284
+ raise ValidationException (
285
+ "The CWL reference runner no longer supports pre CWL v1.0 "
286
+ "documents. Supported versions are: "
287
+ "\n {}" .format ("\n " .join (versions )))
278
288
279
289
(sch_document_loader , avsc_names ) = \
280
290
process .get_schema (workflowobj ["cwlVersion" ])[:2 ]
@@ -290,7 +300,8 @@ def var_spool_cwl_detector(obj, # type: Union[Mapping, Iterable, Text]
290
300
_add_blank_ids (workflowobj )
291
301
292
302
workflowobj ["id" ] = fileuri
293
- processobj , new_metadata = document_loader .resolve_all (workflowobj , fileuri , checklinks = do_validate )
303
+ processobj , new_metadata = document_loader .resolve_all (
304
+ workflowobj , fileuri , checklinks = do_validate )
294
305
if not isinstance (processobj , (CommentedMap , CommentedSeq )):
295
306
raise ValidationException ("Workflow must be a dict or list." )
296
307
@@ -322,13 +333,12 @@ def var_spool_cwl_detector(obj, # type: Union[Mapping, Iterable, Text]
322
333
323
334
324
335
def make_tool (document_loader , # type: Loader
325
- avsc_names , # type: Names
326
- metadata , # type: Dict[Text, Any]
327
- uri , # type: Text
328
- makeTool , # type: Callable[..., Process]
329
- kwargs # type: dict
330
- ):
331
- # type: (...) -> Process
336
+ avsc_names , # type: Names
337
+ metadata , # type: Dict[Text, Any]
338
+ uri , # type: Text
339
+ makeTool , # type: Callable[..., Process]
340
+ kwargs # type: Dict
341
+ ): # type: (...) -> Process
332
342
"""Make a Python CWL object."""
333
343
resolveduri = document_loader .resolve_ref (uri )[0 ]
334
344
@@ -367,28 +377,30 @@ def make_tool(document_loader, # type: Loader
367
377
return tool
368
378
369
379
370
def load_tool(argsworkflow,  # type: Union[Text, Dict[Text, Any]]
              makeTool,  # type: Callable[..., Process]
              kwargs=None,  # type: Dict
              enable_dev=False,  # type: bool
              strict=True,  # type: bool
              resolver=None,  # type: ResolverType
              fetcher_constructor=None,  # type: FetcherConstructorType
              overrides=None
              ):  # type: (...) -> Process
    """Fetch, validate, and build a Process object from a CWL document.

    ``argsworkflow`` may be a URI/path string or an already-parsed document
    dict; the final tool object is constructed by ``makeTool`` through
    make_tool().
    """
    document_loader, workflowobj, uri = fetch_document(
        argsworkflow, resolver=resolver,
        fetcher_constructor=fetcher_constructor)
    # Seed validation with caller-supplied metadata when kwargs carries it;
    # the validated process object itself is discarded here — only the
    # loader, schema names, metadata and resolved URI feed make_tool().
    seed_metadata = kwargs.get('metadata', None) if kwargs else None
    document_loader, avsc_names, _, metadata, uri = validate_document(
        document_loader, workflowobj, uri, enable_dev=enable_dev,
        strict=strict, fetcher_constructor=fetcher_constructor,
        overrides=overrides, metadata=seed_metadata)
    return make_tool(document_loader, avsc_names, metadata, uri,
                     makeTool, kwargs if kwargs else {})
390
399
391
- def resolve_overrides (ov , ov_uri , baseurl ): # type: (CommentedMap, Text, Text) -> List[Dict[Text, Any]]
400
+ def resolve_overrides (ov , # Type: CommentedMap
401
+ ov_uri , # Type: Text
402
+ baseurl # type: Text
403
+ ): # type: (...) -> List[Dict[Text, Any]]
392
404
ovloader = Loader (overrides_ctx )
393
405
ret , _ = ovloader .resolve_all (ov , baseurl )
394
406
if not isinstance (ret , CommentedMap ):
0 commit comments