@@ -60,6 +60,7 @@ def run_node(node, updatehash, taskid):
6060class NonDaemonProcess (Process ):
6161 """A non-daemon process to support internal multiprocessing.
6262 """
63+
6364 def _get_daemon (self ):
6465 return False
6566
@@ -104,6 +105,8 @@ class MultiProcPlugin(DistributedPluginBase):
104105 - scheduler: sort jobs topologically (``'tsort'``, default value)
105106 or prioritize jobs by, first, memory consumption and, second,
106107 number of threads (``'mem_thread'`` option).
108+ - maxtasksperchild: number of nodes to run on each process before
109+ refreshing the worker (default: 10).
107110
108111 """
109112
@@ -116,6 +119,7 @@ def __init__(self, plugin_args=None):
116119
117120 # Read in options or set defaults.
118121 non_daemon = self .plugin_args .get ('non_daemon' , True )
122+ maxtasks = self .plugin_args .get ('maxtasksperchild' , 10 )
119123 self .processors = self .plugin_args .get ('n_procs' , cpu_count ())
120124 self .memory_gb = self .plugin_args .get ('memory_gb' , # Allocate 90% of system memory
121125 get_system_total_memory_gb () * 0.9 )
@@ -124,7 +128,14 @@ def __init__(self, plugin_args=None):
124128 # Instantiate different thread pools for non-daemon processes
125129 logger .debug ('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)' ,
126130 'non' if non_daemon else '' , self .processors , self .memory_gb )
127- self .pool = (NonDaemonPool if non_daemon else Pool )(processes = self .processors )
131+
132+ NipypePool = NonDaemonPool if non_daemon else Pool
133+ try :
134+ self .pool = NipypePool (processes = self .processors ,
135+ maxtasksperchild = maxtasks )
136+ except TypeError :
137+ self .pool = NipypePool (processes = self .processors )
138+
128139 self ._stats = None
129140
130141 def _async_callback (self , args ):
0 commit comments