Skip to content

Commit 8a5e7a3

Browse files
committed
add MultiProc scheduler option
1 parent 430a3b4 commit 8a5e7a3

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
208208
'be submitted to the queue. Potential deadlock')
209209
return
210210

211-
# Sort jobs ready to run first by memory and then by number of threads
212-
# The most resource consuming jobs run first
213-
# jobids = sorted(jobids,
214-
# key=lambda item: (self.procs[item]._mem_gb,
215-
# self.procs[item]._n_procs))
216-
217-
# While have enough memory and processors for first job
218-
# Submit first job on the list
211+
jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler'))
212+
213+
# Submit jobs
219214
for jobid in jobids:
220215
# First expand mapnodes
221216
if isinstance(self.procs[jobid], MapNode):
@@ -302,3 +297,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
302297
self.proc_pending[jobid] = False
303298
else:
304299
self.pending_tasks.insert(0, (tid, jobid))
300+
301+
def _sort_jobs(self, jobids, scheduler='tsort'):
302+
if scheduler == 'mem_thread':
303+
return sorted(jobids, key=lambda item: (
304+
self.procs[item].mem_gb, self.procs[item].n_procs))
305+
return jobids

0 commit comments

Comments
 (0)