Skip to content

Commit c628dea

Browse files
committed
Add an extension point allowing building scripts for cwltool jobs.
1 parent a39e0cd commit c628dea

File tree

2 files changed

+143
-34
lines changed

2 files changed

+143
-34
lines changed

cwltool/builder.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(self): # type: () -> None
3535
self.pathmapper = None # type: PathMapper
3636
self.stagedir = None # type: Text
3737
self.make_fs_access = None # type: Type[StdFsAccess]
38+
self.build_job_script = None # type: Callable[[List[str]], Text]
3839

3940
def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]):
4041
# type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]]

cwltool/job.py

Lines changed: 142 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import stat
1515
import re
1616
import shellescape
17+
import string
1718
from .docker_uid import docker_vm_uid
1819
from .builder import Builder
1920
from typing import (Any, Callable, Union, Iterable, Mapping, MutableMapping,
@@ -25,6 +26,58 @@
2526

2627
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
2728

29+
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1"
30+
31+
SHELL_COMMAND_TEMPLATE = """#!/bin/bash
32+
python "run_job.py" "job.json"
33+
"""
34+
35+
PYTHON_RUN_SCRIPT = """
36+
import json
37+
import sys
38+
import subprocess
39+
40+
with open(sys.argv[1], "r") as f:
41+
popen_description = json.load(f)
42+
commands = popen_description["commands"]
43+
cwd = popen_description["cwd"]
44+
env = popen_description["env"]
45+
stdin_path = popen_description["stdin_path"]
46+
stdout_path = popen_description["stdout_path"]
47+
stderr_path = popen_description["stderr_path"]
48+
if stdin_path is not None:
49+
stdin = open(stdin_path, "rb")
50+
else:
51+
stdin = subprocess.PIPE
52+
if stdout_path is not None:
53+
stdout = open(stdout_path, "wb")
54+
else:
55+
stdout = sys.stderr
56+
if stderr_path is not None:
57+
stderr = open(stderr_path, "wb")
58+
else:
59+
stderr = sys.stderr
60+
sp = subprocess.Popen(commands,
61+
shell=False,
62+
close_fds=True,
63+
stdin=stdin,
64+
stdout=stdout,
65+
stderr=stderr,
66+
env=env,
67+
cwd=cwd)
68+
if sp.stdin:
69+
sp.stdin.close()
70+
rcode = sp.wait()
71+
if isinstance(stdin, file):
72+
stdin.close()
73+
if stdout is not sys.stderr:
74+
stdout.close()
75+
if stderr is not sys.stderr:
76+
stderr.close()
77+
sys.exit(rcode)
78+
"""
79+
80+
2881
def deref_links(outputs): # type: (Any) -> None
2982
if isinstance(outputs, dict):
3083
if outputs.get("class") == "File":
@@ -207,13 +260,15 @@ def linkoutdir(src, tgt):
207260
os.makedirs(dn)
208261
stdout_path = absout
209262

263+
build_job_script = self.builder.build_job_script # type: Callable[[List[str]], Text]
210264
rcode = _job_popen(
211265
[Text(x).encode('utf-8') for x in runtime + self.command_line],
212266
stdin_path=stdin_path,
213267
stdout_path=stdout_path,
214268
stderr_path=stderr_path,
215269
env=env,
216270
cwd=self.outdir,
271+
build_job_script=build_job_script,
217272
)
218273

219274
if self.successCodes and rcode in self.successCodes:
@@ -282,49 +337,102 @@ def _job_popen(
282337
stderr_path,
283338
env,
284339
cwd,
340+
job_dir=None,
341+
build_job_script=None,
285342
):
286-
# type: (List[str], Text, Text, Text, Union[MutableMapping[Text, Text], MutableMapping[str, str]], Text) -> int
343+
# type: (List[str], Text, Text, Text, Union[MutableMapping[Text, Text], MutableMapping[str, str]], Text, Text, Callable[[List[str]], Text]) -> int
287344

288-
stdin = None # type: Union[IO[Any], int]
289-
stderr = None # type: IO[Any]
290-
stdout = None # type: IO[Any]
345+
job_script_contents = None # type: Text
346+
if build_job_script:
347+
job_script_contents = build_job_script(commands)
291348

292-
if stdin_path is not None:
293-
stdin = open(stdin_path, "rb")
294-
else:
295-
stdin = subprocess.PIPE
349+
if not job_script_contents and not FORCE_SHELLED_POPEN:
296350

297-
if stdout_path is not None:
298-
stdout = open(stdout_path, "wb")
299-
else:
300-
stdout = sys.stderr
351+
stdin = None # type: Union[IO[Any], int]
352+
stderr = None # type: IO[Any]
353+
stdout = None # type: IO[Any]
301354

302-
if stderr_path is not None:
303-
stderr = open(stderr_path, "wb")
304-
else:
305-
stderr = sys.stderr
355+
if stdin_path is not None:
356+
stdin = open(stdin_path, "rb")
357+
else:
358+
stdin = subprocess.PIPE
306359

307-
sp = subprocess.Popen(commands,
308-
shell=False,
309-
close_fds=True,
310-
stdin=stdin,
311-
stdout=stdout,
312-
stderr=stderr,
313-
env=env,
314-
cwd=cwd)
360+
if stdout_path is not None:
361+
stdout = open(stdout_path, "wb")
362+
else:
363+
stdout = sys.stderr
315364

316-
if sp.stdin:
317-
sp.stdin.close()
365+
if stderr_path is not None:
366+
stderr = open(stderr_path, "wb")
367+
else:
368+
stderr = sys.stderr
318369

319-
rcode = sp.wait()
370+
sp = subprocess.Popen(commands,
371+
shell=False,
372+
close_fds=True,
373+
stdin=stdin,
374+
stdout=stdout,
375+
stderr=stderr,
376+
env=env,
377+
cwd=cwd)
320378

321-
if isinstance(stdin, file):
322-
stdin.close()
379+
if sp.stdin:
380+
sp.stdin.close()
323381

324-
if stdout is not sys.stderr:
325-
stdout.close()
382+
rcode = sp.wait()
326383

327-
if stderr is not sys.stderr:
328-
stderr.close()
384+
if isinstance(stdin, file):
385+
stdin.close()
386+
387+
if stdout is not sys.stderr:
388+
stdout.close()
389+
390+
if stderr is not sys.stderr:
391+
stderr.close()
392+
393+
return rcode
394+
else:
395+
if job_dir is None:
396+
job_dir = tempfile.mkdtemp(prefix="cwltooljob")
397+
398+
if not job_script_contents:
399+
job_script_contents = SHELL_COMMAND_TEMPLATE
400+
401+
env_copy = {}
402+
for key in env:
403+
key = key.encode("utf-8")
404+
env_copy[key] = env[key]
405+
406+
job_description = dict(
407+
commands=commands,
408+
cwd=cwd,
409+
env=env_copy,
410+
stdout_path=stdout_path,
411+
stderr_path=stderr_path,
412+
stdin_path=stdin_path,
413+
)
414+
with open(os.path.join(job_dir, "job.json"), "w") as f:
415+
json.dump(job_description, f)
416+
try:
417+
job_script = os.path.join(job_dir, "run_job.bash")
418+
with open(job_script, "w") as f:
419+
f.write(job_script_contents)
420+
job_run = os.path.join(job_dir, "run_job.py")
421+
with open(job_run, "w") as f:
422+
f.write(PYTHON_RUN_SCRIPT)
423+
sp = subprocess.Popen(
424+
["bash", job_script.encode("utf-8")],
425+
shell=False,
426+
cwd=job_dir,
427+
stdout=subprocess.PIPE,
428+
stderr=subprocess.PIPE,
429+
stdin=subprocess.PIPE,
430+
)
431+
if sp.stdin:
432+
sp.stdin.close()
433+
434+
rcode = sp.wait()
329435

330-
return rcode
436+
return rcode
437+
finally:
438+
shutil.rmtree(job_dir)

0 commit comments

Comments
 (0)