Commit b0629e9

Author: Anton Khodak (authored and committed)
Refactor executors & use OOP approach to executors
* add an option for choosing executors: sequential or multithreaded
* use class-based approach to executors and move them to a separate file
1 parent 7b77c4d commit b0629e9

File tree (3 files changed: +194 / -98 lines):

cwltool/executors.py
cwltool/factory.py
cwltool/main.py

cwltool/executors.py

Lines changed: 175 additions & 0 deletions
@@ -0,0 +1,175 @@
+import logging
+import tempfile
+import threading
+
+import os
+from abc import ABCMeta, abstractmethod
+
+from typing import Dict, Text, Any
+
+from cwltool.builder import Builder
+from cwltool.errors import WorkflowException
+from cwltool.mutation import MutationManager
+from cwltool.process import relocateOutputs, cleanIntermediate, Process
+
+
+_logger = logging.getLogger("cwltool")
+
+defaultStreamHandler = logging.StreamHandler()
+_logger.addHandler(defaultStreamHandler)
+_logger.setLevel(logging.INFO)
+
+
+class JobExecutor(object):
+    __metaclass__ = ABCMeta
+
+    def __init__(self):
+        self.final_output = []
+        self.final_status = []
+        self.output_dirs = set()
+
+    def __call__(self, *args, **kwargs):
+        return self.execute(*args, **kwargs)
+
+    def output_callback(self, out, processStatus):
+        self.final_status.append(processStatus)
+        self.final_output.append(out)
+
+    @abstractmethod
+    def run_jobs(self,
+                 t,  # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,
+                 **kwargs  # type: Any
+                 ):
+        pass
+
+    def execute(self, t,  # type: Process
+                job_order_object,  # type: Dict[Text, Any]
+                logger=_logger,
+                **kwargs  # type: Any
+                ):
+
+        if "basedir" not in kwargs:
+            raise WorkflowException("Must provide 'basedir' in kwargs")
+
+        finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
+        kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
+            "tmp_outdir_prefix") else tempfile.mkdtemp()
+        self.output_dirs.add(kwargs["outdir"])
+        kwargs["mutation_manager"] = MutationManager()
+
+        jobReqs = None
+        if "cwl:requirements" in job_order_object:
+            jobReqs = job_order_object["cwl:requirements"]
+        elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
+            jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
+        if jobReqs:
+            for req in jobReqs:
+                t.requirements.append(req)
+
+        self.run_jobs(t, job_order_object, logger, **kwargs)
+
+        if self.final_output and self.final_output[0] and finaloutdir:
+            self.final_output[0] = relocateOutputs(self.final_output[0], finaloutdir,
+                                                   self.output_dirs, kwargs.get("move_outputs"),
+                                                   kwargs["make_fs_access"](""))
+
+        if kwargs.get("rm_tmpdir"):
+            cleanIntermediate(self.output_dirs)
+
+        if self.final_output and self.final_status:
+            return (self.final_output[0], self.final_status[0])
+        else:
+            return (None, "permanentFail")
+
+
+class SingleJobExecutor(JobExecutor):
+    def run_jobs(self,
+                 t,  # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,
+                 **kwargs  # type: Any
+                 ):
+        jobiter = t.job(job_order_object,
+                        self.output_callback,
+                        **kwargs)
+
+        try:
+            for r in jobiter:
+                if r:
+                    builder = kwargs.get("builder", None)  # type: Builder
+                    if builder is not None:
+                        r.builder = builder
+                    if r.outdir:
+                        self.output_dirs.add(r.outdir)
+                    r.run(**kwargs)
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+        except WorkflowException:
+            raise
+        except Exception as e:
+            logger.exception("Got workflow error")
+            raise WorkflowException(Text(e))
+
+
+class MultithreadedJobExecutor(JobExecutor):
+    def __init__(self):
+        super(MultithreadedJobExecutor, self).__init__()
+        self.fetch_iter_lock = threading.Lock()
+        self.threads = set()
+        self.exceptions = []
+
+    def run_job(self, job, **kwargs):
+        def runner():
+            try:
+                job.run(**kwargs)
+            except WorkflowException as e:
+                self.exceptions.append(e)
+            except Exception as e:
+                self.exceptions.append(WorkflowException(Text(e)))
+
+            self.threads.remove(thread)
+
+            if self.fetch_iter_lock.locked():
+                self.fetch_iter_lock.release()
+
+        thread = threading.Thread(target=runner)
+        thread.daemon = True
+        self.threads.add(thread)
+        thread.start()
+
+    def wait_for_next_completion(self):
+        self.fetch_iter_lock.acquire()
+        self.fetch_iter_lock.acquire()
+        self.fetch_iter_lock.release()
+        if self.exceptions:
+            raise self.exceptions[0]
+
+    def run_jobs(self,
+                 t,  # type: Process
+                 job_order_object,  # type: Dict[Text, Any]
+                 logger,
+                 **kwargs  # type: Any
+                 ):
+
+        jobiter = t.job(job_order_object, self.output_callback, **kwargs)
+
+        for r in jobiter:
+            if r:
+                builder = kwargs.get("builder", None)  # type: Builder
+                if builder is not None:
+                    r.builder = builder
+                if r.outdir:
+                    self.output_dirs.add(r.outdir)
+                self.run_job(r, **kwargs)
+            else:
+                if len(self.threads):
+                    self.wait_for_next_completion()
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+
+        while len(self.threads) > 0:
+            self.wait_for_next_completion()

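The key shape of the refactor: JobExecutor is a callable object whose __call__ forwards to execute(), which owns the shared bookkeeping (temp/output directories, cwl:requirements merging, output relocation, cleanup, final status), while subclasses supply only run_jobs(). In MultithreadedJobExecutor, the double acquire() in wait_for_next_completion() is the blocking trick: the second acquire sleeps until a finishing job thread releases the lock inside runner(). A minimal sketch of a third executor built on the same contract — the class name and extra logging are illustrative, not part of this commit:

from cwltool.executors import JobExecutor


class VerboseJobExecutor(JobExecutor):
    """Hypothetical subclass: SingleJobExecutor's loop plus per-job logging."""

    def run_jobs(self, t, job_order_object, logger, **kwargs):
        # Pull runnable jobs from the Process iterator; a falsy value means
        # the workflow is blocked and cannot produce another job right now.
        for job in t.job(job_order_object, self.output_callback, **kwargs):
            if not job:
                logger.error("Workflow cannot make any more progress.")
                break
            if job.outdir:
                # Tracked by the base class for relocation and cleanup.
                self.output_dirs.add(job.outdir)
            logger.info("Running %r", job)
            job.run(**kwargs)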
cwltool/factory.py

Lines changed: 3 additions & 2 deletions
@@ -3,7 +3,8 @@
 from typing import Callable as tCallable
 from typing import Any, Dict, Text, Tuple, Union
 
-from . import load_tool, main, workflow
+from . import load_tool, workflow
+from .executors import SingleJobExecutor
 from .process import Process
 
 
@@ -35,7 +36,7 @@ class Factory(object):
     def __init__(self,
                  makeTool=workflow.defaultMakeTool,  # type: tCallable[[Any], Process]
                  # should be tCallable[[Dict[Text, Any], Any], Process] ?
-                 executor=main.job_executor,  # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
+                 executor=SingleJobExecutor(),  # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
                  **execkwargs  # type: Any
                  ):
         # type: (...) -> None

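Factory now defaults to an executor instance (SingleJobExecutor()) instead of the old main.job_executor function, and anything matching the executor call signature can be passed in. A minimal usage sketch, assuming a local echo.cwl document (hypothetical path):

from cwltool.factory import Factory
from cwltool.executors import MultithreadedJobExecutor

# Swap in the thread-based executor instead of the default SingleJobExecutor().
fac = Factory(executor=MultithreadedJobExecutor())
echo = fac.make("echo.cwl")  # hypothetical CWL document
result = echo(inp="hello")   # runs the tool and returns its output object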
cwltool/main.py

Lines changed: 16 additions & 96 deletions
@@ -28,6 +28,7 @@
 from .builder import Builder
 from .cwlrdf import printdot, printrdf
 from .errors import UnsupportedRequirement, WorkflowException
+from .executors import SingleJobExecutor, MultithreadedJobExecutor
 from .load_tool import (FetcherConstructorType, resolve_tool_uri,
                         fetch_document, make_tool, validate_document, jobloaderctx,
                         resolve_overrides, load_overrides)
@@ -64,7 +65,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--no-container", action="store_false", default=True,
                         help="Do not execute jobs in a Docker container, even when specified by the CommandLineTool",
                         dest="use_container")
-
+    parser.add_argument("--executor", type=Text, choices={"single", "parallel"}, default="single",
+                        help="Workflow executor type: single (sequential) or parallel (multithreaded). Default: single")
     parser.add_argument("--preserve-environment", type=Text, action="append",
                         help="Preserve specific environment variable when running CommandLineTools. May be provided multiple times.",
                         metavar="ENVVAR",
@@ -253,100 +255,6 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     return parser
 
 
-def job_executor(t,  # type: Process
-                 job_order_object,  # type: Dict[Text, Any]
-                 **kwargs  # type: Any
-                 ):
-    # type: (...) -> Tuple[Dict[Text, Any], Text]
-    final_output = []
-    final_status = []
-
-    def output_callback(out, processStatus):
-        final_status.append(processStatus)
-        final_output.append(out)
-
-    if "basedir" not in kwargs:
-        raise WorkflowException("Must provide 'basedir' in kwargs")
-
-    output_dirs = set()
-    finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
-    kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
-        "tmp_outdir_prefix") else tempfile.mkdtemp()
-    output_dirs.add(kwargs["outdir"])
-    kwargs["mutation_manager"] = MutationManager()
-
-    jobReqs = None
-    if "cwl:requirements" in job_order_object:
-        jobReqs = job_order_object["cwl:requirements"]
-    elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
-        jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
-    if jobReqs:
-        for req in jobReqs:
-            t.requirements.append(req)
-
-    fetch_iter_lock = threading.Lock()
-    threads = set()
-    exceptions = []
-
-    def run_job(job):
-        def runner():
-            try:
-                job.run(**kwargs)
-            except WorkflowException as e:
-                exceptions.append(e)
-            except Exception as e:
-                exceptions.append(WorkflowException(Text(e)))
-
-            threads.remove(thread)
-
-            if fetch_iter_lock.locked():
-                fetch_iter_lock.release()
-
-        thread = threading.Thread(target=runner)
-        thread.daemon = True
-        threads.add(thread)
-        thread.start()
-
-    def wait_for_next_completion():
-        fetch_iter_lock.acquire()
-        fetch_iter_lock.acquire()
-        fetch_iter_lock.release()
-        if exceptions:
-            raise exceptions[0]
-
-    jobiter = t.job(job_order_object, output_callback, **kwargs)
-
-    for r in jobiter:
-        if r:
-            builder = kwargs.get("builder", None)  # type: Builder
-            if builder is not None:
-                r.builder = builder
-            if r.outdir:
-                output_dirs.add(r.outdir)
-            run_job(r)
-        else:
-            if len(threads):
-                wait_for_next_completion()
-            else:
-                _logger.error("Workflow cannot make any more progress.")
-                break
-
-    while len(threads) > 0:
-        wait_for_next_completion()
-
-    if final_output and final_output[0] and finaloutdir:
-        final_output[0] = relocateOutputs(final_output[0], finaloutdir,
-                                          output_dirs, kwargs.get("move_outputs"),
-                                          kwargs["make_fs_access"](""))
-
-    if kwargs.get("rm_tmpdir"):
-        cleanIntermediate(output_dirs)
-
-    if final_output and final_status:
-        return (final_output[0], final_status[0])
-    else:
-        return (None, "permanentFail")
-
 
 class FSAction(argparse.Action):
     objclass = None  # type: Text
@@ -756,7 +664,7 @@ def supportedCWLversions(enable_dev):
 
 def main(argsl=None,  # type: List[str]
          args=None,  # type: argparse.Namespace
-         executor=job_executor,  # type: Callable[..., Tuple[Dict[Text, Any], Text]]
+         executor=None,  # type: Callable[..., Tuple[Dict[Text, Any], Text]]
          makeTool=workflow.defaultMakeTool,  # type: Callable[..., Process]
         selectResources=None,  # type: Callable[[Dict[Text, int]], Dict[Text, int]]
         stdin=sys.stdin,  # type: IO[Any]
@@ -991,6 +899,17 @@ def main(argsl=None,  # type: List[str]
     except SystemExit as e:
         return e.code
 
+    if not executor:
+        if args.executor == "single":
+            executor = SingleJobExecutor()
+        elif args.executor == "parallel":
+            executor = MultithreadedJobExecutor()
+        else:
+            _logger.error("Unknown type of executor: {0}".format(args.executor))
+            arg_parser().print_help()
+            return 1
+
     if isinstance(job_order_object, int):
         return job_order_object
 
@@ -999,6 +918,7 @@ def main(argsl=None,  # type: List[str]
         del args.workflow
         del args.job_order
         (out, status) = executor(tool, job_order_object,
+                                 logger=_logger,
                                  makeTool=makeTool,
                                  select_resources=selectResources,
                                  make_fs_access=make_fs_access,

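Executor selection now lives in main(): a caller-supplied executor argument takes precedence; otherwise the new --executor flag chooses SingleJobExecutor ("single", the default) or MultithreadedJobExecutor ("parallel"). A minimal sketch of both entry points, assuming hypothetical wf.cwl and job.yml files:

import sys
from cwltool.main import main
from cwltool.executors import MultithreadedJobExecutor

# Flag-based selection, equivalent to: cwltool --executor parallel wf.cwl job.yml
rc = main(argsl=["--executor", "parallel", "wf.cwl", "job.yml"])

# Programmatic selection: an explicit executor bypasses the flag dispatch.
rc = main(argsl=["wf.cwl", "job.yml"], executor=MultithreadedJobExecutor())
sys.exit(rc)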