Skip to content

Commit 40c09c7

Browse files
author
Anton Khodak
committed
Merge branch 'master' into singularity-support
2 parents 83272bc + 92abb4e commit 40c09c7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+879
-472
lines changed

.travis.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
sudo: false
22
language: python
3+
cache: pip
34
python:
45
- 2.7
56
- 3.3
@@ -10,9 +11,13 @@ os:
1011
- linux
1112
install:
1213
- pip install tox-travis
14+
jobs:
15+
include:
16+
- stage: release-test
17+
script: RELEASE_SKIP=head ./release-test.sh
1318
script: tox
1419
branches:
1520
only:
1621
- master
1722
notifications:
18-
email: false
23+
email: false

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ install: FORCE
5757
dist: dist/${MODULE}-$(VERSION).tar.gz
5858

5959
dist/${MODULE}-$(VERSION).tar.gz: $(SOURCES)
60-
./setup.py sdist
60+
./setup.py sdist bdist_wheel
6161

6262
## clean : clean up all temporary / machine-generated files
6363
clean: FORCE

appveyor.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ install:
3131
- "python -c \"import struct; print(struct.calcsize('P') * 8)\""
3232

3333
build_script:
34+
- "%CMD_IN_ENV% pip install -U setuptools pip"
3435
- "%CMD_IN_ENV% pip install ."
3536

3637

cwltool/argparser.py

Lines changed: 401 additions & 0 deletions
Large diffs are not rendered by default.

cwltool/docker.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import tempfile
1010
from io import open
1111

12+
import datetime
1213
import requests
1314
from typing import (Dict, List, Text, Any, MutableMapping)
1415

@@ -180,8 +181,9 @@ def add_volumes(self, pathmapper, runtime):
180181
docker_windows_path_adjust(createtmp),
181182
docker_windows_path_adjust(vol.target)))
182183

183-
def create_runtime(self, env, rm_container=True, **kwargs):
184-
# type: (MutableMapping[Text, Text], bool, **Any) -> List
184+
def create_runtime(self, env, rm_container=True, record_container_id=False, cidfile_dir="",
185+
cidfile_prefix="", **kwargs):
186+
# type: (MutableMapping[Text, Text], bool, bool, Text, Text, **Any) -> List
185187
user_space_docker_cmd = kwargs.get("user_space_docker_cmd")
186188
if user_space_docker_cmd:
187189
runtime = [user_space_docker_cmd, u"run"]
@@ -236,6 +238,26 @@ def create_runtime(self, env, rm_container=True, **kwargs):
236238
# runtime.append("--env=HOME=/tmp")
237239
runtime.append(u"--env=HOME=%s" % self.builder.outdir)
238240

241+
# add parameters to docker to write a container ID file
242+
if record_container_id:
243+
if cidfile_dir != "":
244+
if not os.path.isdir(cidfile_dir):
245+
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
246+
cidfile_dir + " is not a directory or "
247+
"directory doesn't exist, please check it first")
248+
exit(2)
249+
if not os.path.exists(cidfile_dir):
250+
_logger.error("--cidfile-dir %s error:\n%s", cidfile_dir,
251+
"directory doesn't exist, please create it first")
252+
exit(2)
253+
else:
254+
cidfile_dir = os.getcwd()
255+
cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
256+
if cidfile_prefix != "":
257+
cidfile_name = str(cidfile_prefix + "-" + cidfile_name)
258+
cidfile_path = os.path.join(cidfile_dir, cidfile_name)
259+
runtime.append(u"--cidfile=%s" % cidfile_path)
260+
239261
for t, v in self.environment.items():
240262
runtime.append(u"--env=%s=%s" % (t, v))
241263

cwltool/draft2tool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ def revmap_file(builder, outdir, f):
153153
return f
154154

155155

156-
raise WorkflowException(u"Output File object is missing both `location` and `path` fields: %s" % f)
156+
raise WorkflowException(u"Output File object is missing both 'location' "
157+
"and 'path' fields: %s" % f)
157158

158159

159160
class CallbackJob(object):

