Skip to content

Commit 5a152a0

Browse files
authored
Merge pull request #2284 from oesteban/enh/multiproc-maxtasksperchild
[ENH] Set maxtasksperchild in MultiProc Pool
2 parents d0741b9 + cfdf07f commit 5a152a0

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

doc/users/plugins.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ Optional arguments::
8282
exceed the total amount of resources available (memory and threads), when
8383
``False`` (default), only a warning will be issued.
8484

85+
maxtasksperchild : number of nodes to run on each process before refreshing
86+
the worker (default: 10).
87+
88+
8589
To distribute processing on a multicore machine, simply call::
8690

8791
workflow.run(plugin='MultiProc')

nipype/pipeline/plugins/multiproc.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def run_node(node, updatehash, taskid):
6060
class NonDaemonProcess(Process):
6161
"""A non-daemon process to support internal multiprocessing.
6262
"""
63+
6364
def _get_daemon(self):
6465
return False
6566

@@ -104,6 +105,8 @@ class MultiProcPlugin(DistributedPluginBase):
104105
- scheduler: sort jobs topologically (``'tsort'``, default value)
105106
or prioritize jobs by, first, memory consumption and, second,
106107
number of threads (``'mem_thread'`` option).
108+
- maxtasksperchild: number of nodes to run on each process before
109+
refreshing the worker (default: 10).
107110
108111
"""
109112

@@ -116,6 +119,7 @@ def __init__(self, plugin_args=None):
116119

117120
# Read in options or set defaults.
118121
non_daemon = self.plugin_args.get('non_daemon', True)
122+
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
119123
self.processors = self.plugin_args.get('n_procs', cpu_count())
120124
self.memory_gb = self.plugin_args.get('memory_gb', # Allocate 90% of system memory
121125
get_system_total_memory_gb() * 0.9)
@@ -124,7 +128,14 @@ def __init__(self, plugin_args=None):
124128
# Instantiate different thread pools for non-daemon processes
125129
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
126130
'non' if non_daemon else '', self.processors, self.memory_gb)
127-
self.pool = (NonDaemonPool if non_daemon else Pool)(processes=self.processors)
131+
132+
NipypePool = NonDaemonPool if non_daemon else Pool
133+
try:
134+
self.pool = NipypePool(processes=self.processors,
135+
maxtasksperchild=maxtasks)
136+
except TypeError:
137+
self.pool = NipypePool(processes=self.processors)
138+
128139
self._stats = None
129140

130141
def _async_callback(self, args):

0 commit comments

Comments
 (0)