1
+ # pylint: disable=unused-import
2
+ """Loads a CWL document."""
3
+
1
4
import os
2
5
import logging
3
6
import re
4
7
import urlparse
5
- import sys
6
- import json
7
8
from schema_salad .ref_resolver import Loader
8
9
import schema_salad .validate as validate
9
10
import schema_salad .schema as schema
11
+ from avro .schema import Names
10
12
from . import update
11
13
from . import process
14
+ from .process import Process , shortname
15
+ from .errors import WorkflowException
16
+ from typing import Any , Callable , cast , Dict , Tuple , Union
12
17
13
- _logger = logging .getLogger ("cwltool" )
14
18
15
19
def fetch_document (argsworkflow ):
20
+ # type: (Union[str, unicode, dict[unicode, Any]]) -> Tuple[Loader, Dict[unicode, Any], unicode]
21
+ """Retrieve a CWL document."""
16
22
document_loader = Loader ({"cwl" : "https://w3id.org/cwl/cwl#" , "id" : "@id" })
17
23
18
- jobobj = None
19
- uri = None # type: str
24
+ uri = None # type: unicode
20
25
workflowobj = None # type: Dict[unicode, Any]
21
- if isinstance (argsworkflow , basestring ):
26
+ if isinstance (argsworkflow , ( str , unicode ) ):
22
27
split = urlparse .urlsplit (argsworkflow )
23
28
if split .scheme :
24
29
uri = argsworkflow
25
30
else :
26
31
uri = "file://" + os .path .abspath (argsworkflow )
27
- fileuri , urifrag = urlparse .urldefrag (uri )
32
+ fileuri = urlparse .urldefrag (uri )[ 0 ]
28
33
workflowobj = document_loader .fetch (fileuri )
29
34
elif isinstance (argsworkflow , dict ):
30
35
workflowobj = argsworkflow
31
36
uri = "#" + str (id (argsworkflow ))
32
37
else :
33
- raise validate .ValidationException ("Must be URI or object: '%s'" % argsworkflow )
38
+ raise validate .ValidationException (
39
+ "Must be URI or object: '%s'" % argsworkflow )
34
40
35
41
return document_loader , workflowobj , uri
36
42
37
43
38
44
def validate_document (document_loader , workflowobj , uri ,
39
45
enable_dev = False , strict = True , preprocess_only = False ):
46
+ # type: (Loader, Dict[unicode, Any], unicode, bool, bool, bool) -> Tuple[Loader, Names, Any, Dict[str, str], unicode]
47
+ """Validate a CWL document."""
40
48
jobobj = None
41
49
if "cwl:tool" in workflowobj :
42
50
jobobj = workflowobj
43
51
uri = urlparse .urljoin (uri , jobobj ["cwl:tool" ])
44
52
del jobobj ["cwl:tool" ]
45
- workflowobj = fetch_document (uri )
53
+ workflowobj = fetch_document (uri )[ 1 ]
46
54
47
55
if isinstance (workflowobj , list ):
48
56
workflowobj = {
49
57
"$graph" : workflowobj
50
58
}
51
59
52
- fileuri , urifrag = urlparse .urldefrag (uri )
60
+ fileuri = urlparse .urldefrag (uri )[ 0 ]
53
61
54
62
if "cwlVersion" in workflowobj :
55
- workflowobj ["cwlVersion" ] = re .sub (r"^(?:cwl:|https://w3id.org/cwl/cwl#)" , "" , workflowobj ["cwlVersion" ])
63
+ workflowobj ["cwlVersion" ] = re .sub (
64
+ r"^(?:cwl:|https://w3id.org/cwl/cwl#)" , "" ,
65
+ workflowobj ["cwlVersion" ])
56
66
else :
57
67
workflowobj ["cwlVersion" ] = "draft-2"
58
68
59
69
if workflowobj ["cwlVersion" ] == "draft-2" :
60
- workflowobj = update ._draft2toDraft3dev1 (workflowobj , document_loader , uri , updateSteps = False )
70
+ workflowobj = update ._draft2toDraft3dev1 (
71
+ workflowobj , document_loader , uri , update_steps = False )[0 ]
61
72
if "@graph" in workflowobj :
62
73
workflowobj ["$graph" ] = workflowobj ["@graph" ]
63
74
del workflowobj ["@graph" ]
64
75
65
- (document_loader , avsc_names , schema_metadata , schema_loader ) = process .get_schema (workflowobj ["cwlVersion" ])
76
+ (document_loader , avsc_names ) = \
77
+ process .get_schema (workflowobj ["cwlVersion" ])[:2 ]
66
78
67
79
if isinstance (avsc_names , Exception ):
68
80
raise avsc_names
@@ -78,29 +90,36 @@ def validate_document(document_loader, workflowobj, uri,
78
90
79
91
if not metadata :
80
92
metadata = {"$namespaces" : processobj .get ("$namespaces" , {}),
81
- "$schemas" : processobj .get ("$schemas" , []),
82
- "cwlVersion" : processobj ["cwlVersion" ]}
93
+ "$schemas" : processobj .get ("$schemas" , []),
94
+ "cwlVersion" : processobj ["cwlVersion" ]}
83
95
84
- if metadata .get ("cwlVersion" ) != update .latest :
85
- processobj = update .update (processobj , document_loader , fileuri , enable_dev , metadata )
96
+ if metadata .get ("cwlVersion" ) != update .LATEST :
97
+ processobj = update .update (
98
+ processobj , document_loader , fileuri , enable_dev , metadata )
86
99
87
100
if jobobj :
88
101
metadata ["cwl:defaults" ] = jobobj
89
102
90
103
return document_loader , avsc_names , processobj , metadata , uri
91
104
92
105
93
- def make_tool (document_loader , avsc_names , processobj , metadata , uri , makeTool , kwargs ):
94
- processobj , _ = document_loader .resolve_ref (uri )
106
+ def make_tool (document_loader , avsc_names , processobj , metadata , uri , makeTool ,
107
+ kwargs ):
108
+ # type: (Loader, Names, Dict[str, Any], Dict[str, Any], unicode, Callable[..., Process], Dict[str, Any]) -> Process
109
+ """Make a Python CWL object."""
110
+ resolveduri = document_loader .resolve_ref (uri )[0 ]
95
111
96
- if isinstance (processobj , list ):
97
- if 1 == len ( processobj ) :
98
- processobj = processobj [0 ]
112
+ if isinstance (resolveduri , list ):
113
+ if len ( resolveduri ) == 1 :
114
+ processobj = resolveduri [0 ]
99
115
else :
100
- raise WorkflowException (u"Tool file contains graph of multiple objects, "
101
- "must specify one of #%s" %
102
- ", #" .join (urlparse .urldefrag (i ["id" ])[1 ]
103
- for i in processobj if "id" in i ))
116
+ raise WorkflowException (
117
+ u"Tool file contains graph of multiple objects, must specify "
118
+ "one of #%s" % ", #" .join (
119
+ urlparse .urldefrag (i ["id" ])[1 ] for i in resolveduri
120
+ if "id" in i ))
121
+ else :
122
+ processobj = cast (Dict [str , Any ], resolveduri )
104
123
105
124
kwargs = kwargs .copy ()
106
125
kwargs .update ({
@@ -109,25 +128,24 @@ def make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool,
109
128
"avsc_names" : avsc_names ,
110
129
"metadata" : metadata
111
130
})
112
- t = makeTool (processobj , ** kwargs )
131
+ tool = makeTool (processobj , ** kwargs )
113
132
114
133
if "cwl:defaults" in metadata :
115
134
jobobj = metadata ["cwl:defaults" ]
116
- for inp in t .tool ["inputs" ]:
135
+ for inp in tool .tool ["inputs" ]:
117
136
if shortname (inp ["id" ]) in jobobj :
118
137
inp ["default" ] = jobobj [shortname (inp ["id" ])]
119
138
120
- return t
139
+ return tool
121
140
122
141
123
142
def load_tool (argsworkflow , makeTool , kwargs = None ,
124
143
enable_dev = False ,
125
144
strict = True ):
126
-
145
+ # type: (Union[str,unicode,dict[unicode,Any]], Callable[...,Process], Dict[str, Any], bool, bool) -> Any
127
146
document_loader , workflowobj , uri = fetch_document (argsworkflow )
128
- document_loader , avsc_names , processobj , metadata , uri = validate_document (document_loader ,
129
- workflowobj ,
130
- uri ,
131
- enable_dev = enable_dev ,
132
- strict = strict )
133
- return make_tool (document_loader , avsc_names , processobj , metadata , uri , makeTool , kwargs if kwargs else {})
147
+ document_loader , avsc_names , processobj , metadata , uri = validate_document (
148
+ document_loader , workflowobj , uri , enable_dev = enable_dev ,
149
+ strict = strict )
150
+ return make_tool (document_loader , avsc_names , processobj , metadata , uri ,
151
+ makeTool , kwargs if kwargs else {})
0 commit comments