Skip to content

Commit 9ba3f88

Browse files
committed
NF: Option to block until DAGMan has finished
This will make it easier to use DAGMan-based workflows in other script that do further processing of the output.
1 parent ad04f46 commit 9ba3f88

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

nipype/pipeline/plugins/dagman.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import sys
66
import uuid
7+
import time
78
from warnings import warn
89

910
from .base import (GraphPluginBase, logger)
@@ -34,6 +35,8 @@ class CondorDAGManPlugin(GraphPluginBase):
3435
- wrapper_args : optional additional arguments to a wrapper command
3536
- dagman_args : arguments to be prepended to the job execution script in the
3637
dagman call
38+
- block : if True the plugin call will block until Condor has finished
39+
prcoessing the entire workflow
3740
"""
3841

3942
default_submit_template = """
@@ -69,12 +72,15 @@ def __init__(self, **kwargs):
6972
('_override_specs', 'override_specs', ''),
7073
('_wrapper_cmd', 'wrapper_cmd', None),
7174
('_wrapper_args', 'wrapper_args', ''),
75+
('_block', 'block', False),
7276
('_dagman_args', 'dagman_args', '')):
7377
if 'plugin_args' in kwargs \
7478
and not kwargs['plugin_args'] is None \
7579
and id_ in kwargs['plugin_args']:
7680
if id_ == 'wrapper_cmd':
7781
val = os.path.abspath(kwargs['plugin_args'][id_])
82+
elif id_ == 'block':
83+
val = kwargs['plugin_args'][id_]
7884
else:
7985
val = self._get_str_or_file(kwargs['plugin_args'][id_])
8086
setattr(self, var, val)
@@ -150,3 +156,13 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
150156
self._dagman_args)
151157
cmd.run()
152158
logger.info('submitted all jobs to Condor DAGMan')
159+
if self._block:
160+
# wait for DAGMan to settle down, no time wasted it is already running
161+
time.sleep(10)
162+
if not os.path.exists('%s.condor.sub' % dagfilename):
163+
raise EnvironmentError("DAGMan did not create its submit file, please check the logs")
164+
# wait for completion
165+
logger.info('waiting for DAGMan to finish')
166+
lockfilename = '%s.lock' % dagfilename
167+
while os.path.exists(lockfilename):
168+
time.sleep(5)

0 commit comments

Comments
 (0)