Skip to content

Commit 716f923

Browse files
committed
Fixed logging of real memory
1 parent 1e66b86 commit 716f923

File tree

5 files changed

+65
-18
lines changed

5 files changed

+65
-18
lines changed

nipype/interfaces/base.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,9 +1206,18 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12061206
12071207
The returned runtime contains a merged stdout+stderr log with timestamps
12081208
"""
1209-
PIPE = subprocess.PIPE
12101209

1210+
# Import packages
1211+
try:
1212+
from memory_profiler import _get_memory
1213+
mem_prof = True
1214+
except:
1215+
mem_prof = False
1216+
1217+
# Init variables
1218+
PIPE = subprocess.PIPE
12111219
cmdline = runtime.cmdline
1220+
12121221
if redirect_x:
12131222
exist_xvfb, _ = _exists_in_path('xvfb-run', runtime.environ)
12141223
if not exist_xvfb:
@@ -1237,6 +1246,11 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12371246
result = {}
12381247
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
12391248
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
1249+
1250+
# Init variables for memory profiling
1251+
ret = -1
1252+
interval = 0.1
1253+
12401254
if output == 'stream':
12411255
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
12421256

@@ -1252,8 +1266,10 @@ def _process(drain=0):
12521266
else:
12531267
for stream in res[0]:
12541268
stream.read(drain)
1255-
12561269
while proc.returncode is None:
1270+
if mem_prof:
1271+
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1272+
time.sleep(interval)
12571273
proc.poll()
12581274
_process()
12591275
_process(drain=1)
@@ -1267,23 +1283,41 @@ def _process(drain=0):
12671283
result[stream._name] = [r[2] for r in rows]
12681284
temp.sort()
12691285
result['merged'] = [r[1] for r in temp]
1286+
12701287
if output == 'allatonce':
1288+
if mem_prof:
1289+
while proc.returncode is None:
1290+
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1291+
time.sleep(interval)
1292+
proc.poll()
12711293
stdout, stderr = proc.communicate()
12721294
result['stdout'] = stdout.split('\n')
12731295
result['stderr'] = stderr.split('\n')
12741296
result['merged'] = ''
12751297
if output == 'file':
1298+
if mem_prof:
1299+
while proc.returncode is None:
1300+
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1301+
time.sleep(interval)
1302+
proc.poll()
12761303
ret_code = proc.wait()
12771304
stderr.flush()
12781305
stdout.flush()
12791306
result['stdout'] = [line.strip() for line in open(outfile).readlines()]
12801307
result['stderr'] = [line.strip() for line in open(errfile).readlines()]
12811308
result['merged'] = ''
12821309
if output == 'none':
1310+
if mem_prof:
1311+
while proc.returncode is None:
1312+
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1313+
time.sleep(interval)
1314+
proc.poll()
12831315
proc.communicate()
12841316
result['stdout'] = []
12851317
result['stderr'] = []
12861318
result['merged'] = ''
1319+
1320+
setattr(runtime, 'real_memory2', ret/1024.0)
12871321
runtime.stderr = '\n'.join(result['stderr'])
12881322
runtime.stdout = '\n'.join(result['stdout'])
12891323
runtime.merged = result['merged']

nipype/interfaces/utility.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,12 @@ def _run_interface(self, runtime):
442442
if isdefined(value):
443443
args[name] = value
444444

445-
out = function_handle(**args)
445+
# mem stuff
446+
import memory_profiler
447+
proc = (function_handle, (), args)
448+
mem_mb, out = memory_profiler.memory_usage(proc=proc, retval=True, include_children=True, max_usage=True)
449+
setattr(runtime, 'real_memory2', mem_mb[0]/1024.0)
450+
#out = function_handle(**args)
446451

447452
if len(self._output_names) == 1:
448453
self._out[self._output_names[0]] = out

nipype/pipeline/plugins/base.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ def run(self, graph, config, updatehash=False):
241241
notrun.append(self._clean_queue(jobid, graph,
242242
result=result))
243243
else:
244-
self._task_finished_cb(jobid)
244+
print "DJC: Calling task finished for %s cb from DistributedPluginBase.run"%(str(taskid))
245+
self._task_finished_cb(jobid, result)
245246
self._remove_node_dirs()
246247
self._clear_task(taskid)
247248
else:
@@ -379,6 +380,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
379380
)
380381
):
381382
continue_with_submission = False
383+
print "DJC: Calling task finised cb from DistributedPluginBase._send_procs_to_workers hash==true"
382384
self._task_finished_cb(jobid)
383385
self._remove_node_dirs()
384386
except Exception:
@@ -395,6 +397,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
395397
self.procs[jobid].run()
396398
except Exception:
397399
self._clean_queue(jobid, graph)
400+
print "DJC: Calling task finised cb from DistributedPluginBase._send_procs_to_workers continue_with_submission==true"
398401
self._task_finished_cb(jobid)
399402
self._remove_node_dirs()
400403
else:
@@ -408,18 +411,23 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
408411
else:
409412
break
410413

411-
def _task_finished_cb(self, jobid):
414+
def _task_finished_cb(self, jobid, result=None):
412415
""" Extract outputs and assign to inputs of dependent tasks
413416
414417
This is called when a job is completed.
415418
"""
416419
logger.info('[Job finished] jobname: %s jobid: %d' %
417420
(self.procs[jobid]._id, jobid))
418421
if self._status_callback:
419-
print '!!!!!!!!!!!!!!!!!!!'
420-
print self._taskresult
421-
print self._taskresult.keys()
422-
self._status_callback(self.procs[jobid], 'end', self._taskresult[self.taskresultid])
422+
if result == None:
423+
if self._taskresult.has_key(jobid):
424+
result = self._taskresult[jobid].get()
425+
print 'MMMM'
426+
print result['real_memory'], result['real_memory2']
427+
else:
428+
print "DJC: %s not found, taskresult keys are: %s"%(str(jobid),":".join([str(k) for k in self._taskresult.keys()]))
429+
result = {'real_memory' : 'nokey'}
430+
self._status_callback(self.procs[jobid], 'end', result)
423431
# Update job and worker queues
424432
self.proc_pending[jobid] = False
425433
# update the job dependency structure

nipype/pipeline/plugins/callback_log.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22
import logging
33

44
def log_nodes_cb(node, status, result=None):
5-
print 'status', status
65
logger = logging.getLogger('callback')
6+
try:
7+
real_mem1 = result['real_memory']
8+
real_mem2 = result['result'].runtime.get('real_memory2')
9+
except Exception as exc:
10+
real_mem1 = real_mem2 = 'N/A'
711
if status == 'start':
812
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
913
node._id + '"' + ',"start":' + '"' +str(datetime.datetime.now()) +\
@@ -16,7 +20,7 @@ def log_nodes_cb(node, status, result=None):
1620
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
1721
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
1822
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
19-
+ str(node._interface.num_threads) + ',"real memory":' + str(result['real_memory']) + '}'
23+
+ str(node._interface.num_threads) + ',"real memory1":' + str(real_mem1) + ',"real memory2":' + str(real_mem2) + '}'
2024

2125
logger.debug(message)
2226

nipype/pipeline/plugins/multiproc.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@ def run_node(node, updatehash, plugin_args=None):
2222
if run_memory:
2323
import memory_profiler
2424
proc = (node.run, (), {'updatehash' : updatehash})
25-
mem_mb, retval = memory_profiler.memory_usage(proc, max_usage=True, retval=True)
25+
mem_mb, retval = memory_profiler.memory_usage(proc=proc, retval=True, include_children=True, max_usage=True)
2626
result['result'] = retval
27-
result['real_memory'] = 100
28-
print 'Just populated task result!!!!!!!!!!!!!!!!!!!'
29-
print result
30-
#node._interface.real_memory = mem_mb[0]/1024.0
27+
result['real_memory'] = mem_mb[0]/1024.0
28+
result['real_memory2'] = retval.runtime.get('real_memory2')
3129
else:
3230
try:
3331
result['result'] = node.run(updatehash=updatehash)
@@ -177,8 +175,6 @@ def _submit_job(self, node, updatehash=False):
177175
self._taskresult[self._taskid] = self.pool.apply_async(run_node,
178176
(node, updatehash, self.plugin_args),
179177
callback=release_lock)
180-
print 'Printing on output!!!!!!!!!!'
181-
print self._taskresult, self._taskid
182178
return self._taskid
183179

184180
def _send_procs_to_workers(self, updatehash=False, graph=None):

0 commit comments

Comments
 (0)