Skip to content

Commit 9ad5d24

Browse files
committed
Restructured unittests and num_threads logic
1 parent 0fdc671 commit 9ad5d24

File tree

4 files changed

+45
-77
lines changed

4 files changed

+45
-77
lines changed

nipype/interfaces/base.py

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,8 +1205,9 @@ def _read(self, drain):
12051205

12061206

12071207
# Get number of threads for process
1208-
def _get_num_threads(proc, log_flg=False):
1208+
def _get_num_threads(proc):
12091209
"""Function to get the number of threads a process is using
1210+
NOTE: If
12101211
12111212
Parameters
12121213
----------
@@ -1221,35 +1222,23 @@ def _get_num_threads(proc, log_flg=False):
12211222

12221223
# Import packages
12231224
import psutil
1224-
import logging
12251225

12261226
# Init variables
12271227
num_threads = proc.num_threads()
1228-
if log_flg:
1229-
from CPAC.utils.utils import setup_logger
1230-
logger = setup_logger('memory_profiler', '/home/dclark/memory_profiler.log',
1231-
logging.INFO, to_screen=False)
12321228

1229+
# Iterate through child processes and get number of their threads
12331230
try:
1234-
num_children = len(proc.children())
1235-
if log_flg:
1236-
logger.debug('len(proc.children()): %d' % num_children)
1237-
logger.debug('proc.id: %s' % str(proc.pid))
1238-
for child in proc.children():
1239-
if log_flg:
1240-
logger.debug('child.pid: %d' % child.pid)
1241-
logger.debug('child.threads(): %s' % str(child.threads()))
1242-
logger.debug('child.num_threads(): %d' % child.num_threads())
1243-
logger.debug('len(child.children()): %d' % len(child.children()))
1244-
num_threads = max(num_threads, num_children,
1245-
child.num_threads(), len(child.children()))
1231+
for child in proc.children(recursive=True):
1232+
num_threads += child.num_threads()
12461233
except psutil.NoSuchProcess:
12471234
pass
12481235

1249-
# Return the number of threads found
1236+
# Return number of threads found
12501237
return num_threads
12511238

1252-
def _get_num_ram_mb(pid, pyfunc=False):
1239+
1240+
# Get ram usage of process
1241+
def _get_ram_mb(pid, pyfunc=False):
12531242
"""Function to get the RAM usage of a process and its children
12541243
12551244
Parameters
@@ -1298,8 +1287,9 @@ def _get_num_ram_mb(pid, pyfunc=False):
12981287
# Return memory
12991288
return mem_mb
13001289

1290+
13011291
# Get max resources used for process
1302-
def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False, log_flg=False):
1292+
def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False):
13031293
"""Function to get the RAM and threads usage of a process
13041294
13051295
Parameters
@@ -1320,13 +1310,11 @@ def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False, log_flg=False
13201310
"""
13211311

13221312
# Import packages
1323-
from memory_profiler import _get_memory
13241313
import psutil
13251314

13261315
try:
1327-
#mem_mb = max(mem_mb, _get_memory(pid, include_children=True, log_flg=log_flg))
1328-
mem_mb = max(mem_mb, _get_num_ram_mb(pid, pyfunc=pyfunc))
1329-
num_threads = max(num_threads, _get_num_threads(psutil.Process(pid), log_flg=log_flg))
1316+
mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc))
1317+
num_threads = max(num_threads, _get_num_threads(psutil.Process(pid)))
13301318
except Exception as exc:
13311319
iflogger.info('Could not get resources used by process. Error: %s'\
13321320
% exc)
@@ -1346,7 +1334,6 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
13461334

13471335
# Default to profiling the runtime
13481336
try:
1349-
import memory_profiler
13501337
import psutil
13511338
runtime_profile = True
13521339
except ImportError as exc:

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
try:
1414
import psutil
15-
import memory_profiler
1615
run_profiler = True
1716
skip_profile_msg = 'Run profiler tests'
1817
except ImportError as exc:
@@ -28,9 +27,9 @@ class UseResourcesInputSpec(CommandLineInputSpec):
2827

2928
# Init attributes
3029
num_gb = traits.Float(desc='Number of GB of RAM to use',
31-
argstr = "-g %f")
30+
argstr='-g %f')
3231
num_procs = traits.Int(desc='Number of processors to use',
33-
argstr = "-p %d")
32+
argstr='-p %d')
3433

