Skip to content

Commit 4283521

Browse files
committed
Merge pull request #892 from satra/fix/max_jobs
fix: max_jobs plugin parameter now acts within the inner submission loop
2 parents 365478e + f067076 commit 4283521

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Next Release
2525
* FIX: MRTrix tracking algorithms were ignoring mask parameters.
2626
* FIX: FNIRT registration pathway and associated OpenFMRI example script
2727
* FIX: spm12b compatibility for Model estimate
28+
* FIX: Batch scheduler controls the number of maximum jobs properly
2829

2930
Release 0.9.2 (January 31, 2014)
3031
============

nipype/pipeline/plugins/base.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,17 @@ def run(self, graph, config, updatehash=False):
252252
if toappend:
253253
self.pending_tasks.extend(toappend)
254254
num_jobs = len(self.pending_tasks)
255+
logger.debug('Number of pending tasks: %d' % num_jobs)
255256
if num_jobs < self.max_jobs:
256257
if np.isinf(self.max_jobs):
257258
slots = None
258259
else:
259-
slots = self.max_jobs - num_jobs
260+
slots = max(0, self.max_jobs - num_jobs)
261+
logger.debug('Slots available: %s' % slots)
260262
self._send_procs_to_workers(updatehash=updatehash,
261263
slots=slots, graph=graph)
264+
else:
265+
logger.debug('Not submitting')
262266
sleep(2)
263267
self._remove_node_dirs()
264268
report_nodes_not_run(notrun)
@@ -327,9 +331,12 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
327331
# Check to see if a job is available
328332
jobids = np.flatnonzero((self.proc_done == False) &
329333
(self.depidx.sum(axis=0) == 0).__array__())
334+
num_jobs = len(self.pending_tasks)
335+
if num_jobs >= self.max_jobs:
336+
break
330337
if len(jobids) > 0:
331338
# send all available jobs
332-
logger.info('Submitting %d jobs' % len(jobids))
339+
logger.info('Submitting %d jobs' % len(jobids[:slots]))
333340
for jobid in jobids[:slots]:
334341
if isinstance(self.procs[jobid], MapNode):
335342
try:
@@ -339,6 +346,8 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
339346
self.proc_pending[jobid] = False
340347
continue
341348
if num_subnodes > 1:
349+
if num_subnodes > (self.max_jobs - len(self.pending_tasks)):
350+
break
342351
submit = self._submit_mapnode(jobid)
343352
if not submit:
344353
continue

0 commit comments

Comments
 (0)