Skip to content

Commit 36e1446

Browse files
committed
Added cmd-level threads and memory profiling
1 parent 70ca457 commit 36e1446

File tree

4 files changed

+44
-37
lines changed

4 files changed

+44
-37
lines changed

nipype/interfaces/base.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -751,17 +751,8 @@ def __init__(self, **inputs):
751751
self.__class__.__name__)
752752
self.inputs = self.input_spec(**inputs)
753753
self.estimated_memory = 1
754-
self._real_memory = 0
755754
self.num_threads = 1
756755

757-
@property
758-
def real_memory(self):
759-
return self._real_memory
760-
761-
@real_memory.setter
762-
def real_memory(self, value):
763-
self._real_memory = value
764-
765756
@classmethod
766757
def help(cls, returnhelp=False):
767758
""" Prints class help
@@ -1269,7 +1260,8 @@ def _process(drain=0):
12691260
stream.read(drain)
12701261
while proc.returncode is None:
12711262
if mem_prof:
1272-
mem_mb = max([mem_mb, _get_memory(proc.pid, include_children=True)])
1263+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1264+
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
12731265
time.sleep(interval)
12741266
proc.poll()
12751267
_process()
@@ -1288,8 +1280,8 @@ def _process(drain=0):
12881280
if output == 'allatonce':
12891281
if mem_prof:
12901282
while proc.returncode is None:
1291-
mem_mb = max([mem_mb, _get_memory(proc.pid, include_children=True)])
1292-
num_threads = max([num_threads, psutil.Proc(proc.pid).num_threads()])
1283+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1284+
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
12931285
time.sleep(interval)
12941286
proc.poll()
12951287
stdout, stderr = proc.communicate()
@@ -1299,8 +1291,8 @@ def _process(drain=0):
12991291
if output == 'file':
13001292
if mem_prof:
13011293
while proc.returncode is None:
1302-
mem_mb = max([mem_mb, _get_memory(proc.pid, include_children=True)])
1303-
num_threads = max([num_threads, psutil.Proc(proc.pid).num_threads()])
1294+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1295+
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
13041296
time.sleep(interval)
13051297
proc.poll()
13061298
ret_code = proc.wait()
@@ -1312,8 +1304,8 @@ def _process(drain=0):
13121304
if output == 'none':
13131305
if mem_prof:
13141306
while proc.returncode is None:
1315-
mem_mb = max([mem_mb, _get_memory(proc.pid, include_children=True)])
1316-
num_threads = max([num_threads, psutil.Proc(proc.pid).num_threads()])
1307+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1308+
num_threads = max(num_threads, psutil.Process(proc.pid).num_threads())
13171309
time.sleep(interval)
13181310
proc.poll()
13191311
proc.communicate()
@@ -1322,7 +1314,7 @@ def _process(drain=0):
13221314
result['merged'] = ''
13231315

13241316
setattr(runtime, 'cmd_memory', mem_mb/1024.0)
1325-
setattr(runtime, 'num_threads', num_threads)
1317+
setattr(runtime, 'cmd_threads', num_threads)
13261318
runtime.stderr = '\n'.join(result['stderr'])
13271319
runtime.stdout = '\n'.join(result['stdout'])
13281320
runtime.merged = result['merged']

nipype/pipeline/plugins/base.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,8 +419,6 @@ def _task_finished_cb(self, jobid, result=None):
419419
if result == None:
420420
if self._taskresult.has_key(jobid):
421421
result = self._taskresult[jobid].get()
422-
else:
423-
result = {'real_memory' : 'nokey'}
424422
self._status_callback(self.procs[jobid], 'end', result)
425423
# Update job and worker queues
426424
self.proc_pending[jobid] = False

nipype/pipeline/plugins/callback_log.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
def log_nodes_cb(node, status, result=None):
55
logger = logging.getLogger('callback')
66
try:
7-
real_mem1 = result['real_memory']
8-
real_mem2 = result['real_memory2']
7+
node_mem = result['node_memory']
8+
cmd_mem = result['cmd_memory']
99
run_seconds = result['run_seconds']
10+
cmd_threads = result['cmd_threads']
1011
except Exception as exc:
11-
real_mem1 = real_mem2 = run_seconds = 'N/A'
12+
node_mem = cmd_mem = run_seconds = cmd_threads = 'N/A'
1213
if status == 'start':
1314
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
1415
node._id + '"' + ',"start":' + '"' +str(datetime.datetime.now()) +\
@@ -19,16 +20,20 @@ def log_nodes_cb(node, status, result=None):
1920

2021
elif status == 'end':
2122
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
22-
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
23-
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
24-
+ str(node._interface.num_threads) + ',"real_memory1":' + str(real_mem1) + ',"real_memory2":' + str(real_mem2) + ',"run_seconds":' + str(run_seconds) + '}'
23+
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) + \
24+
'"' + ',"estimate memory":' + str(node._interface.estimated_memory) + \
25+
',"num_threads":' + str(node._interface.num_threads) + \
26+
',"cmd-level threads":' + str(cmd_threads) + \
27+
',"node-level memory":' + str(node_mem) + \
28+
',"cmd-level memory":' + str(cmd_mem) + \
29+
',"run_seconds":' + str(run_seconds) + '}'
2530

2631
logger.debug(message)
2732

2833
else:
2934
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
3035
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
31-
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
36+
'"' + ',"estimate memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
3237
+ str(node._interface.num_threads) + ',"error":"True"}'
3338

3439
logger.debug(message)

nipype/pipeline/plugins/multiproc.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,37 @@
1313
from .base import (DistributedPluginBase, report_crash)
1414

1515

16+
# Run node
1617
def run_node(node, updatehash, plugin_args=None):
17-
result = dict(result=None, traceback=None)
18+
"""docstring
19+
"""
20+
21+
# Import packages
1822
try:
19-
run_memory = plugin_args['memory_profile']
23+
runtime_profile = plugin_args['runtime_profile']
2024
import memory_profiler
25+
import datetime
2126
except KeyError:
22-
run_memory = False
27+
runtime_profile = False
2328
except ImportError:
24-
run_memory = False
25-
if run_memory:
26-
import datetime
29+
runtime_profile = False
30+
31+
# Init variables
32+
result = dict(result=None, traceback=None)
33+
34+
# If we're profiling the run
35+
if runtime_profile:
36+
# Init function tuple
2737
proc = (node.run, (), {'updatehash' : updatehash})
2838
start = datetime.datetime.now()
2939
mem_mb, retval = memory_profiler.memory_usage(proc=proc, retval=True, include_children=True, max_usage=True)
3040
runtime = (datetime.datetime.now() - start).total_seconds()
3141
result['result'] = retval
3242
result['node_memory'] = mem_mb[0]/1024.0
3343
result['cmd_memory'] = retval.runtime.get('cmd_memory')
44+
result['cmd_threads'] = retval.runtime.get('cmd_threads')
3445
result['run_seconds'] = runtime
46+
# Otherwise, execute node.run as normal
3547
else:
3648
try:
3749
result['result'] = node.run(updatehash=updatehash)
@@ -141,15 +153,15 @@ class ResourceMultiProcPlugin(MultiProcPlugin):
141153
the number of threads and memory of the system is used.
142154
143155
System consuming nodes should be tagged:
144-
memory_consuming_node.interface.memory = 8 #Gb
156+
memory_consuming_node.interface.estimated_memory = 8 #Gb
145157
thread_consuming_node.interface.num_threads = 16
146158
147159
The default number of threads and memory for a node is 1.
148160
149161
Currently supported options are:
150162
151-
- num_thread: maximum number of threads to be executed in parallel
152-
- memory: maximum memory that can be used at once.
163+
- num_threads: maximum number of threads to be executed in parallel
164+
- estimated_memory: maximum memory that can be used at once.
153165
154166
"""
155167

@@ -198,7 +210,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
198210
for jobid in jobids:
199211
busy_memory+= self.procs[jobid]._interface.estimated_memory
200212
busy_processors+= self.procs[jobid]._interface.num_threads
201-
213+
202214
free_memory = self.memory - busy_memory
203215
free_processors = self.processors - busy_processors
204216

@@ -222,7 +234,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
222234
if self.procs[jobid]._interface.estimated_memory <= free_memory and self.procs[jobid]._interface.num_threads <= free_processors:
223235
logger.info('Executing: %s ID: %d' %(self.procs[jobid]._id, jobid))
224236
executing_now.append(self.procs[jobid])
225-
237+
226238
if isinstance(self.procs[jobid], MapNode):
227239
try:
228240
num_subnodes = self.procs[jobid].num_subnodes()

0 commit comments

Comments
 (0)