Skip to content

Commit a68e0e6

Browse files
committed
Added initial num_threads monitoring code
1 parent 43c0d56 commit a68e0e6

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

nipype/interfaces/base.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,25 @@ def _read(self, drain):
12041204
self._lastidx = len(self._rows)
12051205

12061206

1207+
def _get_num_threads(proc):
1208+
'''
1209+
'''
1210+
1211+
# Import packages
1212+
import psutil
1213+
1214+
# Init variables
1215+
num_threads = proc.num_threads()
1216+
try:
1217+
for child in proc.children():
1218+
num_threads = max(num_threads, child.num_threads(),
1219+
len(child.children()), _get_num_threads(child))
1220+
except psutil.NoSuchProcess:
1221+
dummy = 1
1222+
1223+
return num_threads
1224+
1225+
12071226
def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12081227
"""Run a command, read stdout and stderr, prefix with timestamp.
12091228
@@ -1213,6 +1232,7 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12131232
# Import packages
12141233
try:
12151234
from memory_profiler import _get_memory
1235+
import psutil
12161236
mem_prof = True
12171237
except:
12181238
mem_prof = False
@@ -1253,7 +1273,7 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12531273
# Init variables for memory profiling
12541274
mem_mb = -1
12551275
num_threads = -1
1256-
interval = 0.1
1276+
interval = 1
12571277

12581278
if output == 'stream':
12591279
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
@@ -1273,8 +1293,7 @@ def _process(drain=0):
12731293
while proc.returncode is None:
12741294
if mem_prof:
12751295
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1276-
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
1277-
time.sleep(interval)
1296+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
12781297
proc.poll()
12791298
_process()
12801299
_process(drain=1)
@@ -1293,8 +1312,7 @@ def _process(drain=0):
12931312
if mem_prof:
12941313
while proc.returncode is None:
12951314
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1296-
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
1297-
time.sleep(interval)
1315+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
12981316
proc.poll()
12991317
stdout, stderr = proc.communicate()
13001318
if stdout and isinstance(stdout, bytes):
@@ -1315,8 +1333,7 @@ def _process(drain=0):
13151333
if mem_prof:
13161334
while proc.returncode is None:
13171335
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1318-
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
1319-
time.sleep(interval)
1336+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
13201337
proc.poll()
13211338
ret_code = proc.wait()
13221339
stderr.flush()
@@ -1328,8 +1345,7 @@ def _process(drain=0):
13281345
if mem_prof:
13291346
while proc.returncode is None:
13301347
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1331-
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
1332-
time.sleep(interval)
1348+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
13331349
proc.poll()
13341350
proc.communicate()
13351351
result['stdout'] = []

nipype/pipeline/plugins/multiproc.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ class NonDaemonPool(pool.Pool):
7878

7979
import numpy as np
8080
from copy import deepcopy
81-
from ..engine import (MapNode, str2bool)
81+
from ..engine import MapNode
82+
from ...utils.misc import str2bool
8283
import datetime
8384
import psutil
8485
from ... import logging
@@ -130,9 +131,9 @@ def __init__(self, plugin_args=None):
130131

131132
if non_daemon:
132133
# run the execution using the non-daemon pool subclass
133-
self.pool = NonDaemonPool(processes=n_procs)
134+
self.pool = NonDaemonPool(processes=self.processors)
134135
else:
135-
self.pool = Pool(processes=n_procs)
136+
self.pool = Pool(processes=self.processors)
136137

137138
def _wait(self):
138139
if len(self.pending_tasks) > 0:

0 commit comments

Comments
 (0)