Skip to content

Commit 1b1a158

Browse files
authored
Support for v1.1 TimeLimit, WorkReuse and NetworkAccess (#718)
* Kill jobs that exceed timeout (TimeLimit) * Enable/disable network (NetworkAccess) * Enable/disable caching (WorkReuse) * Support v1.1 and as a prefixed backport in v1.0
1 parent 65203c7 commit 1b1a158

17 files changed

+361
-28
lines changed

cwltool/argparser.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
208208
help="Specify a default docker container that will be used if the workflow fails to specify one.")
209209
parser.add_argument("--no-match-user", action="store_true",
210210
help="Disable passing the current uid to `docker run --user`")
211-
parser.add_argument("--disable-net", action="store_true",
212-
help="Use docker's default networking for containers;"
213-
" the default is to enable networking.")
214211
parser.add_argument("--custom-net", type=Text,
215-
help="Will be passed to `docker run` as the '--net' "
216-
"parameter. Implies '--enable-net'.")
212+
help="Passed to `docker run` as the '--net' "
213+
"parameter when NetworkAccess is true.")
217214
parser.add_argument("--disable-validate", dest="do_validate",
218215
action="store_false", default=True,
219216
help=argparse.SUPPRESS)

cwltool/command_line_tool.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,15 @@ def job(self,
250250
):
251251
# type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
252252

253+
require_prefix = ""
254+
if self.metadata["cwlVersion"] == "v1.0":
255+
require_prefix = "http://commonwl.org/cwltool#"
256+
257+
workReuse = self.get_requirement(require_prefix+"WorkReuse")[0]
258+
enableReuse = workReuse.get("enableReuse", True) if workReuse else True
259+
253260
jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
254-
if kwargs.get("cachedir"):
261+
if kwargs.get("cachedir") and enableReuse:
255262
cacheargs = kwargs.copy()
256263
cacheargs["outdir"] = "/out"
257264
cacheargs["tmpdir"] = "/tmp"
@@ -487,6 +494,22 @@ def register_reader(f):
487494
adjustDirObjs(builder.files, register_reader)
488495
adjustDirObjs(builder.bindings, register_reader)
489496

497+
timelimit = self.get_requirement(require_prefix+"TimeLimit")[0]
498+
if timelimit:
499+
with SourceLine(timelimit, "timelimit", validate.ValidationException, debug):
500+
j.timelimit = builder.do_eval(timelimit["timelimit"])
501+
if not isinstance(j.timelimit, int) or j.timelimit < 0:
502+
raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit)
503+
504+
if self.metadata["cwlVersion"] == "v1.0":
505+
j.networkaccess = True
506+
networkaccess = self.get_requirement(require_prefix+"NetworkAccess")[0]
507+
if networkaccess:
508+
with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug):
509+
j.networkaccess = builder.do_eval(networkaccess["networkAccess"])
510+
if not isinstance(j.networkaccess, bool):
511+
raise Exception("networkAccess must be a boolean, got: %s" % j.networkaccess)
512+
490513
j.environment = {}
491514
evr = self.get_requirement("EnvVarRequirement")[0]
492515
if evr:

