Skip to content

Commit 97e7333

Browse files
committed
Manual merge of s3_datasink and resource_multiproc branch for cpac run
1 parent c0d148a commit 97e7333

File tree

11 files changed

+490
-52
lines changed

11 files changed

+490
-52
lines changed

nipype/interfaces/base.py

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,8 @@ def __init__(self, **inputs):
764764
raise Exception('No input_spec in class: %s' %
765765
self.__class__.__name__)
766766
self.inputs = self.input_spec(**inputs)
767+
self.estimated_memory = 1
768+
self.num_threads = 1
767769

768770
@classmethod
769771
def help(cls, returnhelp=False):
@@ -1202,14 +1204,43 @@ def _read(self, drain):
12021204
self._lastidx = len(self._rows)
12031205

12041206

1207+
def _get_num_threads(proc):
    """Return the peak thread count observed across a process tree.

    Parameters
    ----------
    proc : psutil.Process
        The parent process whose own threads and descendant processes
        are inspected.

    Returns
    -------
    int
        The maximum of the parent's thread count, each child's thread
        count, each child's number of grandchildren, and the recursive
        result for every child.
    """

    # Import packages
    import psutil

    # Start from the parent's own thread count
    num_threads = proc.num_threads()
    try:
        # Recurse into children; a child may have spawned processes itself
        for child in proc.children():
            num_threads = max(num_threads, child.num_threads(),
                              len(child.children()), _get_num_threads(child))
    except psutil.NoSuchProcess:
        # A child exited while being inspected; keep the count seen so far
        pass

    return num_threads
1224+
1225+
12051226
def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12061227
"""Run a command, read stdout and stderr, prefix with timestamp.
12071228
12081229
The returned runtime contains a merged stdout+stderr log with timestamps
12091230
"""
1210-
PIPE = subprocess.PIPE
12111231

1232+
# Import packages
1233+
try:
1234+
from memory_profiler import _get_memory
1235+
import psutil
1236+
mem_prof = True
1237+
except:
1238+
mem_prof = False
1239+
1240+
# Init variables
1241+
PIPE = subprocess.PIPE
12121242
cmdline = runtime.cmdline
1243+
12131244
if redirect_x:
12141245
exist_xvfb, _ = _exists_in_path('xvfb-run', runtime.environ)
12151246
if not exist_xvfb:
@@ -1238,6 +1269,12 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12381269
result = {}
12391270
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
12401271
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
1272+
1273+
# Init variables for memory profiling
1274+
mem_mb = -1
1275+
num_threads = -1
1276+
interval = 1
1277+
12411278
if output == 'stream':
12421279
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
12431280

@@ -1253,8 +1290,10 @@ def _process(drain=0):
12531290
else:
12541291
for stream in res[0]:
12551292
stream.read(drain)
1256-
12571293
while proc.returncode is None:
1294+
if mem_prof:
1295+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1296+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
12581297
proc.poll()
12591298
_process()
12601299
_process(drain=1)
@@ -1268,27 +1307,53 @@ def _process(drain=0):
12681307
result[stream._name] = [r[2] for r in rows]
12691308
temp.sort()
12701309
result['merged'] = [r[1] for r in temp]
1310+
12711311
if output == 'allatonce':
1312+
if mem_prof:
1313+
while proc.returncode is None:
1314+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1315+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
1316+
proc.poll()
12721317
stdout, stderr = proc.communicate()
12731318
if stdout and isinstance(stdout, bytes):
1274-
stdout = stdout.decode()
1319+
try:
1320+
stdout = stdout.decode()
1321+
except UnicodeDecodeError:
1322+
stdout = stdout.decode("ISO-8859-1")
12751323
if stderr and isinstance(stderr, bytes):
1276-
stderr = stderr.decode()
1324+
try:
1325+
stderr = stderr.decode()
1326+
except UnicodeDecodeError:
1327+
stderr = stderr.decode("ISO-8859-1")
1328+
12771329
result['stdout'] = str(stdout).split('\n')
12781330
result['stderr'] = str(stderr).split('\n')
12791331
result['merged'] = ''
12801332
if output == 'file':
1333+
if mem_prof:
1334+
while proc.returncode is None:
1335+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1336+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
1337+
proc.poll()
12811338
ret_code = proc.wait()
12821339
stderr.flush()
12831340
stdout.flush()
12841341
result['stdout'] = [line.strip() for line in open(outfile).readlines()]
12851342
result['stderr'] = [line.strip() for line in open(errfile).readlines()]
12861343
result['merged'] = ''
12871344
if output == 'none':
1345+
if mem_prof:
1346+
while proc.returncode is None:
1347+
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1348+
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
1349+
proc.poll()
12881350
proc.communicate()
12891351
result['stdout'] = []
12901352
result['stderr'] = []
12911353
result['merged'] = ''
1354+
1355+
setattr(runtime, 'cmd_memory', mem_mb/1024.0)
1356+
setattr(runtime, 'cmd_threads', num_threads)
12921357
runtime.stderr = '\n'.join(result['stderr'])
12931358
runtime.stdout = '\n'.join(result['stdout'])
12941359
runtime.merged = result['merged']