cwltool/executors.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import logging
2+
import tempfile
3+
import threading
4+
5+
import os
6+
from abc import ABCMeta, abstractmethod
7+
8+
from typing import Dict, Text, Any, Tuple, Set, List
9+
10+
from .builder import Builder
11+
from .errors import WorkflowException
12+
from .mutation import MutationManager
13+
from .job import JobBase
14+
from .process import relocateOutputs, cleanIntermediate, Process
15+
from . import loghandler
16+
17+
_logger = logging.getLogger("cwltool")
18+
19+
class JobExecutor(object):
    """Abstract base for strategies that run the jobs yielded by a Process.

    Subclasses implement ``run_jobs``; ``execute`` (also reachable by calling
    the instance) prepares the keyword arguments, delegates to ``run_jobs``
    and returns the first ``(output, status)`` pair that was reported through
    ``output_callback``.
    """
    # NOTE(review): this is the Python 2 way of declaring a metaclass.  Python 3
    # ignores a ``__metaclass__`` class attribute, so ``@abstractmethod`` is
    # only enforced when running under Python 2.
    __metaclass__ = ABCMeta

    def __init__(self):
        # type: (...) -> None
        # Results reported through output_callback, in arrival order.
        self.final_output = []  # type: List
        self.final_status = []  # type: List
        # Every output directory seen so far (the temporary top-level outdir
        # plus any job outdirs added by subclasses).
        self.output_dirs = set()  # type: Set

    def __call__(self, *args, **kwargs):
        """Make the executor usable as a plain callable; forwards to execute()."""
        return self.execute(*args, **kwargs)

    def output_callback(self, out, processStatus):
        """Record one (output object, process status) pair reported by a process."""
        self.final_status.append(processStatus)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(self,
                 t,  # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,
                 **kwargs  # type: Any
                 ):
        """Run every job produced by ``t`` for ``job_order_object``.

        Implementations are expected to pass ``self.output_callback`` to the
        process so that ``execute`` can pick up the final result.
        """
        pass

    def execute(self, t,  # type: Process
                job_order_object,  # type: Dict[Text, Any]
                logger=_logger,
                **kwargs  # type: Any
                ):
        # type: (...) -> Tuple[Dict[Text, Any], Text]
        """Run ``t`` on ``job_order_object`` and return ``(output, status)``.

        ``kwargs`` must contain ``basedir``.  The following keys are read here:
        ``outdir`` (final destination of the outputs, optional),
        ``tmp_outdir_prefix``, ``move_outputs``, ``rm_tmpdir`` and
        ``make_fs_access`` (required only when outputs get relocated).
        ``kwargs["outdir"]`` is replaced by a fresh temporary directory and
        ``kwargs["mutation_manager"]`` by a new MutationManager before the
        jobs run.

        Returns the first recorded output and status, or
        ``(None, "permanentFail")`` when nothing was reported.

        Raises WorkflowException when ``basedir`` is missing.
        """
        if "basedir" not in kwargs:
            raise WorkflowException("Must provide 'basedir' in kwargs")

        # Start every run with empty result lists.  Without this, a reused
        # executor instance would keep returning the result of its first run,
        # because only index 0 of these lists is ever read below.
        self.final_output = []
        self.final_status = []

        # Remember where the caller wants the outputs before "outdir" is
        # replaced by a temporary working directory.
        finaloutdir = os.path.abspath(kwargs.get("outdir")) if kwargs.get("outdir") else None
        kwargs["outdir"] = tempfile.mkdtemp(prefix=kwargs["tmp_outdir_prefix"]) if kwargs.get(
            "tmp_outdir_prefix") else tempfile.mkdtemp()
        self.output_dirs.add(kwargs["outdir"])
        kwargs["mutation_manager"] = MutationManager()

        # Requirements supplied with the job order take precedence over the
        # ones stored in the tool's "cwl:defaults" metadata.
        jobReqs = None
        if "cwl:requirements" in job_order_object:
            jobReqs = job_order_object["cwl:requirements"]
        elif ("cwl:defaults" in t.metadata and "cwl:requirements" in t.metadata["cwl:defaults"]):
            jobReqs = t.metadata["cwl:defaults"]["cwl:requirements"]
        if jobReqs:
            # Note: this appends to the tool's own requirements list.
            for req in jobReqs:
                t.requirements.append(req)

        self.run_jobs(t, job_order_object, logger, **kwargs)

        # Only relocate when there is a non-empty result and the caller asked
        # for a specific output directory.
        if self.final_output and self.final_output[0] and finaloutdir:
            self.final_output[0] = relocateOutputs(self.final_output[0], finaloutdir,
                                                   self.output_dirs, kwargs.get("move_outputs"),
                                                   kwargs["make_fs_access"](""))

        if kwargs.get("rm_tmpdir"):
            cleanIntermediate(self.output_dirs)

        if self.final_output and self.final_status:
            return (self.final_output[0], self.final_status[0])
        else:
            return (None, "permanentFail")
