40
40
from .loghandler import _logger
41
41
from .process import Process , get_schema , shortname
42
42
from .update import ALLUPDATES
43
- from .utils import ResolverType , visit_class
43
+ from .utils import CWLObjectType , ResolverType , visit_class
44
44
45
45
jobloaderctx = {
46
46
"cwl" : "https://w3id.org/cwl/cwl#" ,
@@ -108,7 +108,7 @@ def resolve_tool_uri(
108
108
109
109
110
110
def fetch_document (
111
- argsworkflow : Union [str , Dict [ str , Any ] ],
111
+ argsworkflow : Union [str , CWLObjectType ],
112
112
loadingContext : Optional [LoadingContext ] = None ,
113
113
) -> Tuple [LoadingContext , CommentedMap , str ]:
114
114
"""Retrieve a CWL document."""
@@ -132,17 +132,23 @@ def fetch_document(
132
132
)
133
133
workflowobj = cast (CommentedMap , loadingContext .loader .fetch (fileuri ))
134
134
return loadingContext , workflowobj , uri
135
- if isinstance (argsworkflow , dict ):
136
- uri = argsworkflow ["id" ] if argsworkflow .get ("id" ) else "_:" + str (uuid .uuid4 ())
137
- workflowobj = cast (CommentedMap , cmap (argsworkflow , fn = uri ))
135
+ if isinstance (argsworkflow , MutableMapping ):
136
+ uri = (
137
+ cast (str , argsworkflow ["id" ])
138
+ if argsworkflow .get ("id" )
139
+ else "_:" + str (uuid .uuid4 ())
140
+ )
141
+ workflowobj = cast (
142
+ CommentedMap , cmap (cast (Dict [str , Any ], argsworkflow ), fn = uri )
143
+ )
138
144
loadingContext .loader .idx [uri ] = workflowobj
139
145
return loadingContext , workflowobj , uri
140
146
raise ValidationException ("Must be URI or object: '%s'" % argsworkflow )
141
147
142
148
143
149
def _convert_stdstreams_to_files (
144
150
workflowobj : Union [
145
- MutableMapping [ str , Any ], MutableSequence [Union [Dict [ str , Any ] , str , int ]], str
151
+ CWLObjectType , MutableSequence [Union [CWLObjectType , str , int ]], str
146
152
]
147
153
) -> None :
148
154
if isinstance (workflowobj , MutableMapping ):
@@ -156,7 +162,9 @@ def _convert_stdstreams_to_files(
156
162
outputs = workflowobj .get ("outputs" , [])
157
163
if not isinstance (outputs , CommentedSeq ):
158
164
raise ValidationException ('"outputs" section is not ' "valid." )
159
- for out in workflowobj .get ("outputs" , []):
165
+ for out in cast (
166
+ MutableSequence [CWLObjectType ], workflowobj .get ("outputs" , [])
167
+ ):
160
168
if not isinstance (out , CommentedMap ):
161
169
raise ValidationException (
162
170
"Output '{}' is not a valid " "OutputParameter." .format (out )
@@ -181,7 +189,9 @@ def _convert_stdstreams_to_files(
181
189
workflowobj [streamtype ] = filename
182
190
out ["type" ] = "File"
183
191
out ["outputBinding" ] = cmap ({"glob" : filename })
184
- for inp in workflowobj .get ("inputs" , []):
192
+ for inp in cast (
193
+ MutableSequence [CWLObjectType ], workflowobj .get ("inputs" , [])
194
+ ):
185
195
if inp .get ("type" ) == "stdin" :
186
196
if "inputBinding" in inp :
187
197
raise ValidationException (
@@ -195,21 +205,38 @@ def _convert_stdstreams_to_files(
195
205
)
196
206
else :
197
207
workflowobj ["stdin" ] = (
198
- "$(inputs.%s.path)" % inp ["id" ].rpartition ("#" )[2 ]
208
+ "$(inputs.%s.path)"
209
+ % cast (str , inp ["id" ]).rpartition ("#" )[2 ]
199
210
)
200
211
inp ["type" ] = "File"
201
212
else :
202
213
for entry in workflowobj .values ():
203
- _convert_stdstreams_to_files (entry )
214
+ _convert_stdstreams_to_files (
215
+ cast (
216
+ Union [
217
+ CWLObjectType ,
218
+ MutableSequence [Union [CWLObjectType , str , int ]],
219
+ str ,
220
+ ],
221
+ entry ,
222
+ )
223
+ )
204
224
if isinstance (workflowobj , MutableSequence ):
205
225
for entry in workflowobj :
206
- _convert_stdstreams_to_files (entry )
226
+ _convert_stdstreams_to_files (
227
+ cast (
228
+ Union [
229
+ CWLObjectType ,
230
+ MutableSequence [Union [CWLObjectType , str , int ]],
231
+ str ,
232
+ ],
233
+ entry ,
234
+ )
235
+ )
207
236
208
237
209
238
def _add_blank_ids (
210
- workflowobj : Union [
211
- MutableMapping [str , Any ], MutableSequence [Union [MutableMapping [str , Any ], str ]]
212
- ]
239
+ workflowobj : Union [CWLObjectType , MutableSequence [Union [CWLObjectType , str ]]]
213
240
) -> None :
214
241
if isinstance (workflowobj , MutableMapping ):
215
242
if (
@@ -220,10 +247,20 @@ def _add_blank_ids(
220
247
):
221
248
workflowobj ["run" ]["id" ] = str (uuid .uuid4 ())
222
249
for entry in workflowobj .values ():
223
- _add_blank_ids (entry )
250
+ _add_blank_ids (
251
+ cast (
252
+ Union [CWLObjectType , MutableSequence [Union [CWLObjectType , str ]]],
253
+ entry ,
254
+ )
255
+ )
224
256
if isinstance (workflowobj , MutableSequence ):
225
257
for entry in workflowobj :
226
- _add_blank_ids (entry )
258
+ _add_blank_ids (
259
+ cast (
260
+ Union [CWLObjectType , MutableSequence [Union [CWLObjectType , str ]]],
261
+ entry ,
262
+ )
263
+ )
227
264
228
265
229
266
def resolve_and_validate_document (
@@ -263,8 +300,8 @@ def resolve_and_validate_document(
263
300
if not cwlVersion and fileuri != uri :
264
301
# The tool we're loading is a fragment of a bigger file. Get
265
302
# the document root element and look for cwlVersion there.
266
- metadata = fetch_document (fileuri , loadingContext )[1 ] # type: Dict[str, Any]
267
- cwlVersion = metadata .get ("cwlVersion" )
303
+ metadata = cast ( CWLObjectType , fetch_document (fileuri , loadingContext )[1 ])
304
+ cwlVersion = cast ( str , metadata .get ("cwlVersion" ) )
268
305
if not cwlVersion :
269
306
raise ValidationException (
270
307
"No cwlVersion found. "
@@ -427,7 +464,7 @@ def make_tool(
427
464
428
465
429
466
def load_tool (
430
- argsworkflow : Union [str , Dict [ str , Any ] ],
467
+ argsworkflow : Union [str , CWLObjectType ],
431
468
loadingContext : Optional [LoadingContext ] = None ,
432
469
) -> Process :
433
470
@@ -441,19 +478,17 @@ def load_tool(
441
478
442
479
443
480
def resolve_overrides (
444
- ov , # type: IdxResultType
445
- ov_uri , # type: str
446
- baseurl , # type: str
447
- ): # type: (...) -> List[Dict[str, Any]]
481
+ ov : IdxResultType , ov_uri : str , baseurl : str ,
482
+ ) -> List [CWLObjectType ]:
448
483
ovloader = Loader (overrides_ctx )
449
484
ret , _ = ovloader .resolve_all (ov , baseurl )
450
485
if not isinstance (ret , CommentedMap ):
451
486
raise Exception ("Expected CommentedMap, got %s" % type (ret ))
452
487
cwl_docloader = get_schema ("v1.0" )[0 ]
453
488
cwl_docloader .resolve_all (ret , ov_uri )
454
- return cast (List [Dict [ str , Any ] ], ret ["http://commonwl.org/cwltool#overrides" ])
489
+ return cast (List [CWLObjectType ], ret ["http://commonwl.org/cwltool#overrides" ])
455
490
456
491
457
- def load_overrides (ov , base_url ): # type: ( str, str ) -> List[Dict[str, Any]]
492
+ def load_overrides (ov : str , base_url : str ) -> List [CWLObjectType ]:
458
493
ovloader = Loader (overrides_ctx )
459
494
return resolve_overrides (ovloader .fetch (ov ), ov , base_url )
0 commit comments