Skip to content

Commit 1e66b86

Browse files
committed
Added initial code for getting used memory of node
1 parent f74fe25 commit 1e66b86

File tree

4 files changed

+29
-13
lines changed

4 files changed

+29
-13
lines changed

nipype/interfaces/base.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,9 +751,17 @@ def __init__(self, **inputs):
751751
self.__class__.__name__)
752752
self.inputs = self.input_spec(**inputs)
753753
self.estimated_memory = 1
754-
self.real_memory = 0
754+
self._real_memory = 0
755755
self.num_threads = 1
756756

757+
@property
758+
def real_memory(self):
759+
return self._real_memory
760+
761+
@real_memory.setter
762+
def real_memory(self, value):
763+
self._real_memory = value
764+
757765
@classmethod
758766
def help(cls, returnhelp=False):
759767
""" Prints class help

nipype/pipeline/plugins/base.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,10 @@ def _task_finished_cb(self, jobid):
416416
logger.info('[Job finished] jobname: %s jobid: %d' %
417417
(self.procs[jobid]._id, jobid))
418418
if self._status_callback:
419-
self._status_callback(self.procs[jobid], 'end')
419+
print '!!!!!!!!!!!!!!!!!!!'
420+
print self._taskresult
421+
print self._taskresult.keys()
422+
self._status_callback(self.procs[jobid], 'end', self._taskresult[self.taskresultid])
420423
# Update job and worker queues
421424
self.proc_pending[jobid] = False
422425
# update the job dependency structure

nipype/pipeline/plugins/callback_log.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime
22
import logging
33

4-
def log_nodes_cb(node, status):
4+
def log_nodes_cb(node, status, result=None):
55
print 'status', status
66
logger = logging.getLogger('callback')
77
if status == 'start':
@@ -16,7 +16,7 @@ def log_nodes_cb(node, status):
1616
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
1717
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
1818
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
19-
+ str(node._interface.num_threads) + ',"real memory":' str(node._interface.real_memory) + '}'
19+
+ str(node._interface.num_threads) + ',"real memory":' + str(result['real_memory']) + '}'
2020

2121
logger.debug(message)
2222

nipype/pipeline/plugins/multiproc.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@ def run_node(node, updatehash, plugin_args=None):
2121
run_memory = False
2222
if run_memory:
2323
import memory_profiler
24-
proc = (node.run(), (), {'updatehash' : updatehash})
24+
proc = (node.run, (), {'updatehash' : updatehash})
2525
mem_mb, retval = memory_profiler.memory_usage(proc, max_usage=True, retval=True)
2626
result['result'] = retval
27-
node._interface.real_memory = mem_mb[0]/1024.0
28-
try:
29-
result['result'] = node.run(updatehash=updatehash)
30-
except:
31-
etype, eval, etr = sys.exc_info()
32-
result['traceback'] = format_exception(etype,eval,etr)
33-
result['result'] = node.result
27+
result['real_memory'] = 100
28+
print 'Just populated task result!!!!!!!!!!!!!!!!!!!'
29+
print result
30+
#node._interface.real_memory = mem_mb[0]/1024.0
31+
else:
32+
try:
33+
result['result'] = node.run(updatehash=updatehash)
34+
except:
35+
etype, eval, etr = sys.exc_info()
36+
result['traceback'] = format_exception(etype,eval,etr)
37+
result['result'] = node.result
3438
return result
3539

3640

@@ -173,6 +177,8 @@ def _submit_job(self, node, updatehash=False):
173177
self._taskresult[self._taskid] = self.pool.apply_async(run_node,
174178
(node, updatehash, self.plugin_args),
175179
callback=release_lock)
180+
print 'Printing on output!!!!!!!!!!'
181+
print self._taskresult, self._taskid
176182
return self._taskid
177183

178184
def _send_procs_to_workers(self, updatehash=False, graph=None):
@@ -237,7 +243,6 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
237243
# Send job to task manager and add to pending tasks
238244
if self._status_callback:
239245
self._status_callback(self.procs[jobid], 'start')
240-
241246
if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
242247
logger.debug('checking hash locally')
243248
try:

0 commit comments

Comments
 (0)