
Commit 0388305

Added checks for Python dependencies and added a method using built-in standard-library functions to get system memory
1 parent 9d19e14 commit 0388305

File tree

3 files changed: +58 -19 lines changed

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 10 additions & 0 deletions
@@ -10,6 +10,15 @@
 import unittest
 from nipype.interfaces.base import traits, CommandLine, CommandLineInputSpec
 
+try:
+    import psutil
+    import memory_profiler
+    run_profiler = True
+    skip_profile_msg = 'Run profiler tests'
+except ImportError as exc:
+    skip_profile_msg = 'Missing python packages for runtime profiling, skipping...\n'\
+                       'Error: %s' % exc
+    run_profiler = False
 
 # UseResources inputspec
 class UseResourcesInputSpec(CommandLineInputSpec):
@@ -151,6 +160,7 @@ def _run_workflow(self):
         return finish_str
 
     # Test resources were used as expected
+    @unittest.skipIf(run_profiler == False, skip_profile_msg)
     def test_wf_logfile(self):
         '''
         Test runtime profiler correctly records workflow RAM/CPUs consumption
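
The try/except guard above is a standard pattern for tests with optional dependencies: probe for the packages once at import time, record the outcome in module-level flags, and let unittest.skipIf consume them. A minimal self-contained sketch of the same pattern, assuming only that psutil may or may not be installed (the test class and names are illustrative, not from this commit):

    import unittest

    # Probe the optional dependency once, at import time
    try:
        import psutil
        HAVE_PSUTIL = True
        SKIP_MSG = 'psutil available, running test'
    except ImportError as exc:
        HAVE_PSUTIL = False
        SKIP_MSG = 'psutil not installed, skipping: %s' % exc


    class OptionalDepTest(unittest.TestCase):

        @unittest.skipIf(not HAVE_PSUTIL, SKIP_MSG)
        def test_memory_query(self):
            # Only runs when the import above succeeded
            self.assertGreater(psutil.virtual_memory().total, 0)


    if __name__ == '__main__':
        unittest.main()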

nipype/pipeline/plugins/multiproc.py

Lines changed: 46 additions & 17 deletions
@@ -9,13 +9,13 @@
 # Import packages
 from multiprocessing import Process, Pool, cpu_count, pool
 from traceback import format_exception
+import os
 import sys
 
 import numpy as np
 from copy import deepcopy
 from ..engine import MapNode
 from ...utils.misc import str2bool
-import psutil
 from ... import logging
 import semaphore_singleton
 from .base import (DistributedPluginBase, report_crash)
@@ -78,6 +78,34 @@ def release_lock(args):
     semaphore_singleton.semaphore.release()
 
 
+# Get total system RAM
+def get_system_total_memory_gb():
+    """Function to get the total RAM of the running system in GB
+    """
+
+    # Import packages
+    import os
+    import sys
+
+    # Get memory
+    if 'linux' in sys.platform:
+        with open('/proc/meminfo', 'r') as f_in:
+            meminfo_lines = f_in.readlines()
+            mem_total_line = [line for line in meminfo_lines \
+                              if 'MemTotal' in line][0]
+            mem_total = float(mem_total_line.split()[1])
+            memory_gb = mem_total/(1024.0**2)
+    elif 'darwin' in sys.platform:
+        mem_str = os.popen('sysctl hw.memsize').read().strip().split(' ')[-1]
+        memory_gb = float(mem_str)/(1024.0**3)
+    else:
+        err_msg = 'System platform: %s is not supported' % sys.platform
+        raise Exception(err_msg)
+
+    # Return memory
+    return memory_gb
+
+
 class MultiProcPlugin(DistributedPluginBase):
     """Execute workflow with multiprocessing, not sending more jobs at once
     than the system can support.
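
As an aside, POSIX platforms also expose total physical memory through os.sysconf, which would avoid both the /proc/meminfo parsing and the sysctl subprocess. A sketch under the assumption that the SC_PAGE_SIZE and SC_PHYS_PAGES names are available (they are on Linux and macOS, but this is not guaranteed everywhere and is not what the commit uses):

    import os

    def total_memory_gb_sysconf():
        # Total bytes = bytes per page * number of physical pages
        page_size = os.sysconf('SC_PAGE_SIZE')
        num_pages = os.sysconf('SC_PHYS_PAGES')
        return (page_size * num_pages) / (1024.0 ** 3)

    print('Total RAM: %.2f GB' % total_memory_gb_sysconf())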
@@ -102,22 +130,24 @@ class MultiProcPlugin(DistributedPluginBase):
     """
 
     def __init__(self, plugin_args=None):
+        # Init variables and instance attributes
         super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
         self._taskresult = {}
         self._taskid = 0
         non_daemon = True
         self.plugin_args = plugin_args
         self.processors = cpu_count()
-        memory = psutil.virtual_memory()
-        self.memory = float(memory.total) / (1024.0**3)
+        self.memory_gb = get_system_total_memory_gb()
+
+        # Check plugin args
         if self.plugin_args:
             if 'non_daemon' in self.plugin_args:
                 non_daemon = plugin_args['non_daemon']
             if 'n_procs' in self.plugin_args:
                 self.processors = self.plugin_args['n_procs']
             if 'memory' in self.plugin_args:
                 self.memory = self.plugin_args['memory']
-
+        # Instantiate different thread pools for non-daemon processes
        if non_daemon:
             # run the execution using the non-daemon pool subclass
             self.pool = NonDaemonPool(processes=self.processors)
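
For context, these plugin arguments are supplied by the user at run time; a hedged usage sketch, assuming an existing nipype workflow object wf ('memory' is the total memory budget in GB):

    # Run with at most 4 worker processes and a 6 GB memory budget,
    # using daemonized workers (wf is an illustrative workflow object)
    wf.run(plugin='MultiProc',
           plugin_args={'n_procs': 4, 'memory': 6, 'non_daemon': False})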
@@ -172,40 +202,39 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         jobids = np.flatnonzero((self.proc_pending == True) & \
                                 (self.depidx.sum(axis=0) == 0).__array__())
 
-        #check available system resources by summing all threads and memory used
-        busy_memory = 0
+        # Check available system resources by summing all threads and memory used
+        busy_memory_gb = 0
         busy_processors = 0
         for jobid in jobids:
-            busy_memory += self.procs[jobid]._interface.estimated_memory_gb
+            busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
             busy_processors += self.procs[jobid]._interface.num_threads
 
-        free_memory = self.memory - busy_memory
+        free_memory_gb = self.memory_gb - busy_memory_gb
         free_processors = self.processors - busy_processors
 
 
-        #check all jobs without dependency not run
+        # Check all jobs without dependency not run
         jobids = np.flatnonzero((self.proc_done == False) & \
                                 (self.depidx.sum(axis=0) == 0).__array__())
 
 
-        #sort jobs ready to run first by memory and then by number of threads
-        #The most resource consuming jobs run first
+        # Sort jobs ready to run first by memory and then by number of threads
+        # The most resource consuming jobs run first
         jobids = sorted(jobids,
                         key=lambda item: (self.procs[item]._interface.estimated_memory_gb,
                                           self.procs[item]._interface.num_threads))
 
         logger.debug('Free memory (GB): %d, Free processors: %d',
-                     free_memory, free_processors)
-
+                     free_memory_gb, free_processors)
 
-        #while have enough memory and processors for first job
-        #submit first job on the list
+        # While have enough memory and processors for first job
+        # Submit first job on the list
         for jobid in jobids:
             logger.debug('Next Job: %d, memory (GB): %d, threads: %d' \
                          % (jobid, self.procs[jobid]._interface.estimated_memory_gb,
                             self.procs[jobid]._interface.num_threads))
 
-            if self.procs[jobid]._interface.estimated_memory_gb <= free_memory and \
+            if self.procs[jobid]._interface.estimated_memory_gb <= free_memory_gb and \
                self.procs[jobid]._interface.num_threads <= free_processors:
                 logger.info('Executing: %s ID: %d' %(self.procs[jobid]._id, jobid))
                 executing_now.append(self.procs[jobid])
@@ -226,7 +255,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 self.proc_done[jobid] = True
                 self.proc_pending[jobid] = True
 
-                free_memory -= self.procs[jobid]._interface.estimated_memory_gb
+                free_memory_gb -= self.procs[jobid]._interface.estimated_memory_gb
                 free_processors -= self.procs[jobid]._interface.num_threads
 
                 # Send job to task manager and add to pending tasks
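
Stripped of nipype's bookkeeping, the policy in the two hunks above is a greedy loop over ready jobs: estimate what pending jobs already consume, then submit each remaining job that still fits the free memory/processor budget. A self-contained sketch of that policy, with an illustrative Job tuple standing in for a node's resource estimates (not nipype code):

    from collections import namedtuple

    # Illustrative stand-in for a node's estimated resource usage
    Job = namedtuple('Job', ['name', 'memory_gb', 'num_threads'])


    def schedule(ready_jobs, free_memory_gb, free_processors):
        """Greedily submit jobs that fit the remaining resource budget."""
        # Consider the most resource-consuming jobs first
        # (the stated intent of the sort in the diff above)
        ready_jobs = sorted(ready_jobs,
                            key=lambda j: (j.memory_gb, j.num_threads),
                            reverse=True)
        submitted = []
        for job in ready_jobs:
            if job.memory_gb <= free_memory_gb and \
               job.num_threads <= free_processors:
                free_memory_gb -= job.memory_gb
                free_processors -= job.num_threads
                submitted.append(job)
        return submitted


    jobs = [Job('smooth', 2.0, 4), Job('realign', 1.0, 2), Job('bet', 0.5, 1)]
    print([j.name for j in schedule(jobs, free_memory_gb=3.0, free_processors=4)])
    # -> ['smooth']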

nipype/pipeline/plugins/tests/test_multiproc.py

Lines changed: 2 additions & 2 deletions
@@ -3,13 +3,13 @@
 from tempfile import mkdtemp
 from shutil import rmtree
 from multiprocessing import cpu_count
-import psutil
 
 import nipype.interfaces.base as nib
 from nipype.utils import draw_gantt_chart
 from nipype.testing import assert_equal
 import nipype.pipeline.engine as pe
 from nipype.pipeline.plugins.callback_log import log_nodes_cb
+from nipype.pipeline.plugins.multiproc import get_system_total_memory_gb
 
 class InputSpec(nib.TraitedSpec):
     input1 = nib.traits.Int(desc='a random int')
@@ -222,7 +222,7 @@ def test_do_not_use_more_threads_then_specified():
 
     yield assert_equal, result, True, "using more threads than specified"
 
-    max_memory = psutil.virtual_memory().total / (1024*1024)
+    max_memory = get_system_total_memory_gb()
     result = True
     for m in memory:
         if m > max_memory:
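
A quick interactive sanity check of the new helper (the returned value is machine-dependent; this session is illustrative only):

    >>> from nipype.pipeline.plugins.multiproc import get_system_total_memory_gb
    >>> mem = get_system_total_memory_gb()
    >>> isinstance(mem, float) and mem > 0
    True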
