Skip to content

Commit 35781be

Browse files
committed
Merge branch 'master' into enh/versioning
* master: fix: deprecation warning does not reset trait to Undefined - will set new value add support for template files API fix for somaworkflow PEP8 Further refactoring Refactoring - less redundant code between SGE and PBS. Respect custom execution parameters in graph based models. DOC: Added docstring for MpiCommandLine with example usage. ENH: Provide base interface for MPI capable command line software.
2 parents 5016d3e + c9d9197 commit 35781be

File tree

7 files changed

+148
-71
lines changed

7 files changed

+148
-71
lines changed

nipype/interfaces/base.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ def __init__(self, interface, runtime, inputs=None, outputs=None):
268268
def version(self):
269269
return self._version
270270

271+
271272
class BaseTraitedSpec(traits.HasTraits):
272273
"""Provide a few methods necessary to support nipype interface api
273274
@@ -374,7 +375,6 @@ def _deprecated_warn(self, obj, name, old, new):
374375
self.__class__.__name__.split('InputSpec')[0])
375376
msg2 = ('Will be removed or raise an error as of release %s') % \
376377
trait_spec.deprecated
377-
self.trait_set(trait_change_notify=False, **{'%s' % name: Undefined})
378378
if trait_spec.new_name:
379379
if trait_spec.new_name not in self.copyable_trait_names():
380380
raise TraitError(msg1 + ' Replacement trait %s not found' %
@@ -387,7 +387,12 @@ def _deprecated_warn(self, obj, name, old, new):
387387
raise TraitError(msg)
388388
else:
389389
warn(msg)
390-
390+
if trait_spec.new_name:
391+
warn('Unsetting %s and setting %s.' % (name,
392+
trait_spec.new_name))
393+
self.trait_set(trait_change_notify=False,
394+
**{'%s' % name: Undefined,
395+
'%s' % trait_spec.new_name: new})
391396

392397
def _hash_infile(self, adict, key):
393398
""" Inject file hashes into adict[key]"""
@@ -1328,6 +1333,46 @@ def _gen_filename(self, name):
13281333
def _gen_outfilename(self):
13291334
raise NotImplementedError
13301335

1336+
class MpiCommandLineInputSpec(CommandLineInputSpec):
1337+
use_mpi = traits.Bool(False,
1338+
desc="Whether or not to run the command with mpiexec",
1339+
usedefault=True)
1340+
n_procs = traits.Int(desc="Num processors to specify to mpiexec. Do not "
1341+
"specify if this is managed externally (e.g. through "
1342+
"SGE)")
1343+
1344+
1345+
class MpiCommandLine(CommandLine):
1346+
'''Implements functionality to interact with command line programs
1347+
that can be run with MPI (i.e. using 'mpiexec').
1348+
1349+
Examples
1350+
--------
1351+
>>> from nipype.interfaces.base import MpiCommandLine
1352+
>>> mpi_cli = MpiCommandLine(command='my_mpi_prog')
1353+
>>> mpi_cli.inputs.args = '-v'
1354+
>>> mpi_cli.cmdline
1355+
'my_mpi_prog -v'
1356+
1357+
>>> mpi_cli.inputs.use_mpi = True
1358+
>>> mpi_cli.inputs.n_procs = 8
1359+
>>> mpi_cli.cmdline
1360+
1361+
'mpiexec -n 8 my_mpi_prog -v'
1362+
'''
1363+
input_spec = MpiCommandLineInputSpec
1364+
1365+
@property
1366+
def cmdline(self):
1367+
"""Adds 'mpiexec' to begining of command"""
1368+
result = []
1369+
if self.inputs.use_mpi:
1370+
result.append('mpiexec')
1371+
if self.inputs.n_procs:
1372+
result.append('-n %d' % self.inputs.n_procs)
1373+
result.append(super(MpiCommandLine, self).cmdline)
1374+
return ' '.join(result)
1375+
13311376
class SEMLikeCommandLine(CommandLine):
13321377
"""By default in SEM derived interface all outputs have corresponding inputs.
13331378
However, some SEM commands create outputs that are not defined in the XML.

