Skip to content

Commit b7e9309

Browse files
committed
Removed the memory profiler code to just pull in s3 datasink code
1 parent 2af5c1d commit b7e9309

File tree

4 files changed

+10
-52
lines changed

4 files changed

+10
-52
lines changed

nipype/interfaces/base.py

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -751,17 +751,8 @@ def __init__(self, **inputs):
751751
self.__class__.__name__)
752752
self.inputs = self.input_spec(**inputs)
753753
self.estimated_memory = 1
754-
self._real_memory = 0
755754
self.num_threads = 1
756755

757-
@property
758-
def real_memory(self):
759-
return self._real_memory
760-
761-
@real_memory.setter
762-
def real_memory(self, value):
763-
self._real_memory = value
764-
765756
@classmethod
766757
def help(cls, returnhelp=False):
767758
""" Prints class help
@@ -1240,9 +1231,6 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12401231
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
12411232
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
12421233

1243-
# Init variables for memory profiling
1244-
ret = -1
1245-
interval = 0.1
12461234

12471235
if output == 'stream':
12481236
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
@@ -1260,9 +1248,6 @@ def _process(drain=0):
12601248
for stream in res[0]:
12611249
stream.read(drain)
12621250
while proc.returncode is None:
1263-
if mem_prof:
1264-
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1265-
time.sleep(interval)
12661251
proc.poll()
12671252
_process()
12681253
_process(drain=1)
@@ -1278,39 +1263,23 @@ def _process(drain=0):
12781263
result['merged'] = [r[1] for r in temp]
12791264

12801265
if output == 'allatonce':
1281-
if mem_prof:
1282-
while proc.returncode is None:
1283-
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1284-
time.sleep(interval)
1285-
proc.poll()
12861266
stdout, stderr = proc.communicate()
12871267
result['stdout'] = stdout.split('\n')
12881268
result['stderr'] = stderr.split('\n')
12891269
result['merged'] = ''
12901270
if output == 'file':
1291-
if mem_prof:
1292-
while proc.returncode is None:
1293-
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1294-
time.sleep(interval)
1295-
proc.poll()
12961271
ret_code = proc.wait()
12971272
stderr.flush()
12981273
stdout.flush()
12991274
result['stdout'] = [line.strip() for line in open(outfile).readlines()]
13001275
result['stderr'] = [line.strip() for line in open(errfile).readlines()]
13011276
result['merged'] = ''
13021277
if output == 'none':
1303-
if mem_prof:
1304-
while proc.returncode is None:
1305-
ret = max([ret, _get_memory(proc.pid, include_children=True)])
1306-
time.sleep(interval)
1307-
proc.poll()
13081278
proc.communicate()
13091279
result['stdout'] = []
13101280
result['stderr'] = []
13111281
result['merged'] = ''
13121282

1313-
setattr(runtime, 'real_memory2', ret/1024.0)
13141283
runtime.stderr = '\n'.join(result['stderr'])
13151284
runtime.stdout = '\n'.join(result['stdout'])
13161285
runtime.merged = result['merged']

nipype/pipeline/plugins/base.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def run(self, graph, config, updatehash=False):
241241
notrun.append(self._clean_queue(jobid, graph,
242242
result=result))
243243
else:
244-
self._task_finished_cb(jobid, result)
244+
self._task_finished_cb(jobid)
245245
self._remove_node_dirs()
246246
self._clear_task(taskid)
247247
else:
@@ -408,20 +408,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
408408
else:
409409
break
410410

411-
def _task_finished_cb(self, jobid, result=None):
411+
def _task_finished_cb(self, jobid):
412412
""" Extract outputs and assign to inputs of dependent tasks
413413
414414
This is called when a job is completed.
415415
"""
416416
logger.info('[Job finished] jobname: %s jobid: %d' %
417417
(self.procs[jobid]._id, jobid))
418418
if self._status_callback:
419-
if result == None:
420-
if self._taskresult.has_key(jobid):
421-
result = self._taskresult[jobid].get()
422-
else:
423-
result = {'real_memory' : 'nokey'}
424-
self._status_callback(self.procs[jobid], 'end', result)
419+
self._status_callback(self.procs[jobid], 'end')
425420
# Update job and worker queues
426421
self.proc_pending[jobid] = False
427422
# update the job dependency structure
Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
11
import datetime
22
import logging
33

4-
def log_nodes_cb(node, status, result=None):
4+
def log_nodes_cb(node, status):
55
logger = logging.getLogger('callback')
6-
try:
7-
real_mem1 = result['real_memory']
8-
real_mem2 = result['real_memory2']
9-
run_seconds = result['run_seconds']
10-
except Exception as exc:
11-
real_mem1 = real_mem2 = run_seconds = 'N/A'
126
if status == 'start':
137
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' +\
148
node._id + '"' + ',"start":' + '"' +str(datetime.datetime.now()) +\
@@ -20,15 +14,15 @@ def log_nodes_cb(node, status, result=None):
2014
elif status == 'end':
2115
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
2216
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
23-
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
24-
+ str(node._interface.num_threads) + ',"real_memory1":' + str(real_mem1) + ',"real_memory2":' + str(real_mem2) + ',"run_seconds":' + str(run_seconds) + '}'
17+
'"' + ',"estimate memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
18+
+ str(node._interface.num_threads) + '}'
2519

2620
logger.debug(message)
2721

2822
else:
2923
message = '{"name":' + '"' + node.name + '"' + ',"id":' + '"' + \
3024
node._id + '"' + ',"finish":' + '"' + str(datetime.datetime.now()) +\
31-
'"' + ',"memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
25+
'"' + ',"estimate memory":' + str(node._interface.estimated_memory) + ',"num_threads":' \
3226
+ str(node._interface.num_threads) + ',"error":"True"}'
3327

3428
logger.debug(message)

nipype/pipeline/plugins/multiproc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ def _submit_job(self, node, updatehash=False):
8383
node.inputs.terminal_output = 'allatonce'
8484
except:
8585
pass
86-
self._taskresult[self._taskid] = self.pool.apply_async(run_node, (node,
87-
updatehash,))
86+
self._taskresult[self._taskid] = self.pool.apply_async(run_node,
87+
(node, updatehash,))
8888
return self._taskid
8989

9090
def _report_crash(self, node, result=None):
@@ -161,7 +161,7 @@ def _submit_job(self, node, updatehash=False):
161161
except:
162162
pass
163163
self._taskresult[self._taskid] = self.pool.apply_async(run_node,
164-
(node, updatehash, self.plugin_args),
164+
(node, updatehash,),
165165
callback=release_lock)
166166
return self._taskid
167167

0 commit comments

Comments
 (0)