5
5
import os
6
6
from abc import ABCMeta , abstractmethod
7
7
8
- from typing import Dict , Text , Any
8
+ from typing import Dict , Text , Any , Tuple , Set , List
9
9
10
10
from cwltool .builder import Builder
11
11
from cwltool .errors import WorkflowException
12
12
from cwltool .mutation import MutationManager
13
+ from cwltool .job import JobBase
13
14
from cwltool .process import relocateOutputs , cleanIntermediate , Process
14
15
15
16
@@ -24,9 +25,10 @@ class JobExecutor(object):
24
25
__metaclass__ = ABCMeta
25
26
26
27
def __init__ (self ):
27
- self .final_output = []
28
- self .final_status = []
29
- self .output_dirs = set ()
28
+ # type: (...) -> None
29
+ self .final_output = [] # type: List
30
+ self .final_status = [] # type: List
31
+ self .output_dirs = set () # type: Set
30
32
31
33
def __call__ (self , * args , ** kwargs ):
32
34
return self .execute (* args , ** kwargs )
@@ -38,17 +40,18 @@ def output_callback(self, out, processStatus):
38
40
@abstractmethod
39
41
def run_jobs (self ,
40
42
t , # type: Process
41
- job_order_object , # type: Dict[Text, Any],
43
+ job_order_object , # type: Dict[Text, Any]
42
44
logger ,
43
45
** kwargs # type: Any
44
46
):
45
47
pass
46
48
47
49
def execute (self , t , # type: Process
48
- job_order_object , # type: Dict[Text, Any],
50
+ job_order_object , # type: Dict[Text, Any]
49
51
logger = _logger ,
50
52
** kwargs # type: Any
51
53
):
54
+ # type: (...) -> Tuple[Dict[Text, Any], Text]
52
55
53
56
if "basedir" not in kwargs :
54
57
raise WorkflowException ("Must provide 'basedir' in kwargs" )
@@ -87,7 +90,7 @@ def execute(self, t, # type: Process
87
90
class SingleJobExecutor (JobExecutor ):
88
91
def run_jobs (self ,
89
92
t , # type: Process
90
- job_order_object , # type: Dict[Text, Any],
93
+ job_order_object , # type: Dict[Text, Any]
91
94
logger ,
92
95
** kwargs # type: Any
93
96
):
@@ -120,7 +123,11 @@ def __init__(self):
120
123
self .threads = set ()
121
124
self .exceptions = []
122
125
123
- def run_job (self , job , ** kwargs ):
126
+ def run_job (self ,
127
+ job , # type: JobBase
128
+ ** kwargs # type: Any
129
+ ):
130
+ # type: (...) -> None
124
131
def runner ():
125
132
try :
126
133
job .run (** kwargs )
@@ -136,13 +143,13 @@ def runner():
136
143
self .threads .add (thread )
137
144
thread .start ()
138
145
139
- def wait_for_next_completion (self ):
146
+ def wait_for_next_completion (self ): # type: () -> None
140
147
if self .exceptions :
141
148
raise self .exceptions [0 ]
142
149
143
150
def run_jobs (self ,
144
151
t , # type: Process
145
- job_order_object , # type: Dict[Text, Any],
152
+ job_order_object , # type: Dict[Text, Any]
146
153
logger ,
147
154
** kwargs # type: Any
148
155
):
0 commit comments