Skip to content

Commit 602b4fe

Browse files
author
Anton Khodak
committed
Create separate classes for docker & singularity command line jobs
1 parent 3102a10 commit 602b4fe

File tree

4 files changed

+392
-353
lines changed

4 files changed

+392
-353
lines changed

cwltool/docker.py

Lines changed: 221 additions & 103 deletions
Original file line number | Diff line number | Diff line change
@@ -1,124 +1,242 @@
11
from __future__ import absolute_import
2+
23
import logging
34
import os
45
import re
6+
import shutil
57
import subprocess
68
import sys
79
import tempfile
810
from io import open
9-
from typing import Dict, List, Text
1011

1112
import requests
13+
from typing import (Dict, List, Text, Any, MutableMapping)
1214

15+
from .docker_id import docker_vm_id
1316
from .errors import WorkflowException
17+
from .job import ContainerCommandLineJob
18+
from .pathmapper import PathMapper, ensure_writable
19+
from .utils import docker_windows_path_adjust, onWindows
1420

1521
_logger = logging.getLogger("cwltool")
1622

1723

18-
def get_image(dockerRequirement, pull_image, dry_run=False):
19-
# type: (Dict[Text, Text], bool, bool) -> bool
20-
found = False
21-
22-
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
23-
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
24-
25-
for ln in subprocess.check_output(
26-
["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines():
27-
try:
28-
m = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", ln)
29-
sp = dockerRequirement["dockerImageId"].split(":")
30-
if len(sp) == 1:
31-
sp.append("latest")
32-
elif len(sp) == 2:
33-
# if sp[1] doesn't match valid tag names, it is a part of repository
34-
if not re.match(r'[\w][\w.-]{0,127}', sp[1]):
35-
sp[0] = sp[0] + ":" + sp[1]
36-
sp[1] = "latest"
37-
elif len(sp) == 3:
38-
if re.match(r'[\w][\w.-]{0,127}', sp[2]):
39-
sp[0] = sp[0] + ":" + sp[1]
40-
sp[1] = sp[2]
41-
del sp[2]
42-
43-
# check for repository:tag match or image id match
44-
if ((sp[0] == m.group(1) and sp[1] == m.group(2)) or dockerRequirement["dockerImageId"] == m.group(3)):
45-
found = True
46-
break
47-
except ValueError:
48-
pass
49-
50-
if not found and pull_image:
51-
cmd = [] # type: List[Text]
52-
if "dockerPull" in dockerRequirement:
53-
cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])]
54-
_logger.info(Text(cmd))
55-
if not dry_run:
56-
subprocess.check_call(cmd, stdout=sys.stderr)
57-
found = True
58-
elif "dockerFile" in dockerRequirement:
59-
dockerfile_dir = str(tempfile.mkdtemp())
60-
with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df:
61-
df.write(dockerRequirement["dockerFile"].encode('utf-8'))
62-
cmd = ["docker", "build", "--tag=%s" %
63-
str(dockerRequirement["dockerImageId"]), dockerfile_dir]
64-
_logger.info(Text(cmd))
65-
if not dry_run:
66-
subprocess.check_call(cmd, stdout=sys.stderr)
67-
found = True
68-
elif "dockerLoad" in dockerRequirement:
69-
cmd = ["docker", "load"]
70-
_logger.info(Text(cmd))
71-
if not dry_run:
72-
if os.path.exists(dockerRequirement["dockerLoad"]):
73-
_logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
74-
with open(dockerRequirement["dockerLoad"], "rb") as f:
75-
loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
24+
class DockerCommandLineJob(ContainerCommandLineJob):
    """Command line job whose tool is executed through ``docker run``.

    The class knows how to make sure the required image is available
    (``get_image`` / ``get_from_requirements``) and how to assemble the
    ``docker run`` argument list, including volume mounts for staged inputs
    (``add_volumes`` / ``create_runtime``).
    """

    # A complete docker tag: one word character followed by up to 127 word
    # characters, dots or dashes.  Anchored with \Z so that a string such as
    # "5000/repo" (registry port + repository) is NOT mistaken for a tag just
    # because its prefix "5000" looks like one.
    _TAG_RE = re.compile(r'[\w][\w.-]{0,127}\Z')

    @staticmethod
    def _split_image_id(image_id):
        # type: (Text) -> List[Text]
        """Split an image reference on ":" and normalise repository/tag.

        Returns the list of parts; element 0 is the repository and element 1
        the tag used for comparison against the ``docker images`` listing.
        """
        sp = image_id.split(":")
        if len(sp) == 1:
            # No tag given at all.
            sp.append("latest")
        elif len(sp) == 2:
            # if sp[1] doesn't match valid tag names, it is a part of repository
            if not DockerCommandLineJob._TAG_RE.match(sp[1]):
                sp[0] = sp[0] + ":" + sp[1]
                sp[1] = "latest"
        elif len(sp) == 3:
            # host:port/repository:tag -> the first colon belongs to the
            # repository, the last part is the tag.
            if DockerCommandLineJob._TAG_RE.match(sp[2]):
                sp[0] = sp[0] + ":" + sp[1]
                sp[1] = sp[2]
                del sp[2]
        return sp

    @staticmethod
    def get_image(dockerRequirement, pull_image, dry_run=False):
        # type: (Dict[Text, Text], bool, bool) -> bool
        """Check that the required image exists locally, fetching it if allowed.

        ``dockerRequirement`` is updated in place: when it has ``dockerPull``
        but no ``dockerImageId``, the latter is set to the former.

        The local ``docker images`` listing is searched first.  If the image
        is missing and ``pull_image`` is true, the image is obtained via the
        first of ``dockerPull``, ``dockerFile``, ``dockerLoad`` or
        ``dockerImport`` present in the requirement.  With ``dry_run`` the
        commands are only logged, not executed.

        Returns True when the image was found or successfully obtained.
        Raises WorkflowException when ``docker load`` exits non-zero;
        failures of the other docker commands propagate from ``subprocess``.
        """
        found = False

        if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
            dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

        listing = subprocess.check_output(
            ["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines()

        if listing:
            # The requested reference does not change per listing line, so it
            # is parsed once here rather than inside the loop.
            image_id = dockerRequirement["dockerImageId"]
            sp = DockerCommandLineJob._split_image_id(image_id)

            for ln in listing:
                m = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", ln)
                if m is None:
                    # Not a "repository tag id" row (e.g. a blank line).
                    continue
                # check for repository:tag match or image id match
                if ((sp[0] == m.group(1) and sp[1] == m.group(2))
                        or image_id == m.group(3)):
                    found = True
                    break

        if not found and pull_image:
            cmd = []  # type: List[Text]
            if "dockerPull" in dockerRequirement:
                cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])]
                _logger.info(Text(cmd))
                if not dry_run:
                    subprocess.check_call(cmd, stdout=sys.stderr)
                    found = True
            elif "dockerFile" in dockerRequirement:
                dockerfile_dir = str(tempfile.mkdtemp())
                try:
                    with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df:
                        df.write(dockerRequirement["dockerFile"].encode('utf-8'))
                    cmd = ["docker", "build", "--tag=%s" %
                           str(dockerRequirement["dockerImageId"]), dockerfile_dir]
                    _logger.info(Text(cmd))
                    if not dry_run:
                        subprocess.check_call(cmd, stdout=sys.stderr)
                        found = True
                finally:
                    # mkdtemp() leaves deletion to the caller; the build
                    # context is no longer needed once the build returned.
                    shutil.rmtree(dockerfile_dir, ignore_errors=True)
            elif "dockerLoad" in dockerRequirement:
                cmd = ["docker", "load"]
                _logger.info(Text(cmd))
                if not dry_run:
                    if os.path.exists(dockerRequirement["dockerLoad"]):
                        _logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"])
                        with open(dockerRequirement["dockerLoad"], "rb") as f:
                            loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr)
                    else:
                        # Not a local file: treat the value as a URL and
                        # stream the download into `docker load`.
                        loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr)
                        _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"])
                        req = requests.get(dockerRequirement["dockerLoad"], stream=True)
                        n = 0
                        try:
                            for chunk in req.iter_content(1024 * 1024):
                                n += len(chunk)
                                _logger.info("\r%i bytes" % (n))
                                loadproc.stdin.write(chunk)
                        finally:
                            # Always close the pipe so `docker load` sees EOF
                            # even if the download fails half way.
                            loadproc.stdin.close()
                    rcode = loadproc.wait()
                    if rcode != 0:
                        raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode))
                    found = True
            elif "dockerImport" in dockerRequirement:
                cmd = ["docker", "import", str(dockerRequirement["dockerImport"]),
                       str(dockerRequirement["dockerImageId"])]
                _logger.info(Text(cmd))
                if not dry_run:
                    subprocess.check_call(cmd, stdout=sys.stderr)
                    found = True

        return found

    def get_from_requirements(self, r, req, pull_image, dry_run=False):
        # type: (Dict[Text, Text], bool, bool, bool) -> Text
        """Return the image id to run for docker requirement ``r``.

        ``req`` tells whether docker is mandatory.  When docker cannot be
        reached, or the image cannot be found/obtained, a WorkflowException
        is raised if ``req`` is true; otherwise None is returned.  None is
        also returned when ``r`` is empty.
        """
        if not r:
            return None

        errmsg = None
        try:
            subprocess.check_output(["docker", "version"])
        except subprocess.CalledProcessError as e:
            errmsg = "Cannot communicate with docker daemon: " + Text(e)
        except OSError as e:
            errmsg = "'docker' executable not found: " + Text(e)

        if errmsg:
            if req:
                raise WorkflowException(errmsg)
            return None

        if self.get_image(r, pull_image, dry_run):
            return r["dockerImageId"]

        if req:
            raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"])

        return None

    def add_volumes(self, pathmapper, runtime):
        # type: (PathMapper, List[Text]) -> None
        """Stage the entries of ``pathmapper`` for the container.

        Depending on the entry type this appends ``--volume=`` flags to
        ``runtime`` (modified in place) and/or copies or creates files under
        the host output directory.  Entries that are not staged are skipped.
        """
        host_outdir = self.outdir
        container_outdir = self.builder.outdir
        for _, vol in pathmapper.items():
            if not vol.staged:
                continue

            # Host path corresponding to the target when the target lives
            # below the container output directory; None otherwise.
            if vol.target.startswith(container_outdir+"/"):
                host_outdir_tgt = os.path.join(
                    host_outdir, vol.target[len(container_outdir)+1:])
            else:
                host_outdir_tgt = None

            if vol.type in ("File", "Directory"):
                # "_:" marks entries without a real host path; nothing to mount.
                if not vol.resolved.startswith("_:"):
                    runtime.append(u"--volume=%s:%s:ro" % (
                        docker_windows_path_adjust(vol.resolved),
                        docker_windows_path_adjust(vol.target)))
            elif vol.type == "WritableFile":
                if self.inplace_update:
                    runtime.append(u"--volume=%s:%s:rw" % (
                        docker_windows_path_adjust(vol.resolved),
                        docker_windows_path_adjust(vol.target)))
                else:
                    # NOTE(review): host_outdir_tgt is None when the target
                    # is outside the output directory; the copy then fails.
                    shutil.copy(vol.resolved, host_outdir_tgt)
                    ensure_writable(host_outdir_tgt)
            elif vol.type == "WritableDirectory":
                if vol.resolved.startswith("_:"):
                    # Create the directory on the host side of the output
                    # mount; vol.target is a path inside the container and
                    # is only used as a fallback when no host path exists.
                    os.makedirs(host_outdir_tgt or vol.target, 0o0755)
                else:
                    if self.inplace_update:
                        runtime.append(u"--volume=%s:%s:rw" % (
                            docker_windows_path_adjust(vol.resolved),
                            docker_windows_path_adjust(vol.target)))
                    else:
                        # NOTE(review): same None caveat as for WritableFile.
                        shutil.copytree(vol.resolved, host_outdir_tgt)
                        ensure_writable(host_outdir_tgt)
            elif vol.type == "CreateFile":
                # For CreateFile entries vol.resolved holds the file content.
                if host_outdir_tgt:
                    with open(host_outdir_tgt, "wb") as f:
                        f.write(vol.resolved.encode("utf-8"))
                else:
                    fd, createtmp = tempfile.mkstemp(dir=self.tmpdir)
                    with os.fdopen(fd, "wb") as f:
                        f.write(vol.resolved.encode("utf-8"))
                    runtime.append(u"--volume=%s:%s:rw" % (
                        docker_windows_path_adjust(createtmp),
                        docker_windows_path_adjust(vol.target)))

    @staticmethod
    def _strip_volume_mode(arg):
        # type: (Text) -> Text
        """Drop a trailing ":ro"/":rw" access mode from a ``--volume=`` flag."""
        if arg.startswith(u"--volume=") and arg.endswith((u":ro", u":rw")):
            return arg[:-3]
        return arg

    def create_runtime(self, env, rm_container=True, **kwargs):
        # type: (MutableMapping[Text, Text], bool, **Any) -> List
        """Build the ``docker run`` argument list (without image and command).

        ``env`` is not read by this implementation; the variables passed to
        the container come from ``self.environment``.  ``rm_container`` adds
        ``--rm``.  Recognised ``kwargs``: ``user_space_docker_cmd``,
        ``no_read_only``, ``custom_net``, ``disable_net``, ``no_match_user``.
        """
        user_space_docker_cmd = kwargs.get("user_space_docker_cmd")
        if user_space_docker_cmd:
            runtime = [user_space_docker_cmd, u"run"]
        else:
            runtime = [u"docker", u"run", u"-i"]

        runtime.append(u"--volume=%s:%s:rw" % (
            docker_windows_path_adjust(os.path.realpath(self.outdir)),
            self.builder.outdir))
        runtime.append(u"--volume=%s:%s:rw" % (
            docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp"))

        self.add_volumes(self.pathmapper, runtime)
        if self.generatemapper:
            self.add_volumes(self.generatemapper, runtime)

        if user_space_docker_cmd:
            # The access-mode suffix is removed for the user-space command;
            # only the suffix of volume flags is touched so that paths and
            # the command itself are never altered.
            runtime = [self._strip_volume_mode(x) for x in runtime]

        runtime.append(u"--workdir=%s" % (
            docker_windows_path_adjust(self.builder.outdir)))

        if not user_space_docker_cmd:
            if not kwargs.get("no_read_only"):
                runtime.append(u"--read-only=true")

            if kwargs.get("custom_net", None) is not None:
                runtime.append(u"--net={0}".format(kwargs.get("custom_net")))
            elif kwargs.get("disable_net", None):
                runtime.append(u"--net=none")

            if self.stdout:
                runtime.append("--log-driver=none")

            euid, egid = docker_vm_id()
            if not onWindows():
                # MS Windows does not have getuid() or geteuid() functions
                euid, egid = euid or os.geteuid(), egid or os.getgid()

            # Both ids must be known: "%d" cannot format None.
            if kwargs.get("no_match_user", None) is False \
                    and euid is not None and egid is not None:
                runtime.append(u"--user=%d:%d" % (euid, egid))

        if rm_container:
            runtime.append(u"--rm")

        runtime.append(u"--env=TMPDIR=/tmp")

        # spec currently says "HOME must be set to the designated output
        # directory." but spec might change to designated temp directory.
        # runtime.append("--env=HOME=/tmp")
        runtime.append(u"--env=HOME=%s" % self.builder.outdir)

        for t, v in self.environment.items():
            runtime.append(u"--env=%s=%s" % (t, v))

        return runtime

cwltool/draft2tool.py

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -21,14 +21,16 @@
2121
from six.moves import urllib
2222

2323
from .builder import CONTENT_LIMIT, Builder, substitute
24+
from .docker import DockerCommandLineJob
2425
from .errors import WorkflowException
2526
from .flatten import flatten
26-
from .job import CommandLineJob, DockerCommandLineJob, JobBase
27+
from .job import CommandLineJob, JobBase
2728
from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs,
2829
get_listing, trim_listing, visit_class)
2930
from .process import (Process, UnsupportedRequirement,
3031
_logger_validation_warnings, compute_checksums,
3132
normalizeFilesDirs, shortname, uniquename)
33+
from .singularity import SingularityCommandLineJob
3234
from .stdfsaccess import StdFsAccess
3335
from .utils import aslist, docker_windows_path_adjust, convert_pathsep_to_unix, windows_default_container_id, onWindows
3436
from six.moves import map
@@ -213,7 +215,10 @@ def makeJobRunner(self, use_container=True, **kwargs): # type: (Optional[bool],
213215
_logger.warning(DEFAULT_CONTAINER_MSG % (windows_default_container_id, windows_default_container_id))
214216

215217
if dockerReq and use_container:
216-
return DockerCommandLineJob()
218+
if kwargs.get('singularity'):
219+
return SingularityCommandLineJob()
220+
else:
221+
return DockerCommandLineJob()
217222
else:
218223
for t in reversed(self.requirements):
219224
if t["class"] == "DockerRequirement":

0 commit comments

Comments
 (0)