Skip to content

Commit 6402981

Browse files
committed
cleaning up MultiProc
1 parent 0b00a20 commit 6402981

File tree

1 file changed

+11
-18
lines changed

1 file changed

+11
-18
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
# Import packages
1212
from multiprocessing import Process, Pool, cpu_count, pool
13-
import threading
1413
from traceback import format_exception
1514
import sys
1615

@@ -83,7 +82,8 @@ class NonDaemonPool(pool.Pool):
8382

8483

8584
class MultiProcPlugin(DistributedPluginBase):
86-
"""Execute workflow with multiprocessing, not sending more jobs at once
85+
"""
86+
Execute workflow with multiprocessing, not sending more jobs at once
8787
than the system can support.
8888
8989
The plugin_args input to run can be used to control the multiprocessing
@@ -102,6 +102,8 @@ class MultiProcPlugin(DistributedPluginBase):
102102
- non_daemon : boolean flag to execute as non-daemon processes
103103
- n_procs: maximum number of threads to be executed in parallel
104104
- memory_gb: maximum memory (in GB) that can be used at once.
105+
- raise_insufficient: raise error if the requested resources for
106+
a node exceed the maximum `n_procs` and/or `memory_gb`.
105107
106108
"""
107109

@@ -112,7 +114,6 @@ def __init__(self, plugin_args=None):
112114
self._task_obj = {}
113115
self._taskid = 0
114116
self._timeout = 2.0
115-
# self._event = threading.Event()
116117

117118
# Read in options or set defaults.
118119
non_daemon = self.plugin_args.get('non_daemon', True)
@@ -126,18 +127,8 @@ def __init__(self, plugin_args=None):
126127
'non' if non_daemon else '', self.processors, self.memory_gb)
127128
self.pool = (NonDaemonPool if non_daemon else Pool)(processes=self.processors)
128129

129-
# def _wait(self):
130-
# if len(self.pending_tasks) > 0:
131-
# if self._config['execution']['poll_sleep_duration']:
132-
# self._timeout = float(self._config['execution']['poll_sleep_duration'])
133-
# sig_received = self._event.wait(self._timeout)
134-
# if not sig_received:
135-
# logger.debug('MultiProcPlugin timeout before signal received. Deadlock averted??')
136-
# self._event.clear()
137-
138130
def _async_callback(self, args):
139131
self._taskresult[args['taskid']] = args
140-
# self._event.set()
141132

142133
def _get_result(self, taskid):
143134
return self._taskresult.get(taskid)
@@ -178,7 +169,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
178169

179170
# Check to see if a job is available
180171
currently_running_jobids = np.flatnonzero(
181-
self.proc_pending & (self.depidx.sum(axis=0) == 0).__array__())
172+
np.array(self.proc_pending, dtype=bool) & ~self.depidx.sum(axis=0).astype(bool)
173+
)
182174

183175
# Check available system resources by summing all threads and memory used
184176
busy_memory_gb = 0
@@ -210,6 +202,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
210202
# Check all jobs without dependency not run
211203
jobids = np.flatnonzero((self.proc_done == False) &
212204
(self.depidx.sum(axis=0) == 0).__array__())
205+
# jobids = np.flatnonzero(~np.array(self.proc_done, dtype=bool) &
206+
# (self.depidx.sum(axis=0) == 0))
213207

214208
# Sort jobs ready to run first by memory and then by number of threads
215209
# The most resource consuming jobs run first
@@ -226,10 +220,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
226220
# Submit first job on the list
227221
for jobid in jobids:
228222
if resource_monitor:
229-
logger.debug('Next Job: %d, memory (GB): %d, threads: %d' \
230-
% (jobid,
231-
self.procs[jobid]._interface.estimated_memory_gb,
232-
self.procs[jobid]._interface.num_threads))
223+
logger.debug('Next Job: %d, memory (GB): %d, threads: %d',
224+
jobid, self.procs[jobid]._interface.estimated_memory_gb,
225+
self.procs[jobid]._interface.num_threads)
233226

234227
if self.procs[jobid]._interface.estimated_memory_gb <= free_memory_gb and \
235228
self.procs[jobid]._interface.num_threads <= free_processors:

0 commit comments

Comments
 (0)