4
4
import os
5
5
import sys
6
6
import uuid
7
+ import time
8
+ from warnings import warn
7
9
8
10
from .base import (GraphPluginBase , logger )
9
11
@@ -14,19 +16,46 @@ class CondorDAGManPlugin(GraphPluginBase):
14
16
"""Execute using Condor DAGMan
15
17
16
18
The plugin_args input to run can be used to control the DAGMan execution.
19
+ The value of most arguments can be a literal string or a filename, where in
20
+ the latter case the content of the file will be used as the argument value.
21
+
17
22
Currently supported options are:
18
23
19
- - template : submit spec template for individual jobs in a DAG. All
20
- generated submit spec components (e.g. executable name and
21
- arguments) are appended to this template. This can be a str or
22
- a filename. In the latter case the file content is used as a
23
- template.
24
- - submit_specs : additional submit specs that are appended to the generated
25
- submit specs to allow for overriding or extending the defaults.
26
- This can be a str or a filename.
24
+ - submit_template : submit spec template for individual jobs in a DAG (see
25
+ CondorDAGManPlugin.default_submit_template for the default.
26
+ - initial_specs : additional submit specs that are prepended to any job's
27
+ submit file
28
+ - override_specs : additional submit specs that are appended to any job's
29
+ submit file
30
+ - wrapper_cmd : path to an exectuable that will be started instead of a node
31
+ script. This is useful for wrapper script that execute certain
32
+ functionality prior or after a node runs. If this option is
33
+ given the wrapper command is called with the respective Python
34
+ exectuable and the path to the node script as final arguments
35
+ - wrapper_args : optional additional arguments to a wrapper command
27
36
- dagman_args : arguments to be prepended to the job execution script in the
28
37
dagman call
38
+ - block : if True the plugin call will block until Condor has finished
39
+ prcoessing the entire workflow (default: False)
29
40
"""
41
+
42
+ default_submit_template = """
43
+ universe = vanilla
44
+ notification = Never
45
+ executable = %(executable)s
46
+ arguments = %(nodescript)s
47
+ output = %(basename)s.out
48
+ error = %(basename)s.err
49
+ log = %(basename)s.log
50
+ getenv = True
51
+ """
52
+ def _get_str_or_file (self , arg ):
53
+ if os .path .isfile (arg ):
54
+ content = open (arg ).read ()
55
+ else :
56
+ content = arg
57
+ return content
58
+
30
59
# XXX feature wishlist
31
60
# - infer data file dependencies from jobs
32
61
# - infer CPU requirements from jobs
@@ -35,21 +64,34 @@ class CondorDAGManPlugin(GraphPluginBase):
35
64
# actually have to run. would be good to be able to decide whether they
36
65
# actually have to be scheduled (i.e. output already exist).
37
66
def __init__ (self , ** kwargs ):
38
- self ._template = "universe = vanilla\n notification = Never"
39
- self ._submit_specs = ""
40
- self ._dagman_args = ""
41
- if 'plugin_args' in kwargs and not kwargs ['plugin_args' ] is None :
67
+ for var , id_ , val in \
68
+ (('_template' , 'submit_template' , self .default_submit_template ),
69
+ ('_initial_specs' , 'template' , '' ),
70
+ ('_initial_specs' , 'initial_specs' , '' ),
71
+ ('_override_specs' , 'submit_specs' , '' ),
72
+ ('_override_specs' , 'override_specs' , '' ),
73
+ ('_wrapper_cmd' , 'wrapper_cmd' , None ),
74
+ ('_wrapper_args' , 'wrapper_args' , '' ),
75
+ ('_block' , 'block' , False ),
76
+ ('_dagman_args' , 'dagman_args' , '' )):
77
+ if 'plugin_args' in kwargs \
78
+ and not kwargs ['plugin_args' ] is None \
79
+ and id_ in kwargs ['plugin_args' ]:
80
+ if id_ == 'wrapper_cmd' :
81
+ val = os .path .abspath (kwargs ['plugin_args' ][id_ ])
82
+ elif id_ == 'block' :
83
+ val = kwargs ['plugin_args' ][id_ ]
84
+ else :
85
+ val = self ._get_str_or_file (kwargs ['plugin_args' ][id_ ])
86
+ setattr (self , var , val )
87
+ # TODO remove after some time
88
+ if 'plugin_args' in kwargs \
89
+ and not kwargs ['plugin_args' ] is None :
42
90
plugin_args = kwargs ['plugin_args' ]
43
91
if 'template' in plugin_args :
44
- self ._template = plugin_args ['template' ]
45
- if os .path .isfile (self ._template ):
46
- self ._template = open (self ._template ).read ()
92
+ warn ("the 'template' argument is deprecated, use 'initial_specs' instead" )
47
93
if 'submit_specs' in plugin_args :
48
- self ._submit_specs = plugin_args ['submit_specs' ]
49
- if os .path .isfile (self ._submit_specs ):
50
- self ._submit_specs = open (self ._submit_specs ).read ()
51
- if 'dagman_args' in plugin_args :
52
- self ._dagman_args = plugin_args ['dagman_args' ]
94
+ warn ("the 'submit_specs' argument is deprecated, use 'override_specs' instead" )
53
95
super (CondorDAGManPlugin , self ).__init__ (** kwargs )
54
96
55
97
def _submit_graph (self , pyfiles , dependencies , nodes ):
@@ -62,26 +104,35 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
62
104
# as jobs in the DAG
63
105
for idx , pyscript in enumerate (pyfiles ):
64
106
node = nodes [idx ]
65
- template , submit_specs = self ._get_args (
66
- node , ["template" , "submit_specs" ])
67
107
# XXX redundant with previous value? or could it change between
68
108
# scripts?
109
+ template , initial_specs , override_specs , wrapper_cmd , wrapper_args = \
110
+ self ._get_args (node ,
111
+ ["template" , "initial_specs" ,
112
+ "override_specs" , "wrapper_cmd" ,
113
+ "wrapper_args" ])
114
+ # add required slots to the template
115
+ template = '%s\n %s\n %s\n queue\n ' % (
116
+ '%(initial_specs)s' ,
117
+ template ,
118
+ '%(override_specs)s' )
69
119
batch_dir , name = os .path .split (pyscript )
70
120
name = '.' .join (name .split ('.' )[:- 1 ])
71
- submitspec = '\n ' .join (
72
- (template ,
73
- 'executable = %s' % sys .executable ,
74
- 'arguments = %s' % pyscript ,
75
- 'output = %s' % os .path .join (batch_dir ,
76
- '%s.out' % name ),
77
- 'error = %s' % os .path .join (batch_dir ,
78
- '%s.err' % name ),
79
- 'log = %s' % os .path .join (batch_dir ,
80
- '%s.log' % name ),
81
- 'getenv = True' ,
82
- submit_specs ,
83
- 'queue'
84
- ))
121
+ specs = dict (
122
+ # TODO make parameter for this,
123
+ initial_specs = initial_specs ,
124
+ executable = sys .executable ,
125
+ nodescript = pyscript ,
126
+ basename = os .path .join (batch_dir , name ),
127
+ override_specs = override_specs
128
+ )
129
+ if not wrapper_cmd is None :
130
+ specs ['executable' ] = wrapper_cmd
131
+ specs ['nodescript' ] = \
132
+ '%s %s %s' % (wrapper_args % specs , # give access to variables
133
+ sys .executable ,
134
+ pyscript )
135
+ submitspec = template % specs
85
136
# write submit spec for this job
86
137
submitfile = os .path .join (batch_dir ,
87
138
'%s.submit' % name )
@@ -105,3 +156,13 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
105
156
self ._dagman_args )
106
157
cmd .run ()
107
158
logger .info ('submitted all jobs to Condor DAGMan' )
159
+ if self ._block :
160
+ # wait for DAGMan to settle down, no time wasted it is already running
161
+ time .sleep (10 )
162
+ if not os .path .exists ('%s.condor.sub' % dagfilename ):
163
+ raise EnvironmentError ("DAGMan did not create its submit file, please check the logs" )
164
+ # wait for completion
165
+ logger .info ('waiting for DAGMan to finish' )
166
+ lockfilename = '%s.lock' % dagfilename
167
+ while os .path .exists (lockfilename ):
168
+ time .sleep (5 )
0 commit comments