Skip to content

Commit 7617c0b

Browse files
committed
support opaque resources
1 parent 2710cfe commit 7617c0b

File tree

8 files changed

+71
-33
lines changed

8 files changed

+71
-33
lines changed

cwltool/builder.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def __init__(
158158
names: Names,
159159
requirements: List[CWLObjectType],
160160
hints: List[CWLObjectType],
161-
resources: Dict[str, Union[int, float]],
161+
resources: Dict[str, Union[int, float, str]],
162162
mutation_manager: Optional[MutationManager],
163163
formatgraph: Optional[Graph],
164164
make_fs_access: Type[StdFsAccess],
@@ -635,8 +635,10 @@ def do_eval(
635635

636636
resources = self.resources
637637
if self.resources and "cores" in self.resources:
638-
resources = copy.copy(resources)
639-
resources["cores"] = int(math.ceil(resources["cores"]))
638+
cores = resources["cores"]
639+
if not isinstance(cores, str):
640+
resources = copy.copy(resources)
641+
resources["cores"] = int(math.ceil(cores))
640642

641643
return expression.do_eval(
642644
ex,

cwltool/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ class RuntimeContext(ContextBase):
8181
def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
8282
"""Initialize the RuntimeContext from the kwargs."""
8383
select_resources_callable = Callable[ # pylint: disable=unused-variable
84-
[Dict[str, Union[int, float]], RuntimeContext], Dict[str, Union[int, float]]
84+
[Dict[str, Union[int, float, str]], RuntimeContext],
85+
Dict[str, Union[int, float, str]],
8586
]
8687
self.user_space_docker_cmd = "" # type: Optional[str]
8788
self.secret_store = None # type: Optional[SecretStore]

cwltool/docker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,9 @@ def create_runtime(
446446
runtime.append("--env=%s=%s" % (key, value))
447447

448448
if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
449-
runtime.append("--memory=%dm" % self.builder.resources["ram"])
449+
ram = self.builder.resources["ram"]
450+
if not isinstance(ram, str):
451+
runtime.append("--memory=%dm" % ram)
450452
elif not user_space_docker_cmd:
451453
res_req, _ = self.builder.get_requirement("ResourceRequirement")
452454
if res_req and ("ramMin" in res_req or "ramMax" in res_req):

cwltool/executors.py

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -293,23 +293,33 @@ def __init__(self) -> None:
293293
def select_resources(
294294
self, request, runtime_context
295295
): # pylint: disable=unused-argument
296-
# type: (Dict[str, Union[int, float]], RuntimeContext) -> Dict[str, Union[int, float]]
296+
# type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
297297
"""Naïve check for available cpu cores and memory."""
298-
result = {} # type: Dict[str, Union[int, float]]
298+
result = {} # type: Dict[str, Union[int, float, str]]
299299
maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
300300
for rsc in ("cores", "ram"):
301-
if request[rsc + "Min"] > maxrsc[rsc]:
301+
rsc_min = request[rsc + "Min"]
302+
if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
302303
raise WorkflowException(
303304
"Requested at least %d %s but only %d available"
304-
% (request[rsc + "Min"], rsc, maxrsc[rsc])
305+
% (rsc_min, rsc, maxrsc[rsc])
305306
)
306-
if request[rsc + "Max"] < maxrsc[rsc]:
307-
result[rsc] = math.ceil(request[rsc + "Max"])
307+
rsc_max = request[rsc + "Max"]
308+
if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
309+
result[rsc] = math.ceil(rsc_max)
308310
else:
309311
result[rsc] = maxrsc[rsc]
310312

311-
result["tmpdirSize"] = math.ceil(request["tmpdirMin"])
312-
result["outdirSize"] = math.ceil(request["outdirMin"])
313+
result["tmpdirSize"] = (
314+
math.ceil(request["tmpdirMin"])
315+
if not isinstance(request["tmpdirMin"], str)
316+
else request["tmpdirMin"]
317+
)
318+
result["outdirSize"] = (
319+
math.ceil(request["outdirMin"])
320+
if not isinstance(request["outdirMin"], str)
321+
else request["outdirMin"]
322+
)
313323

314324
return result
315325

@@ -334,8 +344,12 @@ def _runner(self, job, runtime_context, TMPDIR_LOCK):
334344
with runtime_context.workflow_eval_lock:
335345
self.threads.remove(threading.current_thread())
336346
if isinstance(job, JobBase):
337-
self.allocated_ram -= job.builder.resources["ram"]
338-
self.allocated_cores -= job.builder.resources["cores"]
347+
ram = job.builder.resources["ram"]
348+
if not isinstance(ram, str):
349+
self.allocated_ram -= ram
350+
cores = job.builder.resources["cores"]
351+
if not isinstance(cores, str):
352+
self.allocated_cores -= cores
339353
runtime_context.workflow_eval_lock.notifyAll()
340354

341355
def run_job(
@@ -353,9 +367,11 @@ def run_job(
353367
while (n + 1) <= len(self.pending_jobs):
354368
job = self.pending_jobs[n]
355369
if isinstance(job, JobBase):
356-
if (job.builder.resources["ram"]) > self.max_ram or (
357-
job.builder.resources["cores"]
358-
) > self.max_cores:
370+
ram = job.builder.resources["ram"]
371+
cores = job.builder.resources["cores"]
372+
if (not isinstance(ram, str) and ram > self.max_ram) or (
373+
not isinstance(cores, str) and cores > self.max_cores
374+
):
359375
_logger.error(
360376
'Job "%s" cannot be run, requests more resources (%s) '
361377
"than available on this host (max ram %d, max cores %d",
@@ -370,10 +386,12 @@ def run_job(
370386
return
371387

372388
if (
373-
self.allocated_ram + job.builder.resources["ram"]
374-
) > self.max_ram or (
375-
self.allocated_cores + job.builder.resources["cores"]
376-
) > self.max_cores:
389+
not isinstance(ram, str)
390+
and self.allocated_ram + ram > self.max_ram
391+
) or (
392+
not isinstance(cores, str)
393+
and self.allocated_cores + cores > self.max_cores
394+
):
377395
_logger.debug(
378396
'Job "%s" cannot run yet, resources (%s) are not '
379397
"available (already allocated ram is %d, allocated cores is %d, "
@@ -394,8 +412,12 @@ def run_job(
394412
thread.daemon = True
395413
self.threads.add(thread)
396414
if isinstance(job, JobBase):
397-
self.allocated_ram += job.builder.resources["ram"]
398-
self.allocated_cores += job.builder.resources["cores"]
415+
ram = job.builder.resources["ram"]
416+
if not isinstance(ram, str):
417+
self.allocated_ram += ram
418+
cores = job.builder.resources["cores"]
419+
if not isinstance(cores, str):
420+
self.allocated_cores += cores
399421
thread.start()
400422
self.pending_jobs.remove(job)
401423

cwltool/expression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ def do_eval(
345345
requirements: List[CWLObjectType],
346346
outdir: Optional[str],
347347
tmpdir: Optional[str],
348-
resources: Dict[str, Union[float, int]],
348+
resources: Dict[str, Union[float, int, str]],
349349
context: Optional[CWLOutputType] = None,
350350
timeout: float = default_timeout,
351351
force_docker_pull: bool = False,

cwltool/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@
101101
windows_default_container_id,
102102
)
103103
from .workflow import Workflow
104+
104105
# PYTHON_ARGCOMPLETE_OK
105106

107+
106108
def _terminate_processes() -> None:
107109
"""Kill all spawned processes.
108110
@@ -397,7 +399,7 @@ def init_job_order(
397399
k: v for k, v in cmd_line.items() if k.startswith(record_name)
398400
}
399401
for key, value in record_items.items():
400-
record[key[len(record_name) + 1:]] = value
402+
record[key[len(record_name) + 1 :]] = value
401403
del cmd_line[key]
402404
cmd_line[str(record_name)] = record
403405
if "job_order" in cmd_line and cmd_line["job_order"]:

cwltool/process.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -962,7 +962,7 @@ def inc(d): # type: (List[int]) -> None
962962

963963
def evalResources(
964964
self, builder: Builder, runtimeContext: RuntimeContext
965-
) -> Dict[str, Union[int, float]]:
965+
) -> Dict[str, Union[int, float, str]]:
966966
resourceReq, _ = self.get_requirement("ResourceRequirement")
967967
if resourceReq is None:
968968
resourceReq = {}
@@ -973,7 +973,7 @@ def evalResources(
973973
ram = 1024
974974
else:
975975
ram = 256
976-
request = {
976+
request: Dict[str, Union[int, float, str]] = {
977977
"coresMin": 1,
978978
"coresMax": 1,
979979
"ramMin": ram,
@@ -982,7 +982,7 @@ def evalResources(
982982
"tmpdirMax": 1024,
983983
"outdirMin": 1024,
984984
"outdirMax": 1024,
985-
} # type: Dict[str, Union[int, float]]
985+
}
986986
for a in ("cores", "ram", "tmpdir", "outdir"):
987987
mn = mx = None # type: Optional[Union[int, float]]
988988
if resourceReq.get(a + "Min"):
@@ -1012,9 +1012,15 @@ def evalResources(
10121012
return runtimeContext.select_resources(request, runtimeContext)
10131013
return {
10141014
"cores": request["coresMin"],
1015-
"ram": math.ceil(request["ramMin"]),
1016-
"tmpdirSize": math.ceil(request["tmpdirMin"]),
1017-
"outdirSize": math.ceil(request["outdirMin"]),
1015+
"ram": math.ceil(request["ramMin"])
1016+
if not isinstance(request["ramMin"], str)
1017+
else request["ramMin"],
1018+
"tmpdirSize": math.ceil(request["tmpdirMin"])
1019+
if not isinstance(request["tmpdirMin"], str)
1020+
else request["tmpdirMin"],
1021+
"outdirSize": math.ceil(request["outdirMin"])
1022+
if not isinstance(request["outdirMin"], str)
1023+
else request["outdirMin"],
10181024
}
10191025

10201026
def validate_hints(

cwltool/singularity.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ def _singularity_supports_userns() -> bool:
4747
stdout=DEVNULL,
4848
universal_newlines=True,
4949
).communicate(timeout=60)[1]
50-
_USERNS = "No valid /bin/sh" in result or "/bin/sh doesn't exist in container" in result
50+
_USERNS = (
51+
"No valid /bin/sh" in result
52+
or "/bin/sh doesn't exist in container" in result
53+
)
5154
except TimeoutExpired:
5255
_USERNS = False
5356
return _USERNS

0 commit comments

Comments
 (0)