Skip to content

Commit 0fdc671

Browse files
committed
Wrote my own get memory function - seems to work much better
1 parent a515c77 commit 0fdc671

File tree

3 files changed

+114
-68
lines changed

3 files changed

+114
-68
lines changed

nipype/interfaces/base.py

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,9 +1249,57 @@ def _get_num_threads(proc, log_flg=False):
12491249
# Return the number of threads found
12501250
return num_threads
12511251

1252+
def _get_num_ram_mb(pid, pyfunc=False):
1253+
"""Function to get the RAM usage of a process and its children
1254+
1255+
Parameters
1256+
----------
1257+
pid : integer
1258+
the PID of the process to get RAM usage of
1259+
pyfunc : boolean (optional); default=False
1260+
a flag to indicate if the process is a python function;
1261+
when Python processes are multithreaded via multiprocessing or threading,
1262+
children functions include their own memory + parents. if this
1263+
is set, the parent memory will be removed from children memories
1264+
1265+
Reference: http://ftp.dev411.com/t/python/python-list/095thexx8g/multiprocessing-forking-memory-usage
1266+
1267+
Returns
1268+
-------
1269+
mem_mb : float
1270+
the memory RAM in MB utilized by the process PID
1271+
"""
1272+
1273+
# Import packages
1274+
import psutil
1275+
1276+
# Init variables
1277+
_MB = 1024.0**2
1278+
1279+
# Try block to protect against any dying processes in the interim
1280+
try:
1281+
# Init parent
1282+
parent = psutil.Process(pid)
1283+
# Get memory of parent
1284+
parent_mem = parent.memory_info().rss
1285+
mem_mb = parent_mem/_MB
1286+
1287+
# Iterate through child processes
1288+
for child in parent.children(recursive=True):
1289+
child_mem = child.memory_info().rss
1290+
if pyfunc:
1291+
child_mem -= parent_mem
1292+
mem_mb += child_mem/_MB
1293+
1294+
# Catch if process dies, return gracefully
1295+
except psutil.NoSuchProcess:
1296+
pass
1297+
1298+
# Return memory
1299+
return mem_mb
12521300