nipype/interfaces/tests/test_base.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class DeprecationSpec1(nib.TraitedSpec):
139139
set_foo = lambda : setattr(spec_instance, 'foo', 1)
140140
yield assert_raises, nib.TraitError, set_foo
141141
class DeprecationSpec1numeric(nib.TraitedSpec):
142-
foo = nib.traits.Int(deprecated=0.1)
142+
foo = nib.traits.Int(deprecated='0.1')
143143
spec_instance = DeprecationSpec1numeric()
144144
set_foo = lambda : setattr(spec_instance, 'foo', 1)
145145
yield assert_raises, nib.TraitError, set_foo
@@ -158,6 +158,18 @@ class DeprecationSpec3(nib.TraitedSpec):
158158
except nib.TraitError:
159159
not_raised = False
160160
yield assert_true, not_raised
161+
class DeprecationSpec3(nib.TraitedSpec):
162+
foo = nib.traits.Int(deprecated='1000', new_name='bar')
163+
bar = nib.traits.Int()
164+
spec_instance = DeprecationSpec3()
165+
not_raised = True
166+
try:
167+
spec_instance.foo = 1
168+
except nib.TraitError:
169+
not_raised = False
170+
yield assert_true, not_raised
171+
yield assert_equal, spec_instance.foo, Undefined
172+
yield assert_equal, spec_instance.bar, 1
161173

162174
def checknose():
163175
"""check version of nose for known incompatability"""

nipype/pipeline/plugins/base.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def run(self, graph, config, updatehash=False):
218218
# setup polling - TODO: change to threaded model
219219
notrun = []
220220
while np.any(self.proc_done == False) | \
221-
np.any(self.proc_pending == True):
221+
np.any(self.proc_pending == True):
222222
toappend = []
223223
# trigger callbacks for any pending results
224224
while self.pending_tasks:
@@ -297,11 +297,12 @@ def _submit_mapnode(self, jobid):
297297
self.procs.extend(mapnodesubids)
298298
self.depidx = ssp.vstack((self.depidx,
299299
ssp.lil_matrix(np.zeros((numnodes,
300-
self.depidx.shape[1])))),
300+
self.depidx.shape[1])))),
301301
'lil')
302302
self.depidx = ssp.hstack((self.depidx,
303-
ssp.lil_matrix(np.zeros((self.depidx.shape[0],
304-
numnodes)))),
303+
ssp.lil_matrix(
304+
np.zeros((self.depidx.shape[0],
305+
numnodes)))),
305306
'lil')
306307
self.depidx[-numnodes:, jobid] = 1
307308
self.proc_done = np.concatenate((self.proc_done,
@@ -315,7 +316,7 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
315316
"""
316317
while np.any(self.proc_done == False):
317318
# Check to see if a job is available
318-
jobids = np.flatnonzero((self.proc_done == False) & \
319+
jobids = np.flatnonzero((self.proc_done == False) &
319320
(self.depidx.sum(axis=0) == 0).__array__())
320321
if len(jobids) > 0:
321322
# send all available jobs
@@ -336,20 +337,21 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
336337
self.proc_done[jobid] = True
337338
self.proc_pending[jobid] = True
338339
# Send job to task manager and add to pending tasks
339-
logger.info('Executing: %s ID: %d' % \
340-
(self.procs[jobid]._id, jobid))
340+
logger.info('Executing: %s ID: %d' %
341+
(self.procs[jobid]._id, jobid))
341342
if self._status_callback:
342343
self._status_callback(self.procs[jobid], 'start')
343344
continue_with_submission = True
344345
if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
345346
logger.debug('checking hash locally')
346347
try:
347-
hash_exists, _, _, _ = self.procs[jobid].hash_exists()
348+
hash_exists, _, _, _ = self.procs[
349+
jobid].hash_exists()
348350
logger.debug('Hash exists %s' % str(hash_exists))
349351
if (hash_exists and
350-
(self.procs[jobid].overwrite == False or
351-
(self.procs[jobid].overwrite == None and
352-
not self.procs[jobid]._interface.always_run))):
352+
(self.procs[jobid].overwrite == False or
353+
(self.procs[jobid].overwrite == None and
354+
not self.procs[jobid]._interface.always_run))):
353355
continue_with_submission = False
354356
self._task_finished_cb(jobid)
355357
self._remove_node_dirs()
@@ -385,7 +387,7 @@ def _task_finished_cb(self, jobid):
385387
386388
This is called when a job is completed.
387389
"""
388-
logger.info('[Job finished] jobname: %s jobid: %d' % \
390+
logger.info('[Job finished] jobname: %s jobid: %d' %
389391
(self.procs[jobid]._id, jobid))
390392
if self._status_callback:
391393
self._status_callback(self.procs[jobid], 'end')
@@ -431,7 +433,7 @@ def _remove_node_dirs(self):
431433
self.refidx[idx, idx] = -1
432434
outdir = self.procs[idx]._output_directory()
433435
logger.info(('[node dependencies finished] '
434-
'removing node: %s from directory %s') % \
436+
'removing node: %s from directory %s') %
435437
(self.procs[idx]._id, outdir))
436438
shutil.rmtree(outdir)
437439

@@ -562,9 +564,26 @@ def run(self, graph, config, updatehash=False):
562564
store_exception=False))
563565
dependencies[idx] = [nodes.index(prevnode) for prevnode in
564566
graph.predecessors(node)]
565-
self._submit_graph(pyfiles, dependencies)
567+
self._submit_graph(pyfiles, dependencies, nodes)
568+
569+
def _get_args(self, node, keywords):
570+
values = ()
571+
for keyword in keywords:
572+
value = getattr(self, "_" + keyword)
573+
if keyword == "template" and os.path.isfile(value):
574+
value = open(value).read()
575+
if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
576+
if keyword == "template" and os.path.isfile(node.plugin_args[keyword]):
577+
tmp_value = open(node.plugin_args[keyword]).read()
578+
if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']:
579+
value = tmp_value
580+
else:
581+
value += tmp_value
582+
else:
583+
values += (value, )
584+
return values
566585