83+
84+
85+
class SingleJobExecutor(JobExecutor):
    """Executor that runs the jobs one after another in the calling thread."""

    def run_jobs(self,
                 t,  # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,
                 **kwargs  # type: Any
                 ):
        """Iterate over the jobs of ``t`` and run each one to completion.

        A falsy item from the job iterator means nothing is runnable; it is
        logged as an error and iteration stops.  WorkflowException passes
        through unchanged; any other Exception is logged and re-raised as a
        WorkflowException carrying the original message.
        """
        pending_jobs = t.job(job_order_object,
                             self.output_callback,
                             **kwargs)

        try:
            for runnable in pending_jobs:
                if not runnable:
                    logger.error("Workflow cannot make any more progress.")
                    break

                # An explicitly supplied builder overrides the job's own.
                override = kwargs.get("builder", None)  # type: Builder
                if override is not None:
                    runnable.builder = override
                # Track the job's output directory on the executor.
                if runnable.outdir:
                    self.output_dirs.add(runnable.outdir)
                runnable.run(**kwargs)
        except WorkflowException:
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(Text(err))
113+
114+
115+
class MultithreadedJobExecutor(JobExecutor):
    """Executor that starts every runnable job in its own daemon thread.

    Worker threads record their failures in ``self.exceptions``; the thread
    driving ``run_jobs`` re-raises the first recorded failure.
    """

    # Upper bound (seconds) for a single blocking wait.  Completions are
    # signalled explicitly, so this is only a safety net against a missed
    # notification (e.g. a subclass that manages ``self.threads`` itself).
    _WAIT_TIMEOUT = 1.0

    def __init__(self):
        super(MultithreadedJobExecutor, self).__init__()
        self.threads = set()  # type: Set[threading.Thread]
        self.exceptions = []  # type: List[WorkflowException]
        # Guards self.threads and the two counters below; notified each time a
        # worker thread finishes.
        self._completion = threading.Condition()
        self._finished = 0      # number of workers that have ended
        self._acknowledged = 0  # value of _finished last seen by the waiter

    def run_job(self,
                job,  # type: JobBase
                **kwargs  # type: Any
                ):
        # type: (...) -> None
        """Start ``job.run(**kwargs)`` in a new daemon thread and return at once."""
        def runner():
            try:
                job.run(**kwargs)
            except WorkflowException as e:
                self.exceptions.append(e)
            except Exception as e:
                self.exceptions.append(WorkflowException(Text(e)))
            finally:
                # Always unregister, even when job.run() ends with something
                # that is not an Exception (e.g. SystemExit); otherwise
                # run_jobs() would wait for this thread forever.
                with self._completion:
                    self.threads.discard(thread)
                    self._finished += 1
                    self._completion.notify_all()

        thread = threading.Thread(target=runner)
        thread.daemon = True
        with self._completion:
            self.threads.add(thread)
        try:
            thread.start()
        except Exception:
            # The thread never ran, so nobody else will unregister it.
            with self._completion:
                self.threads.discard(thread)
            raise

    def wait_for_next_completion(self):  # type: () -> None
        """Block until a worker finishes, then re-raise the first recorded failure.

        Returns immediately when no thread is running, when a failure is
        already recorded, or when a worker has finished since the previous
        call.  Otherwise waits for a completion signal (bounded by
        ``_WAIT_TIMEOUT``) instead of spinning.
        """
        with self._completion:
            if (self.threads and not self.exceptions
                    and self._finished == self._acknowledged):
                self._completion.wait(self._WAIT_TIMEOUT)
            self._acknowledged = self._finished
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(self,
                 t,  # type: Process
                 job_order_object,  # type: Dict[Text, Any]
                 logger,
                 **kwargs  # type: Any
                 ):
        """Start every job of ``t`` in a thread and wait until all have ended.

        A falsy item from the job iterator means nothing is runnable right
        now: if threads are still running, wait for one to finish and ask
        again; otherwise log an error and stop.  Raises the first
        WorkflowException recorded by any worker.
        """

        jobiter = t.job(job_order_object, self.output_callback, **kwargs)

        for r in jobiter:
            if r:
                builder = kwargs.get("builder", None)  # type: Builder
                if builder is not None:
                    r.builder = builder
                if r.outdir:
                    self.output_dirs.add(r.outdir)
                self.run_job(r, **kwargs)
            else:
                if self.threads:
                    self.wait_for_next_completion()
                else:
                    logger.error("Workflow cannot make any more progress.")
                    break

        while self.threads:
            self.wait_for_next_completion()

        # The last worker may have recorded a failure and unregistered itself
        # before the loop above looked again; do not let that failure vanish.
        if self.exceptions:
            raise self.exceptions[0]