12531301
# Get max resources used for process
1254-
def get_max_resources_used(pid, mem_mb, num_threads, log_flg=False):
1302+
def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False, log_flg=False):
12551303
"""Function to get the RAM and threads usage of a process
12561304
12571305
Parameters
@@ -1276,7 +1324,8 @@ def get_max_resources_used(pid, mem_mb, num_threads, log_flg=False):
12761324
import psutil
12771325

12781326
try:
1279-
mem_mb = max(mem_mb, _get_memory(pid, include_children=True, log_flg=log_flg))
1327+
#mem_mb = max(mem_mb, _get_memory(pid, include_children=True, log_flg=log_flg))
1328+
mem_mb = max(mem_mb, _get_num_ram_mb(pid, pyfunc=pyfunc))
12801329
num_threads = max(num_threads, _get_num_threads(psutil.Process(pid), log_flg=log_flg))
12811330
except Exception as exc:
12821331
iflogger.info('Could not get resources used by process. Error: %s'\

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 62 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def _use_gb_ram(num_gb):
101101
print 'Using %.3f GB of memory over %d processors...' % (num_gb, num_procs)
102102
for idx, proc in enumerate(proc_list):
103103
proc.start()
104-
logger.debug('Starting PID: %d' % proc.pid)
104+
#logger.debug('Starting PID: %d' % proc.pid)
105105

106106
for proc in proc_list:
107107
proc.join()
@@ -137,12 +137,72 @@ def setUp(self):
137137

138138
# Init parameters
139139
# Input RAM GB to occupy
140-
self.num_gb= .75
140+
self.num_gb= 4
141141
# Input number of processors
142142
self.num_procs = 1
143143
# Acceptable percent error for memory profiled against input
144144
self.mem_err_percent = 5
145145

146+
# ! Only used for benchmarking the profiler over a range of
147+
# ! processors and RAM usage
148+
# ! Requires a LOT of RAM and PROCS to be tested
149+
def _collect_range_runtime_stats(self):
150+
'''
151+
Function to collect a range of runtime stats
152+
'''
153+
154+
# Import packages
155+
import json
156+
import numpy as np
157+
import pandas as pd
158+
159+
# Init variables
160+
num_procs_range = 8
161+
ram_gb_range = 10.0
162+
ram_gb_step = 0.25
163+
dict_list = []
164+
165+
# Iterate through all combos
166+
for num_procs in np.arange(1, num_procs_range+1, 1):
167+
for num_gb in np.arange(0.25, ram_gb_range+ram_gb_step, ram_gb_step):
168+
# Cmd-level
169+
cmd_fin_str = self._run_cmdline_workflow(num_gb, num_procs)
170+
cmd_node_stats = json.loads(cmd_fin_str)
171+
cmd_runtime_procs = int(cmd_node_stats['runtime_threads'])
172+
cmd_runtime_gb = float(cmd_node_stats['runtime_memory_gb'])
173+
174+
# Func-level
175+
func_fin_str = self._run_function_workflow(num_gb, num_procs)
176+
func_node_stats = json.loads(func_fin_str)
177+
func_runtime_procs = int(func_node_stats['runtime_threads'])
178+
func_runtime_gb = float(func_node_stats['runtime_memory_gb'])
179+
180+
# Calc errors
181+
cmd_procs_err = cmd_runtime_procs - num_procs
182+
cmd_gb_err = cmd_runtime_gb - num_gb
183+
func_procs_err = func_runtime_procs - num_procs
184+
func_gb_err = func_runtime_gb - num_gb
185+
186+
# Node dictionary
187+
results_dict = {'input_procs' : num_procs,
188+
'input_gb' : num_gb,
189+
'cmd_runtime_procs' : cmd_runtime_procs,
190+
'cmd_runtime_gb' : cmd_runtime_gb,
191+
'func_runtime_procs' : func_runtime_procs,
192+
'func_runtime_gb' : func_runtime_gb,
193+
'cmd_procs_err' : cmd_procs_err,
194+
'cmd_gb_err' : cmd_gb_err,
195+
'func_procs_err' : func_procs_err,
196+
'func_gb_err' : func_gb_err}
197+
# Append to list
198+
dict_list.append(results_dict)
199+
200+
# Create dataframe
201+
runtime_results_df = pd.DataFrame(dict_list)
202+
203+
# Return dataframe
204+
return runtime_results_df
205+
146206
# Test node
147207
def _run_cmdline_workflow(self, num_gb, num_procs):
148208
'''
@@ -371,69 +431,6 @@ def test_function_profiling(self):
371431
self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err)
372432
self.assertEqual(num_procs, runtime_procs, msg=procs_err)
373433

374-
# Collect stats for range of num_threads and memory amount
375-
def _collect_range_runtime_stats(self):
376-
'''
377-
Function to collect a range of runtime stats
378-
'''
379-
380-
# Import packages
381-
import json
382-
import numpy as np
383-
import pandas as pd
384-
385-
# Init variables
386-
num_procs_range = 8
387-
ram_gb_range = 10.0
388-
ram_gb_step = 0.25
389-
dict_list = []
390-
391-
# Iterate through all combos
392-
for num_procs in np.arange(1, num_procs_range+1, 1):
393-
for num_gb in np.arange(0.25, ram_gb_range+ram_gb_step, ram_gb_step):
394-
# Cmd-level
395-
cmd_fin_str = self._run_cmdline_workflow(num_gb, num_procs)
396-
cmd_node_stats = json.loads(cmd_fin_str)
397-
cmd_runtime_procs = int(cmd_node_stats['runtime_threads'])
398-
cmd_runtime_gb = float(cmd_node_stats['runtime_memory_gb'])
399-
400-
# Func-level
401-
func_fin_str = self._run_function_workflow(num_gb, num_procs)
402-
func_node_stats = json.loads(func_fin_str)
403-
func_runtime_procs = int(func_node_stats['runtime_threads'])
404-
func_runtime_gb = float(func_node_stats['runtime_memory_gb'])
405-
406-
# Calc errors
407-
cmd_procs_err = cmd_runtime_procs - num_procs
408-
cmd_gb_err = cmd_runtime_gb - num_gb
409-
func_procs_err = func_runtime_procs - num_procs
410-
func_gb_err = func_runtime_gb - num_gb
411-
412-
# Node dictionary
413-
results_dict = {'input_procs' : num_procs,
414-
'input_gb' : num_gb,
415-
'cmd_runtime_procs' : cmd_runtime_procs,
416-
'cmd_runtime_gb' : cmd_runtime_gb,
417-
'func_runtime_procs' : func_runtime_procs,
418-
'func_runtime_gb' : func_runtime_gb,
419-
'cmd_procs_err' : cmd_procs_err,
420-
'cmd_gb_err' : cmd_gb_err,
421-
'func_procs_err' : func_procs_err,
422-
'func_gb_err' : func_gb_err}
423-
# Append to list
424-
dict_list.append(results_dict)
425-
426-
# Create dataframe
427-
runtime_results_df = pd.DataFrame(dict_list)
428-
429-
# Return dataframe
430-
return runtime_results_df
431-
432-
def test_write_df_to_csv(self):
433-
df = self._collect_range_runtime_stats()
434-
df.to_csv('/home/dclark/runtime_results.csv')
435-
#self.assertEqual(1, 1)
436-
437434

438435
# Command-line run-able unittest module
439436
if __name__ == '__main__':

nipype/interfaces/utility.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ def _function_handle_wrapper(queue, **kwargs):
496496
proc.start()
497497
while proc.is_alive():
498498
mem_mb, num_threads = \
499-
get_max_resources_used(proc.pid, mem_mb, num_threads, log_flg=log_flg)
499+
get_max_resources_used(proc.pid, mem_mb, num_threads, pyfunc=True, log_flg=log_flg)
500500

501501
# Get result from process queue
502502
out = queue.get()

0 commit comments

Comments
 (0)