cwltool/docker.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,10 @@ def create_runtime(self, env, rm_container=True, record_container_id=False, cidf
267267
if not kwargs.get("no_read_only"):
268268
runtime.append(u"--read-only=true")
269269

270-
if kwargs.get("custom_net", None) is not None:
271-
runtime.append(u"--net={0}".format(kwargs.get("custom_net")))
272-
elif kwargs.get("disable_net", None):
270+
if self.networkaccess:
271+
if kwargs.get("custom_net"):
272+
runtime.append(u"--net={0}".format(kwargs["custom_net"]))
273+
else:
273274
runtime.append(u"--net=none")
274275

275276
if self.stdout:

cwltool/extensions.yml

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,85 @@ $graph:
5353
which will be deliberately obscured from logging.
5454
jsonldPredicate:
5555
"_type": "@id"
56-
refScope: 0
56+
refScope: 0
57+
58+
59+
- type: record
60+
name: TimeLimit
61+
inVocab: false
62+
extends: cwl:ProcessRequirement
63+
doc: |
64+
Set an upper limit on the execution time of a CommandLineTool or
65+
ExpressionTool. A tool execution which exceeds the time limit may
66+
be preemptively terminated and considered failed. May also be
67+
used by batch systems to make scheduling decisions.
68+
fields:
69+
- name: class
70+
type: string
71+
doc: "Always 'TimeLimit'"
72+
jsonldPredicate:
73+
"_id": "@type"
74+
"_type": "@vocab"
75+
- name: timelimit
76+
type: [long, string]
77+
doc: |
78+
The time limit, in seconds. A time limit of zero means no
79+
time limit. Negative time limits are an error.
80+
81+
82+
- type: record
83+
name: WorkReuse
84+
inVocab: false
85+
extends: cwl:ProcessRequirement
86+
doc: |
87+
For implementations that support reusing output from past work (on
88+
the assumption that same code and same input produce same
89+
results), control whether to enable or disable the reuse behavior
90+
for a particular tool or step (to accomodate situations where that
91+
assumption is incorrect). A reused step is not executed but
92+
instead returns the same output as the original execution.
93+
94+
If `enableReuse` is not specified, correct tools should assume it
95+
is enabled by default.
96+
fields:
97+
- name: class
98+
type: string
99+
doc: "Always 'WorkReuse'"
100+
jsonldPredicate:
101+
"_id": "@type"
102+
"_type": "@vocab"
103+
- name: enableReuse
104+
type: [boolean, string]
105+
#default: true
106+
107+
108+
- type: record
109+
name: NetworkAccess
110+
inVocab: false
111+
extends: cwl:ProcessRequirement
112+
doc: |
113+
Indicate whether a process requires outgoing IPv4/IPv6 network
114+
access. Choice of IPv4 or IPv6 is implementation and site
115+
specific, correct tools must support both.
116+
117+
If `networkAccess` is false or not specified, tools must not
118+
assume network access, except for localhost (the loopback device).
119+
120+
If `networkAccess` is true, the tool must be able to make outgoing
121+
connections to network resources. Resources may be on a private
122+
subnet or the public Internet. However, implementations and sites
123+
may apply their own security policies to restrict what is
124+
accessible by the tool.
125+
126+
Enabling network access does not imply a publically routable IP
127+
address or the ability to accept inbound connections.
128+
129+
fields:
130+
- name: class
131+
type: string
132+
doc: "Always 'NetworkAccess'"
133+
jsonldPredicate:
134+
"_id": "@type"
135+
"_type": "@vocab"
136+
- name: networkAccess
137+
type: [boolean, string]

cwltool/job.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import stat
1212
import sys
1313
import tempfile
14+
import threading
1415
from abc import ABCMeta, abstractmethod
1516
from io import open
1617
from threading import Lock
@@ -154,6 +155,8 @@ def __init__(self): # type: () -> None
154155
self.generatefiles = None # type: Dict[Text, Union[List[Dict[Text, Text]], Dict[Text, Text], Text]]
155156
self.stagedir = None # type: Text
156157
self.inplace_update = None # type: bool
158+
self.timelimit = None # type: int
159+
self.networkaccess = False # type: bool
157160

158161
def _setup(self, kwargs): # type: (Dict) -> None
159162
if not os.path.exists(self.outdir):
@@ -235,9 +238,17 @@ def _execute(self,
235238
if builder is not None:
236239
job_script_contents = builder.build_job_script(commands)
237240
rcode = _job_popen(
238-
commands, stdin_path, stdout_path, stderr_path, env,
239-
self.outdir, tempfile.mkdtemp(prefix=tmp_outdir_prefix),
240-
job_script_contents)
241+
commands,
242+
stdin_path=stdin_path,
243+
stdout_path=stdout_path,
244+
stderr_path=stderr_path,
245+
env=env,
246+
cwd=self.outdir,
247+
job_dir=tempfile.mkdtemp(prefix=tmp_outdir_prefix),
248+
job_script_contents=job_script_contents,
249+
timelimit=self.timelimit,
250+
name=self.name
251+
)
241252

242253
if self.successCodes and rcode in self.successCodes:
243254
processStatus = "success"
@@ -429,7 +440,10 @@ def _job_popen(
429440
cwd, # type: Text
430441
job_dir, # type: Text
431442
job_script_contents=None, # type: Text
443+
timelimit=None, # type: int
444+
name=None # type: Text
432445
): # type: (...) -> int
446+
433447
if not job_script_contents and not FORCE_SHELLED_POPEN:
434448

435449
stdin = None # type: Union[IO[Any], int]
@@ -463,8 +477,22 @@ def _job_popen(
463477
if sp.stdin:
464478
sp.stdin.close()
465479

480+
tm = None
481+
if timelimit:
482+
def terminate():
483+
try:
484+
_logger.warn(u"[job %s] exceeded time limit of %d seconds and will be terminated", name, timelimit)
485+
sp.terminate()
486+
except OSError:
487+
pass
488+
tm = threading.Timer(timelimit, terminate)
489+
tm.start()
490+
466491
rcode = sp.wait()
467492

493+
if tm:
494+
tm.cancel()
495+
468496
if isinstance(stdin, io.IOBase):
469497
stdin.close()
470498

cwltool/pathmapper.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from schema_salad.sourceline import SourceLine
1818
from six.moves import urllib
1919

20-
from .utils import convert_pathsep_to_unix
20+
from .utils import convert_pathsep_to_unix, visit_class
2121

2222
from .stdfsaccess import StdFsAccess, abspath
2323

@@ -38,17 +38,6 @@ def adjustFiles(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]
3838
for d in rec:
3939
adjustFiles(d, op)
4040

41-
def visit_class(rec, cls, op): # type: (Any, Iterable, Union[Callable[..., Any], partial[Any]]) -> None
42-
"""Apply a function to with "class" in cls."""
43-
44-
if isinstance(rec, dict):
45-
if "class" in rec and rec.get("class") in cls:
46-
op(rec)
47-
for d in rec:
48-
visit_class(rec[d], cls, op)
49-
if isinstance(rec, list):
50-
for d in rec:
51-
visit_class(d, cls, op)
5241

5342
def adjustFileObjs(rec, op): # type: (Any, Union[Callable[..., Any], partial[Any]]) -> None
5443
"""Apply an update function to each File object in the object `rec`."""

cwltool/process.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ def filter(self, record):
7171
"StepInputExpressionRequirement",
7272
"ResourceRequirement",
7373
"InitialWorkDirRequirement",
74+
"TimeLimit",
75+
"WorkReuse",
76+
"NetworkAccess",
77+
"http://commonwl.org/cwltool#TimeLimit",
78+
"http://commonwl.org/cwltool#WorkReuse",
79+
"http://commonwl.org/cwltool#NetworkAccess",
7480
"http://commonwl.org/cwltool#LoadListingRequirement",
7581
"http://commonwl.org/cwltool#InplaceUpdateRequirement"]
7682

cwltool/schemas/v1.1.0-dev1/CommandLineTool.yml

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -990,3 +990,81 @@ $graph:
990990
- name: outdirMax
991991
type: ["null", long, string, Expression]
992992
doc: Maximum reserved filesystem based storage for the designated output directory, in mebibytes (2**20)
993+
994+
995+
- type: record
996+
name: TimeLimit
997+
extends: ProcessRequirement
998+
doc: |
999+
Set an upper limit on the execution time of a CommandLineTool or
1000+
ExpressionTool. A tool execution which exceeds the time limit may
1001+
be preemptively terminated and considered failed. May also be
1002+
used by batch systems to make scheduling decisions.
1003+
fields:
1004+
- name: class
1005+
type: string
1006+
doc: "Always 'TimeLimit'"
1007+
jsonldPredicate:
1008+
"_id": "@type"
1009+
"_type": "@vocab"
1010+
- name: timelimit
1011+
type: [long, string, Expression]
1012+
doc: |
1013+
The time limit, in seconds. A time limit of zero means no
1014+
time limit. Negative time limits are an error.
1015+
1016+
1017+
- type: record
1018+
name: WorkReuse
1019+
extends: ProcessRequirement
1020+
doc: |
1021+
For implementations that support reusing output from past work (on
1022+
the assumption that same code and same input produce same
1023+
results), control whether to enable or disable the reuse behavior
1024+
for a particular tool or step (to accomodate situations where that
1025+
assumption is incorrect). A reused step is not executed but
1026+
instead returns the same output as the original execution.
1027+
1028+
If `enableReuse` is not specified, correct tools should assume it
1029+
is enabled by default.
1030+
fields:
1031+
- name: class
1032+
type: string
1033+
doc: "Always 'WorkReuse'"
1034+
jsonldPredicate:
1035+
"_id": "@type"
1036+
"_type": "@vocab"
1037+
- name: enableReuse
1038+
type: [boolean, string, Expression]
1039+
default: true
1040+
1041+
1042+
- type: record
1043+
name: NetworkAccess
1044+
extends: ProcessRequirement
1045+
doc: |
1046+
Indicate whether a process requires outgoing IPv4/IPv6 network
1047+
access. Choice of IPv4 or IPv6 is implementation and site
1048+
specific, correct tools must support both.
1049+
1050+
If `networkAccess` is false or not specified, tools must not
1051+
assume network access, except for localhost (the loopback device).
1052+
1053+
If `networkAccess` is true, the tool must be able to make outgoing
1054+
connections to network resources. Resources may be on a private
1055+
subnet or the public Internet. However, implementations and sites
1056+
may apply their own security policies to restrict what is
1057+
accessible by the tool.
1058+
1059+
Enabling network access does not imply a publically routable IP
1060+
address or the ability to accept inbound connections.
1061+
1062+
fields:
1063+
- name: class
1064+
type: string
1065+
doc: "Always 'NetworkAccess'"
1066+
jsonldPredicate:
1067+
"_id": "@type"
1068+
"_type": "@vocab"
1069+
- name: networkAccess
1070+
type: [boolean, string, Expression]

cwltool/update.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from ruamel.yaml.comments import CommentedMap, CommentedSeq
1414
from schema_salad.ref_resolver import Loader
1515

16-
from .utils import aslist
16+
from .utils import aslist, visit_class
1717

1818
def findId(doc, frg): # type: (Any, Any) -> Dict
1919
if isinstance(doc, dict):
@@ -112,6 +112,16 @@ def v1_0dev4to1_0(doc, loader, baseuri):
112112
def v1_0to1_1_0dev1(doc, loader, baseuri):
113113
# type: (Any, Loader, Text) -> Tuple[Any, Text]
114114
"""Public updater for v1.0 to v1.1.0-dev1."""
115+
116+
def add_networkaccess(t):
117+
t.setdefault("requirements", [])
118+
t["requirements"].append({
119+
"class": "NetworkAccess",
120+
"networkAccess": True
121+
})
122+
123+
visit_class(doc, ("CommandLineTool",), add_networkaccess)
124+
115125
return (doc, "v1.1.0-dev1")
116126

117127

0 commit comments

Comments
 (0)