3534

3635
# UseResources interface
@@ -78,30 +77,21 @@ def _use_gb_ram(num_gb):
7877
del gb_str
7978

8079
# Import packages
81-
import logging
82-
from multiprocessing import Process
83-
8480
from threading import Thread
8581

8682
# Init variables
8783
num_gb = float(num_gb)
88-
# Init variables
89-
#num_threads = proc.num_threads()
90-
from CPAC.utils.utils import setup_logger
84+
9185
# Build proc list
9286
proc_list = []
9387
for idx in range(num_procs):
94-
#proc = Thread(target=_use_gb_ram, args=(num_gb/num_procs,), name=str(idx))
95-
proc = Process(target=_use_gb_ram, args=(num_gb/num_procs,), name=str(idx))
88+
proc = Thread(target=_use_gb_ram, args=(num_gb/num_procs,), name=str(idx))
9689
proc_list.append(proc)
9790

98-
logger = setup_logger('memory_profiler', '/home/dclark/memory_profiler.log',
99-
logging.DEBUG, to_screen=False)
10091
# Run multi-threaded
101-
print 'Using %.3f GB of memory over %d processors...' % (num_gb, num_procs)
92+
print 'Using %.3f GB of memory over %d sub-threads...' % (num_gb, num_procs)
10293
for idx, proc in enumerate(proc_list):
10394
proc.start()
104-
#logger.debug('Starting PID: %d' % proc.pid)
10595

10696
for proc in proc_list:
10797
proc.join()
@@ -137,9 +127,9 @@ def setUp(self):
137127

138128
# Init parameters
139129
# Input RAM GB to occupy
140-
self.num_gb= 4
141-
# Input number of processors
142-
self.num_procs = 1
130+
self.num_gb = 6
131+
# Input number of sub-threads (not including parent threads)
132+
self.num_threads = 7
143133
# Acceptable percent error for memory profiled against input
144134
self.mem_err_percent = 5
145135

@@ -367,30 +357,33 @@ def test_cmdline_profiling(self):
367357

368358
# Init variables
369359
num_gb = self.num_gb
370-
num_procs = self.num_procs
360+
num_threads = self.num_threads
371361

372362
# Run workflow and get stats
373-
finish_str = self._run_cmdline_workflow(num_gb, num_procs)
363+
finish_str = self._run_cmdline_workflow(num_gb, num_threads)
374364
# Get runtime stats as dictionary
375365
node_stats = json.loads(finish_str)
376366

377367
# Read out runtime stats
378368
runtime_gb = float(node_stats['runtime_memory_gb'])
379-
runtime_procs = int(node_stats['runtime_threads'])
369+
runtime_threads = int(node_stats['runtime_threads'])
380370

381371
# Get margin of error for RAM GB
382372
allowed_gb_err = (self.mem_err_percent/100.0)*num_gb
383373
runtime_gb_err = np.abs(runtime_gb-num_gb)
374+
# Runtime threads should reflect shell-cmd thread, Python parent thread
375+
# and Python sub-threads = 1 + 1 + num_threads
376+
expected_runtime_threads = 1 + 1 + num_threads
384377

385378
# Error message formatting
386379
mem_err = 'Input memory: %f is not within %.1f%% of runtime '\
387380
'memory: %f' % (num_gb, self.mem_err_percent, runtime_gb)
388-
procs_err = 'Input procs: %d is not equal to runtime procs: %d' \
389-
% (num_procs, runtime_procs)
381+
procs_err = 'Input threads: %d is not equal to runtime threads: %d' \
382+
% (expected_runtime_threads, runtime_threads)
390383

391384
# Assert runtime stats are what was input
392385
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
393-
self.assertEqual(num_procs, runtime_procs, msg=procs_err)
386+
self.assertEqual(expected_runtime_threads, runtime_threads, msg=procs_err)
394387

395388
# Test resources were used as expected
396389
@unittest.skipIf(run_profiler == False, skip_profile_msg)
@@ -406,30 +399,33 @@ def test_function_profiling(self):
406399

407400
# Init variables
408401
num_gb = self.num_gb
409-
num_procs = self.num_procs
402+
num_threads = self.num_threads
410403

411404
# Run workflow and get stats
412-
finish_str = self._run_function_workflow(num_gb, num_procs)
405+
finish_str = self._run_function_workflow(num_gb, num_threads)
413406
# Get runtime stats as dictionary
414407
node_stats = json.loads(finish_str)
415408