567-
def _submit_graph(self, pyfiles, dependencies):
586+
def _submit_graph(self, pyfiles, dependencies, nodes):
568587
"""
569588
pyfiles: list of files corresponding to a topological sort
570589
dependencies: dictionary of dependencies based on the toplogical sort

nipype/pipeline/plugins/dagman.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(self, **kwargs):
4949
self._dagman_args = plugin_args['dagman_args']
5050
super(CondorDAGManPlugin, self).__init__(**kwargs)
5151

52-
def _submit_graph(self, pyfiles, dependencies):
52+
def _submit_graph(self, pyfiles, dependencies, nodes):
5353
# location of all scripts, place dagman output in here too
5454
batch_dir, _ = os.path.split(pyfiles[0])
5555
# DAG description filename
@@ -58,27 +58,30 @@ def _submit_graph(self, pyfiles, dependencies):
5858
# loop over all scripts, create submit files, and define them
5959
# as jobs in the DAG
6060
for idx, pyscript in enumerate(pyfiles):
61+
node = nodes[idx]
62+
template, submit_specs = self._get_args(
63+
node, ["template", "submit_specs"])
6164
# XXX redundant with previous value? or could it change between
6265
# scripts?
6366
batch_dir, name = os.path.split(pyscript)
6467
name = '.'.join(name.split('.')[:-1])
6568
submitspec = '\n'.join(
66-
(self._template,
67-
'executable = %s' % sys.executable,
68-
'arguments = %s' % pyscript,
69-
'output = %s' % os.path.join(batch_dir,
70-
'%s.out' % name),
71-
'error = %s' % os.path.join(batch_dir,
72-
'%s.err' % name),
73-
'log = %s' % os.path.join(batch_dir,
74-
'%s.log' % name),
75-
'getenv = True',
76-
self._submit_specs,
77-
'queue'
78-
))
69+
(template,
70+
'executable = %s' % sys.executable,
71+
'arguments = %s' % pyscript,
72+
'output = %s' % os.path.join(batch_dir,
73+
'%s.out' % name),
74+
'error = %s' % os.path.join(batch_dir,
75+
'%s.err' % name),
76+
'log = %s' % os.path.join(batch_dir,
77+
'%s.log' % name),
78+
'getenv = True',
79+
submit_specs,
80+
'queue'
81+
))
7982
# write submit spec for this job
8083
submitfile = os.path.join(batch_dir,
81-
'%s.submit' % name)
84+
'%s.submit' % name)
8285
with open(submitfile, 'wt') as submitfileprt:
8386
submitfileprt.writelines(submitspec)
8487
submitfileprt.close()
@@ -98,4 +101,3 @@ def _submit_graph(self, pyfiles, dependencies):
98101
self._dagman_args)
99102
cmd.run()
100103
logger.info('submitted all jobs to Condor DAGMan')
101-

nipype/pipeline/plugins/pbsgraph.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
from .base import (GraphPluginBase, logger)
88

99
from ...interfaces.base import CommandLine
10+
from .sgegraph import SGEGraphPlugin
1011

1112

12-
class PBSGraphPlugin(GraphPluginBase):
13+
class PBSGraphPlugin(SGEGraphPlugin):
1314
"""Execute using PBS/Torque
1415
1516
The plugin_args input to run can be used to control the SGE execution.
@@ -20,31 +21,23 @@ class PBSGraphPlugin(GraphPluginBase):
2021
qsub call
2122
2223
"""
23-
24-
def __init__(self, **kwargs):
25-
self._template = """
24+
_template = """
2625
#PBS -V
27-
"""
28-
self._qsub_args = None
29-
if 'plugin_args' in kwargs:
30-
plugin_args = kwargs['plugin_args']
31-
if 'template' in plugin_args:
32-
self._template = plugin_args['template']
33-
if os.path.isfile(self._template):
34-
self._template = open(self._template).read()
35-
if 'qsub_args' in plugin_args:
36-
self._qsub_args = plugin_args['qsub_args']
37-
super(PBSGraphPlugin, self).__init__(**kwargs)
26+
"""
3827

39-
def _submit_graph(self, pyfiles, dependencies):
28+
def _submit_graph(self, pyfiles, dependencies, nodes):
4029
batch_dir, _ = os.path.split(pyfiles[0])
4130
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
4231
with open(submitjobsfile, 'wt') as fp:
4332
fp.writelines('#!/usr/bin/env sh\n')
4433
for idx, pyscript in enumerate(pyfiles):
34+
node = nodes[idx]
35+
template, qsub_args = self._get_args(
36+
node, ["template", "qsub_args"])
37+
4538
batch_dir, name = os.path.split(pyscript)
4639
name = '.'.join(name.split('.')[:-1])
47-
batchscript = '\n'.join((self._template,
40+
batchscript = '\n'.join((template,
4841
'%s %s' % (sys.executable, pyscript)))
4942
batchscriptfile = os.path.join(batch_dir,
5043
'batchscript_%s.sh' % name)
@@ -53,14 +46,14 @@ def _submit_graph(self, pyfiles, dependencies):
5346
batchfp.close()
5447
deps = ''
5548
if idx in dependencies:
56-
values = ['$job%05d' % jobid for jobid in dependencies[idx]]
49+
values = ['$job%05d' %
50+
jobid for jobid in dependencies[idx]]
5751
if len(values):
5852
deps = '-W depend=afterok:%s' % ':'.join(values)
5953
fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps,
60-
self._qsub_args,
54+
qsub_args,
6155
batchscriptfile))
6256
cmd = CommandLine('sh', environ=os.environ.data)
6357
cmd.inputs.args = '%s' % submitjobsfile
6458
cmd.run()
6559
logger.info('submitted all jobs to queue')
66-

0 commit comments

Comments
 (0)