cwltool/expression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def interpolate(scan, rootvars,
185185
e = evaluator(scan[w[0] + 1:w[1]], jslib, rootvars, fullJS=fullJS,
186186
timeout=timeout, force_docker_pull=force_docker_pull,
187187
debug=debug, js_console=js_console)
188-
if w[0] == 0 and w[1] == len(scan):
188+
if w[0] == 0 and w[1] == len(scan) and len(parts) <= 1:
189189
return e
190190
leaf = json.dumps(e, sort_keys=True)
191191
if leaf[0] == '"':

cwltool/factory.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
from typing import Callable as tCallable
44
from typing import Any, Dict, Text, Tuple, Union
55

6-
from . import load_tool, main, workflow
6+
from . import load_tool, workflow
7+
from .argparser import get_default_args
8+
from .executors import SingleJobExecutor
79
from .process import Process
810

911

@@ -35,13 +37,21 @@ class Factory(object):
3537
def __init__(self,
3638
makeTool=workflow.defaultMakeTool, # type: tCallable[[Any], Process]
3739
# should be tCallable[[Dict[Text, Any], Any], Process] ?
38-
executor=main.single_job_executor, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
40+
executor=None, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
3941
**execkwargs # type: Any
4042
):
4143
# type: (...) -> None
4244
self.makeTool = makeTool
45+
if executor is None:
46+
executor = SingleJobExecutor()
4347
self.executor = executor
44-
self.execkwargs = execkwargs
48+
49+
kwargs = get_default_args()
50+
kwargs.pop("job_order")
51+
kwargs.pop("workflow")
52+
kwargs.pop("outdir")
53+
kwargs.update(execkwargs)
54+
self.execkwargs = kwargs
4555

4656
def make(self, cwl):
4757
"""Instantiate a CWL object from a CWl document."""

cwltool/job.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import tempfile
1515
from abc import ABCMeta, abstractmethod
1616
from io import open
17+
from threading import Lock
1718

1819
import shellescape
1920
from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text,
@@ -31,6 +32,8 @@
3132

3233
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
3334

35+
job_output_lock = Lock()
36+
3437
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
3538

3639
SHELL_COMMAND_TEMPLATE = """#!/bin/bash
@@ -266,7 +269,8 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"):
266269
if _logger.isEnabledFor(logging.DEBUG):
267270
_logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))
268271

269-
self.output_callback(outputs, processStatus)
272+
with job_output_lock:
273+
self.output_callback(outputs, processStatus)
270274

271275
if self.stagedir and os.path.exists(self.stagedir):
272276
_logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
@@ -320,13 +324,16 @@ def get_from_requirements(self, r, req, pull_image, dry_run=False):
320324
pass
321325

322326
@abstractmethod
323-
def create_runtime(self, env, rm_container=True, **kwargs):
324-
# type: (MutableMapping[Text, Text], bool, **Any) -> List
327+
def create_runtime(self, env, rm_container, record_container_id, cidfile_dir,
328+
cidfile_prefix, **kwargs):
329+
# type: (MutableMapping[Text, Text], bool, bool, Text, Text, **Any) -> List
325330
pass
326331

327332
def run(self, pull_image=True, rm_container=True,
333+
record_container_id=False, cidfile_dir="",
334+
cidfile_prefix="",
328335
rm_tmpdir=True, move_outputs="move", **kwargs):
329-
# type: (bool, bool, bool, Text, **Any) -> None
336+
# type: (bool, bool, bool, Text, Text, bool, Text, **Any) -> None
330337

331338
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
332339

@@ -372,7 +379,7 @@ def run(self, pull_image=True, rm_container=True,
372379
"--user-space-docker-cmd.: {1}".format(container, e))
373380

374381
self._setup(kwargs)
375-
runtime = self.create_runtime(env, rm_container, **kwargs)
382+
runtime = self.create_runtime(env, rm_container, record_container_id, cidfile_dir, cidfile_prefix, **kwargs)
376383
runtime.append(img_id)
377384

378385
self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs)

0 commit comments

Comments
 (0)