Skip to content

Commit 19756e7

Browse files
committed
freshen cwltool/executors.py
1 parent e11b307 commit 19756e7

File tree

1 file changed

+30
-22
lines changed

1 file changed

+30
-22
lines changed

cwltool/executors.py

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,9 @@
88
from abc import ABCMeta, abstractmethod
99
from threading import Lock
1010
from typing import (
11-
Any,
1211
Dict,
1312
Iterable,
1413
List,
15-
MutableMapping,
1614
MutableSequence,
1715
Optional,
1816
Set,
@@ -45,14 +43,19 @@ class JobExecutor(object, metaclass=ABCMeta):
4543

4644
def __init__(self) -> None:
4745
"""Initialize."""
48-
self.final_output = (
49-
[]
50-
) # type: MutableSequence[Union[Optional[CWLObjectType], MutableSequence[CWLObjectType]]]
46+
self.final_output = [] # type: MutableSequence[Optional[CWLObjectType]]
5147
self.final_status = [] # type: List[str]
5248
self.output_dirs = set() # type: Set[str]
5349

54-
def __call__(self, *args: Any, **kwargs: Any) -> Any:
55-
return self.execute(*args, **kwargs)
50+
def __call__(
51+
self,
52+
process: Process,
53+
job_order_object: CWLObjectType,
54+
runtime_context: RuntimeContext,
55+
logger: logging.Logger = _logger,
56+
) -> Tuple[Optional[CWLObjectType], str]:
57+
58+
return self.execute(process, job_order_object, runtime_context, logger)
5659

5760
def output_callback(
5861
self, out: Optional[CWLObjectType], process_status: str
@@ -77,12 +80,12 @@ def execute(
7780
job_order_object: CWLObjectType,
7881
runtime_context: RuntimeContext,
7982
logger: logging.Logger = _logger,
80-
) -> Tuple[Union[Optional[CWLObjectType], MutableSequence[CWLObjectType]], str]:
83+
) -> Tuple[Union[Optional[CWLObjectType]], str]:
8184
"""Execute the process."""
8285
if not runtime_context.basedir:
8386
raise WorkflowException("Must provide 'basedir' in runtimeContext")
8487

85-
def check_for_abstract_op(tool: MutableMapping[str, Any]) -> None:
88+
def check_for_abstract_op(tool: CWLObjectType) -> None:
8689
if tool["class"] == "Operation":
8790
raise SourceLine(tool, "class", WorkflowException).makeError(
8891
"Workflow has unrunnable abstract Operation"
@@ -122,7 +125,7 @@ def check_for_abstract_op(tool: MutableMapping[str, Any]) -> None:
122125
elif (
123126
"cwl:defaults" in process.metadata
124127
and "https://w3id.org/cwl/cwl#requirements"
125-
in process.metadata["cwl:defaults"]
128+
in cast(CWLObjectType, process.metadata["cwl:defaults"])
126129
):
127130
if (
128131
process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
@@ -133,9 +136,12 @@ def check_for_abstract_op(tool: MutableMapping[str, Any]) -> None:
133136
"v1.0. You can adjust to use `cwltool:overrides` instead; or you "
134137
"can set the cwlVersion to v1.1"
135138
)
136-
job_reqs = process.metadata["cwl:defaults"][
137-
"https://w3id.org/cwl/cwl#requirements"
138-
]
139+
job_reqs = cast(
140+
Optional[List[CWLObjectType]],
141+
cast(CWLObjectType, process.metadata["cwl:defaults"])[
142+
"https://w3id.org/cwl/cwl#requirements"
143+
],
144+
)
139145
if job_reqs is not None:
140146
for req in job_reqs:
141147
process.requirements.append(req)
@@ -159,10 +165,10 @@ def check_for_abstract_op(tool: MutableMapping[str, Any]) -> None:
159165

160166
if runtime_context.rm_tmpdir:
161167
if runtime_context.cachedir is None:
162-
output_dirs = self.output_dirs # type: Iterable[Any]
168+
output_dirs = self.output_dirs # type: Iterable[str]
163169
else:
164170
output_dirs = filter(
165-
lambda x: not x.startswith(runtime_context.cachedir),
171+
lambda x: not x.startswith(runtime_context.cachedir), # type: ignore
166172
self.output_dirs,
167173
)
168174
cleanIntermediate(output_dirs)
@@ -279,16 +285,16 @@ def __init__(self) -> None:
279285
self.pending_jobs_lock = threading.Lock()
280286

281287
self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
282-
self.max_cores = psutil.cpu_count()
283-
self.allocated_ram = 0
284-
self.allocated_cores = 0
288+
self.max_cores = float(psutil.cpu_count())
289+
self.allocated_ram = float(0)
290+
self.allocated_cores = float(0)
285291

286292
def select_resources(
287293
self, request, runtime_context
288294
): # pylint: disable=unused-argument
289-
# type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
295+
# type: (Dict[str, Union[int, float]], RuntimeContext) -> Dict[str, Union[int, float]]
290296
"""Naïve check for available cpu cores and memory."""
291-
result = {} # type: Dict[str, int]
297+
result = {} # type: Dict[str, Union[int, float]]
292298
maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
293299
for rsc in ("cores", "ram"):
294300
if request[rsc + "Min"] > maxrsc[rsc]:
@@ -328,7 +334,9 @@ def _runner(self, job, runtime_context, TMPDIR_LOCK):
328334
self.allocated_cores -= job.builder.resources["cores"]
329335
runtime_context.workflow_eval_lock.notifyAll()
330336

331-
def run_job(self, job: JobsType, runtime_context: RuntimeContext,) -> None:
337+
def run_job(
338+
self, job: Optional[JobsType], runtime_context: RuntimeContext,
339+
) -> None:
332340
"""Execute a single Job in a seperate thread."""
333341
if job is not None:
334342
with self.pending_jobs_lock:
@@ -452,5 +460,5 @@ def execute(
452460
job_order_object: CWLObjectType,
453461
runtime_context: RuntimeContext,
454462
logger: Optional[logging.Logger] = None,
455-
) -> Tuple[Union[Optional[CWLObjectType], MutableSequence[CWLObjectType]], str]:
463+
) -> Tuple[Optional[CWLObjectType], str]:
456464
return {}, "success"

0 commit comments

Comments
 (0)