Skip to content

Commit 91e9e9f

Browse files
authored
improve multi threading code, report lack of resources (#923)
1 parent 8b6cfba commit 91e9e9f

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

cwltool/command_line_tool.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,6 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
481481
if debug:
482482
_logger.debug(u"[job %s] command line bindings is %s", j.name,
483483
json_dumps(builder.bindings, indent=4))
484-
485484
dockerReq = self.get_requirement("DockerRequirement")[0]
486485
if dockerReq and runtimeContext.use_container:
487486
out_prefix = getdefault(runtimeContext.tmp_outdir_prefix, 'tmp')

cwltool/executors.py

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@
55
import tempfile
66
import threading
77
from abc import ABCMeta, abstractmethod
8-
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
8+
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
99

1010
import psutil
11-
from schema_salad.validate import ValidationException
1211
from six import string_types, with_metaclass
1312
from typing_extensions import Text # pylint: disable=unused-import
14-
# move to a regular typing import when Python 3.3-3.6 is no longer supported
13+
from schema_salad.validate import ValidationException
1514

1615
from .builder import Builder # pylint: disable=unused-import
1716
from .context import (RuntimeContext, # pylint: disable=unused-import
@@ -189,7 +188,7 @@ def __init__(self): # type: () -> None
189188
super(MultithreadedJobExecutor, self).__init__()
190189
self.threads = set() # type: Set[threading.Thread]
191190
self.exceptions = [] # type: List[WorkflowException]
192-
self.pending_jobs = [] # type: List[JobBase]
191+
self.pending_jobs = [] # type: List[Union[JobBase, WorkflowJob]]
193192
self.pending_jobs_lock = threading.Lock()
194193

195194
self.max_ram = psutil.virtual_memory().available / 2**20
@@ -218,7 +217,7 @@ def select_resources(self, request, runtime_context): # pylint: disable=unused-
218217
return result
219218

220219
def run_job(self,
221-
job, # type: JobBase
220+
job, # type: Union[JobBase, WorkflowJob]
222221
runtime_context # type: RuntimeContext
223222
): # type: (...) -> None
224223
""" Execute a single Job in a seperate thread. """
@@ -230,34 +229,38 @@ def run_job(self,
230229
while self.pending_jobs:
231230
with self.pending_jobs_lock:
232231
job = self.pending_jobs[0]
233-
if isinstance(job, JobBase):
234-
if ((self.allocated_ram + job.builder.resources["ram"])
235-
> self.max_ram or
236-
(self.allocated_cores + job.builder.resources["cores"])
237-
> self.max_cores):
238-
return
232+
if isinstance(job, JobBase) and \
233+
((self.allocated_ram + job.builder.resources["ram"])
234+
> self.max_ram or
235+
(self.allocated_cores + job.builder.resources["cores"])
236+
> self.max_cores):
237+
_logger.warning(
238+
'Job "%s" requested more resources (%s) than are '
239+
'available (max ram is %f, max cores is %f)',
240+
job.name, job.builder.resources, self.max_ram,
241+
self.max_cores)
242+
return
239243
self.pending_jobs.remove(job)
240244

241-
def runner(my_job, my_runtime_context):
245+
def runner():
242246
""" Job running thread. """
243247
try:
244-
my_job.run(my_runtime_context)
248+
job.run(runtime_context)
245249
except WorkflowException as err:
246250
_logger.exception("Got workflow error")
247251
self.exceptions.append(err)
248252
except Exception as err: # pylint: disable=broad-except
249253
_logger.exception("Got workflow error")
250254
self.exceptions.append(WorkflowException(Text(err)))
251255
finally:
252-
with my_runtime_context.workflow_eval_lock:
256+
with runtime_context.workflow_eval_lock:
253257
self.threads.remove(threading.current_thread())
254-
if isinstance(my_job, JobBase):
255-
self.allocated_ram -= my_job.builder.resources["ram"]
256-
self.allocated_cores -= my_job.builder.resources["cores"]
257-
my_runtime_context.workflow_eval_lock.notifyAll()
258+
if isinstance(job, JobBase):
259+
self.allocated_ram -= job.builder.resources["ram"]
260+
self.allocated_cores -= job.builder.resources["cores"]
261+
runtime_context.workflow_eval_lock.notifyAll()
258262

259-
thread = threading.Thread(
260-
target=runner, args=(job, runtime_context))
263+
thread = threading.Thread(target=runner)
261264
thread.daemon = True
262265
self.threads.add(thread)
263266
if isinstance(job, JobBase):
@@ -291,10 +294,10 @@ def run_jobs(self,
291294
runtime_context.workflow_eval_lock.acquire()
292295
for job in jobiter:
293296
if job is not None:
294-
if runtime_context.builder is not None:
295-
job.builder = runtime_context.builder
296-
if job.outdir:
297-
self.output_dirs.add(job.outdir)
297+
if isinstance(job, JobBase):
298+
job.builder = runtime_context.builder or job.builder
299+
if job.outdir:
300+
self.output_dirs.add(job.outdir)
298301

299302
self.run_job(job, runtime_context)
300303

0 commit comments

Comments
 (0)