Skip to content

Commit 1e4c4f1

Browse files
committed
NF: Wrapper command for DAGMan jobs
This enables checkpoint/migration wrapper to be combined for Nipype workflows.
1 parent 03519b6 commit 1e4c4f1

File tree

2 files changed

+60
-12
lines changed

2 files changed

+60
-12
lines changed

doc/users/plugins.rst

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,18 +176,45 @@ Workflow execution with HTCondor DAGMan is done by calling::
176176
Job execution behavior can be tweaked with the following optional plug-in
177177
arguments::
178178

179-
template : submit spec template to use for job submission. The template
180-
all generated submit specs are appended to this template. This
181-
can be a str or a filename.
182-
submit_specs : additional submit specs that are appended to the generated
183-
submit specs to allow for overriding or extending the defaults.
184-
This can be a str or a filename.
185-
dagman_args : arguments to be prepended to the job execution script in the
186-
dagman call
179+
submit_template : submit spec template for individual jobs in a DAG (see
180+
CondorDAGManPlugin.default_submit_template for the default.
181+
initial_specs : additional submit specs that are prepended to any job's
182+
submit file
183+
override_specs : additional submit specs that are appended to any job's
184+
submit file
185+
wrapper_cmd : path to an exectuable that will be started instead of a node
186+
script. This is useful for wrapper script that execute certain
187+
functionality prior or after a node runs. If this option is
188+
given the wrapper command is called with the respective Python
189+
exectuable and the path to the node script as final arguments
190+
wrapper_args : optional additional arguments to a wrapper command
191+
dagman_args : arguments to be prepended to the job execution script in the
192+
dagman call
187193

188194
Please see the `HTCondor documentation`_ for details on possible configuration
189195
options and command line arguments.
190196

197+
Using the ``wrapper_cmd`` argument it is possible to combine Nipype workflow
198+
execution with checkpoint/migration functionality offered by, for example,
199+
DMTCP_. On a Debian system, executing a workflow with support for
200+
checkpoint/migration for all nodes could look like this::
201+
202+
# define common parameters
203+
dmtcp_hdr = """
204+
should_transfer_files = YES
205+
when_to_transfer_output = ON_EXIT_OR_EVICT
206+
kill_sig = 2
207+
environment = DMTCP_TMPDIR=./;JALIB_STDERR_PATH=/dev/null;DMTCP_PREFIX_ID=$(CLUSTER)_$(PROCESS)
208+
"""
209+
shim_args = "--log %(basename)s.shimlog --stdout %(basename)s.shimout --stderr %(basename)s.shimerr"
210+
# run workflow
211+
workflow.run(
212+
plugin='CondorDAGMan',
213+
plugin_args=dict(initial_specs=dmtcp_hdr,
214+
wrapper_cmd='/usr/lib/condor/shim_dmtcp',
215+
wrapper_args=shim_args)
216+
)
217+
191218
``qsub`` emulation
192219
~~~~~~~~~~~~~~~~~~
193220

@@ -230,3 +257,4 @@ Optional arguments::
230257
.. _HTCondor: http://www.cs.wisc.edu/htcondor/
231258
.. _DAGMan: http://research.cs.wisc.edu/htcondor/dagman/dagman.html
232259
.. _HTCondor documentation: http://research.cs.wisc.edu/htcondor/manual
260+
.. _DMTCP: http://dmtcp.sourceforge.net

nipype/pipeline/plugins/dagman.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class CondorDAGManPlugin(GraphPluginBase):
1515
"""Execute using Condor DAGMan
1616
1717
The plugin_args input to run can be used to control the DAGMan execution.
18-
The value of any argument can be a literal string or a filename, where in
18+
The value of most arguments can be a literal string or a filename, where in
1919
the latter case the content of the file will be used as the argument value.
2020
2121
Currently supported options are:
@@ -26,6 +26,12 @@ class CondorDAGManPlugin(GraphPluginBase):
2626
submit file
2727
- override_specs : additional submit specs that are appended to any job's
2828
submit file
29+
- wrapper_cmd : path to an exectuable that will be started instead of a node
30+
script. This is useful for wrapper script that execute certain
31+
functionality prior or after a node runs. If this option is
32+
given the wrapper command is called with the respective Python
33+
exectuable and the path to the node script as final arguments
34+
- wrapper_args : optional additional arguments to a wrapper command
2935
- dagman_args : arguments to be prepended to the job execution script in the
3036
dagman call
3137
"""
@@ -61,11 +67,16 @@ def __init__(self, **kwargs):
6167
('_initial_specs', 'initial_specs', ''),
6268
('_override_specs', 'submit_specs', ''),
6369
('_override_specs', 'override_specs', ''),
70+
('_wrapper_cmd', 'wrapper_cmd', None),
71+
('_wrapper_args', 'wrapper_args', ''),
6472
('_dagman_args', 'dagman_args', '')):
6573
if 'plugin_args' in kwargs \
6674
and not kwargs['plugin_args'] is None \
6775
and id_ in kwargs['plugin_args']:
68-
val = self._get_str_or_file(kwargs['plugin_args'][id_])
76+
if id_ == 'wrapper_cmd':
77+
val = os.path.abspath(kwargs['plugin_args'][id_])
78+
else:
79+
val = self._get_str_or_file(kwargs['plugin_args'][id_])
6980
setattr(self, var, val)
7081
# TODO remove after some time
7182
if 'plugin_args' in kwargs \
@@ -89,8 +100,11 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
89100
node = nodes[idx]
90101
# XXX redundant with previous value? or could it change between
91102
# scripts?
92-
template, initial_specs, override_specs = self._get_args(
93-
node, ["template", "initial_specs", "override_specs"])
103+
template, initial_specs, override_specs, wrapper_cmd, wrapper_args = \
104+
self._get_args(node,
105+
["template", "initial_specs",
106+
"override_specs", "wrapper_cmd",
107+
"wrapper_args"])
94108
# add required slots to the template
95109
template = '%s\n%s\n%s\n' % ('%(initial_specs)s',
96110
template,
@@ -105,6 +119,12 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
105119
basename=os.path.join(batch_dir, name),
106120
override_specs=override_specs
107121
)
122+
if not wrapper_cmd is None:
123+
specs['executable'] = wrapper_cmd
124+
specs['nodescript'] = \
125+
'%s %s %s' % (wrapper_args % specs, # give access to variables
126+
sys.executable,
127+
pyscript)
108128
submitspec = template % specs
109129
# write submit spec for this job
110130
submitfile = os.path.join(batch_dir,

0 commit comments

Comments
 (0)