@@ -27,6 +27,24 @@ class CondorDAGManPlugin(GraphPluginBase):
27
27
- dagman_args : arguments to be prepended to the job execution script in the
28
28
dagman call
29
29
"""
30
+
31
+ default_submit_template = """
32
+ universe = vanilla
33
+ notification = Never
34
+ executable = %(executable)s
35
+ arguments = %(nodescript)s
36
+ output = %(basename)s.out
37
+ error = %(basename)s.err
38
+ log = %(basename)s.log
39
+ getenv = True
40
+ """
41
+ def _get_str_or_file (self , arg ):
42
+ if os .path .isfile (arg ):
43
+ content = open (arg ).read ()
44
+ else :
45
+ content = arg
46
+ return content
47
+
30
48
# XXX feature wishlist
31
49
# - infer data file dependencies from jobs
32
50
# - infer CPU requirements from jobs
@@ -35,19 +53,21 @@ class CondorDAGManPlugin(GraphPluginBase):
35
53
# actually have to run. would be good to be able to decide whether they
36
54
# actually have to be scheduled (i.e. output already exist).
37
55
def __init__ (self , ** kwargs ):
38
- self ._template = "universe = vanilla\n notification = Never"
39
- self ._submit_specs = ""
56
+ self ._template = self .default_submit_template
57
+ self ._initial_specs = ""
58
+ self ._override_specs = ""
40
59
self ._dagman_args = ""
41
60
if 'plugin_args' in kwargs and not kwargs ['plugin_args' ] is None :
42
61
plugin_args = kwargs ['plugin_args' ]
43
62
if 'template' in plugin_args :
44
- self ._template = plugin_args ['template' ]
45
- if os .path .isfile (self ._template ):
46
- self ._template = open (self ._template ).read ()
47
- if 'submit_specs' in plugin_args :
48
- self ._submit_specs = plugin_args ['submit_specs' ]
49
- if os .path .isfile (self ._submit_specs ):
50
- self ._submit_specs = open (self ._submit_specs ).read ()
63
+ self ._template = \
64
+ self ._get_str_or_file (plugin_args ['template' ])
65
+ if 'initial_specs' in plugin_args :
66
+ self ._initial_specs = \
67
+ self ._get_str_or_file (plugin_args ['initial_specs' ])
68
+ if 'override_specs' in plugin_args :
69
+ self ._override_specs = \
70
+ self ._get_str_or_file (plugin_args ['override_specs' ])
51
71
if 'dagman_args' in plugin_args :
52
72
self ._dagman_args = plugin_args ['dagman_args' ]
53
73
super (CondorDAGManPlugin , self ).__init__ (** kwargs )
@@ -62,27 +82,25 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
62
82
# as jobs in the DAG
63
83
for idx , pyscript in enumerate (pyfiles ):
64
84
node = nodes [idx ]
65
- template , submit_specs = self ._get_args (
66
- node , ["template" , "submit_specs" ])
67
85
# XXX redundant with previous value? or could it change between
68
86
# scripts?
87
+ template , initial_specs , override_specs = self ._get_args (
88
+ node , ["template" , "initial_specs" , "override_specs" ])
89
+ # add required slots to the template
90
+ template = '%s\n %s\n %s\n ' % ('%(initial_specs)s' ,
91
+ template ,
92
+ '%(override_specs)s' )
69
93
batch_dir , name = os .path .split (pyscript )
70
94
name = '.' .join (name .split ('.' )[:- 1 ])
71
- submitspec = '\n ' .join (
72
- (template ,
73
- 'executable = %s' % sys .executable ,
74
- 'arguments = %s' % pyscript ,
75
- 'output = %s' % os .path .join (batch_dir ,
76
- '%s.out' % name ),
77
- 'error = %s' % os .path .join (batch_dir ,
78
- '%s.err' % name ),
79
- 'log = %s' % os .path .join (batch_dir ,
80
- '%s.log' % name ),
81
- 'getenv = True' ,
82
- submit_specs ,
83
- 'queue' ,
84
- ''
85
- ))
95
+ specs = dict (
96
+ # TODO make parameter for this,
97
+ initial_specs = initial_specs ,
98
+ executable = sys .executable ,
99
+ nodescript = pyscript ,
100
+ basename = os .path .join (batch_dir , name ),
101
+ override_specs = override_specs
102
+ )
103
+ submitspec = template % specs
86
104
# write submit spec for this job
87
105
submitfile = os .path .join (batch_dir ,
88
106
'%s.submit' % name )
0 commit comments