nipype/interfaces/utility.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,15 @@ def _run_interface(self, runtime):
449449
if isdefined(value):
450450
args[name] = value
451451

452-
out = function_handle(**args)
452+
# Record memory of function_handle
453+
try:
454+
import memory_profiler
455+
proc = (function_handle, (), args)
456+
mem_mb, out = memory_profiler.memory_usage(proc=proc, retval=True, include_children=True, max_usage=True)
457+
setattr(runtime, 'cmd_memory', mem_mb[0]/1024.0)
458+
# If no memory_profiler package, run without recording memory
459+
except:
460+
out = function_handle(**args)
453461

454462
if len(self._output_names) == 1:
455463
self._out[self._output_names[0]] = out

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ def func1(in1):
723723
# test running the workflow on default conditions
724724
error_raised = False
725725
try:
726-
w1.run(plugin='MultiProc')
726+
w1.run(plugin='ResourceMultiProc')
727727
except Exception as e:
728728
from nipype.pipeline.engine.base import logger
729729
logger.info('Exception: %s' % str(e))
@@ -737,7 +737,7 @@ def func1(in1):
737737
# test running the workflow on serial conditions
738738
error_raised = False
739739
try:
740-
w1.run(plugin='MultiProc')
740+
w1.run(plugin='ResourceMultiProc')
741741
except Exception as e:
742742
from nipype.pipeline.engine.base import logger
743743
logger.info('Exception: %s' % str(e))

nipype/pipeline/engine/tests/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def test_function3(arg):
214214

215215
out_dir = mkdtemp()
216216

217-
for plugin in ('Linear',): # , 'MultiProc'):
217+
for plugin in ('Linear',): # , 'ResourceMultiProc'):
218218
n1 = pe.Node(niu.Function(input_names=['arg1'],
219219
output_names=['out_file1', 'out_file2', 'dir'],
220220
function=test_function),

nipype/pipeline/plugins/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
from .sge import SGEPlugin
1010
from .condor import CondorPlugin
1111
from .dagman import CondorDAGManPlugin
12-
from .multiproc import MultiProcPlugin
12+
from .multiproc import ResourceMultiProcPlugin
1313
from .ipython import IPythonPlugin
1414
from .somaflow import SomaFlowPlugin
1515
from .pbsgraph import PBSGraphPlugin
1616
from .sgegraph import SGEGraphPlugin
1717
from .lsf import LSFPlugin
1818
from .slurm import SLURMPlugin
1919
from .slurmgraph import SLURMGraphPlugin
20+
21+
from .callback_log import log_nodes_cb

nipype/pipeline/plugins/base.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import numpy as np
2121
import scipy.sparse as ssp
2222

23-
2423
from ...utils.filemanip import savepkl, loadpkl
2524
from ...utils.misc import str2bool
2625
from ..engine.utils import (nx, dfs_preorder, topological_sort)
@@ -246,7 +245,7 @@ def run(self, graph, config, updatehash=False):
246245
notrun.append(self._clean_queue(jobid, graph,
247246
result=result))
248247
else:
249-
self._task_finished_cb(jobid)
248+
self._task_finished_cb(jobid, result)
250249
self._remove_node_dirs()
251250
self._clear_task(taskid)
252251
else:
@@ -265,10 +264,15 @@ def run(self, graph, config, updatehash=False):
265264
graph=graph)
266265
else:
267266
logger.debug('Not submitting')
268-
sleep(float(self._config['execution']['poll_sleep_duration']))
267+
self._wait()
269268
self._remove_node_dirs()
270269
report_nodes_not_run(notrun)
271270

271+
272+
273+
def _wait(self):
    """Sleep for the configured polling interval before rechecking jobs."""
    poll_interval = float(self._config['execution']['poll_sleep_duration'])
    sleep(poll_interval)
275+
272276
def _get_result(self, taskid):
273277
raise NotImplementedError
274278

@@ -410,15 +414,18 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
410414
else:
411415
break
412416

413-
def _task_finished_cb(self, jobid):
417+
def _task_finished_cb(self, jobid, result=None):
414418
""" Extract outputs and assign to inputs of dependent tasks
415419
416420
This is called when a job is completed.
417421
"""
418422
logger.info('[Job finished] jobname: %s jobid: %d' %
419423
(self.procs[jobid]._id, jobid))
420424
if self._status_callback:
421-
self._status_callback(self.procs[jobid], 'end')
425+
if result == None:
426+
if self._taskresult.has_key(jobid):
427+
result = self._taskresult[jobid].get()
428+
self._status_callback(self.procs[jobid], 'end', result)
422429
# Update job and worker queues
423430
self.proc_pending[jobid] = False
424431
# update the job dependency structure

0 commit comments

Comments
 (0)