416409
# Read out runtime stats
417410
runtime_gb = float(node_stats['runtime_memory_gb'])
418-
runtime_procs = int(node_stats['runtime_threads'])
411+
runtime_threads = int(node_stats['runtime_threads'])
419412

420413
# Get margin of error for RAM GB
421414
allowed_gb_err = (self.mem_err_percent/100.0)*num_gb
422415
runtime_gb_err = np.abs(runtime_gb-num_gb)
416+
# Runtime threads should reflect Python parent thread
417+
# and Python sub-threads = 1 + num_threads
418+
expected_runtime_threads = 1 + num_threads
423419

424420
# Error message formatting
425421
mem_err = 'Input memory: %f is not within %.1f%% of runtime '\
426422
'memory: %f' % (num_gb, self.mem_err_percent, runtime_gb)
427423
procs_err = 'Input procs: %d is not equal to runtime procs: %d' \
428-
% (num_procs, runtime_procs)
424+
% (expected_runtime_threads, runtime_threads)
429425

430426
# Assert runtime stats are what was input
431427
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
432-
self.assertEqual(num_procs, runtime_procs, msg=procs_err)
428+
self.assertEqual(expected_runtime_threads, runtime_threads, msg=procs_err)
433429

434430

435431
# Command-line run-able unittest module

nipype/interfaces/tests/use_resources

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ if __name__ == '__main__':
3535
# Import packages
3636
import argparse
3737
from threading import Thread
38-
from multiprocessing import Process
3938

4039
# Init argparser
4140
parser = argparse.ArgumentParser(description=__doc__)
@@ -56,10 +55,10 @@ if __name__ == '__main__':
5655
# Build proc list
5756
proc_list = []
5857
for idx in range(num_procs):
59-
proc_list.append(Process(target=use_gb_ram, args=(num_gb/num_procs,)))
58+
proc_list.append(Thread(target=use_gb_ram, args=(num_gb/num_procs,)))
6059

6160
# Run multi-threaded
62-
print 'Using %.3f GB of memory over %d processors...' % (num_gb, num_procs)
61+
print 'Using %.3f GB of memory over %d sub-threads...' % (num_gb, num_procs)
6362
for proc in proc_list:
6463
proc.start()
6564

nipype/interfaces/utility.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,6 @@ def _function_handle_wrapper(queue, **kwargs):
466466

467467
# Runtime profiler on if dependecies available
468468
try:
469-
import memory_profiler
470469
import psutil
471470
from nipype.interfaces.base import get_max_resources_used
472471
import multiprocessing
@@ -483,37 +482,24 @@ def _function_handle_wrapper(queue, **kwargs):
483482
queue = multiprocessing.Queue()
484483
proc = multiprocessing.Process(target=_function_handle_wrapper,
485484
args=(queue,), kwargs=args)
486-
485+
487486
# Init memory and threads before profiling
488487
mem_mb = -1
489488
num_threads = -1
490-
# if function_handle.__name__ == 'use_resources':
491-
# log_flg = True
492-
# else:
493-
# log_flg = False
494-
log_flg = False
489+
495490
# Start process and profile while it's alive
496491
proc.start()
497492
while proc.is_alive():
498493
mem_mb, num_threads = \
499-
get_max_resources_used(proc.pid, mem_mb, num_threads, pyfunc=True, log_flg=log_flg)
500-
494+
get_max_resources_used(proc.pid, mem_mb, num_threads,
495+
pyfunc=True)
496+
501497
# Get result from process queue
502498
out = queue.get()
503499
# If it is an exception, raise it
504500
if isinstance(out, Exception):
505501
raise out
506502

507-
# proc = (function_handle, (), args)
508-
# num_threads = 1
509-
# print 'function_handle: ', function_handle.__name__
510-
# if function_handle.__name__ == 'use_resources':
511-
# log_flg = True
512-
# else:
513-
# log_flg = False
514-
# mem_mb, out = \
515-
# memory_profiler.memory_usage(proc, include_children=True, max_usage=True, retval=True, log_flg=log_flg)
516-
# mem_mb = mem_mb[0]
517503
# Function ran successfully, populate runtime stats
518504
setattr(runtime, 'runtime_memory_gb', mem_mb/1024.0)
519505
setattr(runtime, 'runtime_threads', num_threads)

0 commit comments

Comments
 (0)