Skip to content

Commit f74fe25

Browse files
committed
Added real memory recording to plugn
1 parent 350fd4a commit f74fe25

File tree

2 files changed

+19
-8
lines changed

2 files changed

+19
-8
lines changed

nipype/pipeline/plugins/callback_log.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ def log_nodes_cb(node, status):
77
if status == 'start':
88
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
99
node._id + '"' + ',"start":' + '"' +str(datetime.datetime.now()) +\
10-
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
11-
+ str(node._interface.num_threads) + '}'
10+
'"' + ',"estimate memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
11+
+ str(node._interface.num_threads) + '}'
1212

1313
logger.debug(message)
1414

1515
elif status == 'end':
1616
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
1717
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
1818
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
19-
+ str(node._interface.num_threads) + '}'
19+
+ str(node._interface.num_threads) + ',"real memory":' str(node._interface.real_memory) + '}'
2020

2121
logger.debug(message)
2222

@@ -26,4 +26,4 @@ def log_nodes_cb(node, status):
2626
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
2727
+ str(node._interface.num_threads) + ',"error":"True"}'
2828

29-
logger.debug(message)
29+
logger.debug(message)

nipype/pipeline/plugins/multiproc.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,18 @@
1313
from .base import (DistributedPluginBase, report_crash)
1414

1515

16-
def run_node(node, updatehash):
16+
def run_node(node, updatehash, plugin_args=None):
1717
result = dict(result=None, traceback=None)
18+
try:
19+
run_memory = plugin_args['memory_profile']
20+
except Exception:
21+
run_memory = False
22+
if run_memory:
23+
import memory_profiler
24+
proc = (node.run(), (), {'updatehash' : updatehash})
25+
mem_mb, retval = memory_profiler.memory_usage(proc, max_usage=True, retval=True)
26+
result['result'] = retval
27+
node._interface.real_memory = mem_mb[0]/1024.0
1828
try:
1929
result['result'] = node.run(updatehash=updatehash)
2030
except:
@@ -160,8 +170,9 @@ def _submit_job(self, node, updatehash=False):
160170
node.inputs.terminal_output = 'allatonce'
161171
except:
162172
pass
163-
self._taskresult[self._taskid] = self.pool.apply_async(run_node, (node,
164-
updatehash,), callback=release_lock)
173+
self._taskresult[self._taskid] = self.pool.apply_async(run_node,
174+
(node, updatehash, self.plugin_args),
175+
callback=release_lock)
165176
return self._taskid
166177

167178
def _send_procs_to_workers(self, updatehash=False, graph=None):
@@ -263,4 +274,4 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
263274
else:
264275
break
265276

266-
logger.debug('No jobs waiting to execute')
277+
logger.debug('No jobs waiting to execute')

0 commit comments

Comments
 (0)