Skip to content

Commit 321b7fa

Browse files
committed
updates to moab to handle missed directives
Signed-off-by: vsoch <[email protected]>
1 parent 8749334 commit 321b7fa

File tree

7 files changed

+264
-163
lines changed

7 files changed

+264
-163
lines changed

fractale/transformer/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
from .flux import Transformer as FluxTransformer
55
from .kubernetes import Transformer as KubernetesTransformer
66
from .lsf import Transformer as LSFTransformer
7+
from .moab import Transformer as MoabTransformer
78
from .oar import Transformer as OARTransformer
89
from .pbs import Transformer as PBSTransformer
910
from .slurm import Transformer as SlurmTransformer
10-
from .moab import Transformer as MoabTransformer
1111

1212
plugins = {
1313
"kubernetes": KubernetesTransformer,

fractale/transformer/cobalt/transform.py

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def parse_cobalt_command(command_lines, spec):
9999
if line and not line.startswith("#"):
100100
main_command = line
101101
break
102-
102+
103103
if not main_command:
104104
return []
105105

@@ -146,12 +146,14 @@ def convert(self, spec):
146146
bt = epoch_to_cobalt_begin_time(spec.begin_time)
147147
if bt:
148148
qsub_cmd.extend(["--at", bt])
149-
149+
150150
# Dependencies are specified with a colon-separated list of job IDs
151151
if spec.depends_on:
152-
dep_str = ":".join(spec.depends_on) if isinstance(spec.depends_on, list) else spec.depends_on
152+
dep_str = (
153+
":".join(spec.depends_on) if isinstance(spec.depends_on, list) else spec.depends_on
154+
)
153155
qsub_cmd.extend(["--dependencies", dep_str])
154-
156+
155157
# Node constraints are handled by --attrs
156158
if spec.constraints:
157159
qsub_cmd.extend(["--attrs", ":".join(spec.constraints)])
@@ -163,19 +165,18 @@ def convert(self, spec):
163165
qsub_cmd.extend(["-o", spec.output_file])
164166
if spec.error_file:
165167
qsub_cmd.extend(["-e", spec.error_file])
166-
168+
167169
# Email notifications
168170
if spec.mail_user:
169171
qsub_cmd.extend(["-M", spec.mail_user])
170172
# Cobalt has a simple '--notify user' flag, equivalent to ALL in Slurm.
171173
if spec.mail_type:
172174
qsub_cmd.append("--notify user")
173175

174-
175176
if spec.environment:
176177
for k, v in spec.environment.items():
177178
qsub_cmd.extend(["--env", f"{k}={v}"])
178-
179+
179180
# Note: Cobalt exclusive access is often handled by queue policy or `--mode script`.
180181
# We omit a direct flag to avoid conflicting with system-specific setups.
181182

@@ -192,16 +193,16 @@ def convert(self, spec):
192193

193194
if spec.container_image:
194195
aprun_cmd.extend(["singularity", "exec", spec.container_image])
195-
196+
196197
# If spec.script is defined, it takes precedence over executable/arguments
197198
if spec.script:
198-
exec_script_parts.extend(spec.script)
199+
exec_script_parts.extend(spec.script)
199200
else:
200-
if spec.executable:
201-
aprun_cmd.append(spec.executable)
202-
if spec.arguments:
203-
aprun_cmd.extend(spec.arguments)
204-
exec_script_parts.append(" ".join(aprun_cmd))
201+
if spec.executable:
202+
aprun_cmd.append(spec.executable)
203+
if spec.arguments:
204+
aprun_cmd.extend(spec.arguments)
205+
exec_script_parts.append(" ".join(aprun_cmd))
205206

206207
exec_script = "\n".join(exec_script_parts)
207208

@@ -239,7 +240,7 @@ def _parse(self, content, return_unhandled=False):
239240
i = 0
240241
while i < len(args):
241242
arg = args[i]
242-
243+
243244
# Logic to handle both `--key value` and `--key=value`
244245
key, value = None, None
245246
if "=" in arg:
@@ -251,10 +252,10 @@ def _parse(self, content, return_unhandled=False):
251252
if i + 1 < len(args) and not args[i + 1].startswith("-"):
252253
value = args[i + 1]
253254
i += 2
254-
else: # It's a boolean flag
255+
else: # It's a boolean flag
255256
value = True
256257
i += 1
257-
258+
258259
key = key.lstrip("-")
259260

260261
if key == "A":
@@ -270,8 +271,10 @@ def _parse(self, content, return_unhandled=False):
270271
elif key == "O":
271272
# This sets the job name AND the output file prefix
272273
spec.job_name = value
273-
if not spec.output_file: spec.output_file = f"{value}.output"
274-
if not spec.error_file: spec.error_file = f"{value}.error"
274+
if not spec.output_file:
275+
spec.output_file = f"{value}.output"
276+
if not spec.error_file:
277+
spec.error_file = f"{value}.error"
275278
elif key == "o":
276279
spec.output_file = value
277280
elif key == "e":
@@ -285,13 +288,13 @@ def _parse(self, content, return_unhandled=False):
285288
elif key == "M":
286289
spec.mail_user = value
287290
elif key == "notify" and value == "user":
288-
spec.mail_type = ["ALL"] # Simple mapping
291+
spec.mail_type = ["ALL"] # Simple mapping
289292
elif key == "env":
290-
env_key, env_val = value.split("=", 1)
291-
spec.environment[env_key] = env_val
293+
env_key, env_val = value.split("=", 1)
294+
spec.environment[env_key] = env_val
292295
else:
293296
not_handled.add(key)
294-
297+
295298
# We again assume a block of text here.
296299
if script_body:
297300
spec.script = script_body
@@ -315,7 +318,7 @@ def _parse(self, content, return_unhandled=False):
315318
temp_args.pop(idx)
316319
temp_args.pop(idx)
317320
except (ValueError, IndexError):
318-
pass # Ignore if parsing aprun fails
321+
pass # Ignore if parsing aprun fails
319322

320323
if temp_args:
321324
spec.executable = temp_args[0]

fractale/transformer/lsf/transform.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,14 @@ def parse_lsf_command(command_lines, spec):
9898
"""
9999
if not command_lines:
100100
return []
101-
101+
102102
main_command = ""
103103
for line in command_lines:
104104
line = line.strip()
105105
if line and not line.startswith("#"):
106106
main_command = line
107107
break
108-
108+
109109
if not main_command:
110110
return []
111111

@@ -141,7 +141,7 @@ def convert(self, spec):
141141
script.add("q", spec.queue)
142142
script.add("o", spec.output_file)
143143
script.add("e", spec.error_file)
144-
144+
145145
# Mail notifications
146146
if spec.mail_user:
147147
script.add("u", spec.mail_user)
@@ -162,11 +162,11 @@ def convert(self, spec):
162162
r_parts = []
163163
if spec.constraints:
164164
r_parts.append(f'select[{":".join(spec.constraints)}]')
165-
165+
166166
if spec.num_nodes > 1 and spec.num_tasks > 0:
167167
tasks_per_node = spec.num_tasks // spec.num_nodes
168168
if tasks_per_node > 0:
169-
r_parts.append(f'span[ptile={tasks_per_node}]')
169+
r_parts.append(f"span[ptile={tasks_per_node}]")
170170

171171
rusage_parts = []
172172
if spec.mem_per_task:
@@ -179,7 +179,7 @@ def convert(self, spec):
179179
if spec.gpus_per_task > 0:
180180
# ngpus_excl_p = GPUs per process (task) in exclusive mode.
181181
rusage_parts.append(f"ngpus_excl_p={spec.gpus_per_task}")
182-
182+
183183
if rusage_parts:
184184
r_parts.append(f'rusage[{":".join(rusage_parts)}]')
185185

@@ -188,9 +188,9 @@ def convert(self, spec):
188188

189189
if spec.exclusive_access:
190190
script.add_flag("x")
191-
191+
192192
if spec.requeue:
193-
script.add_flag("r") # -r makes the job rerunnable
193+
script.add_flag("r") # -r makes the job rerunnable
194194

195195
# --- Priority and Scheduling ---
196196
lsf_prio = priority_to_lsf_priority(spec.priority)
@@ -199,10 +199,14 @@ def convert(self, spec):
199199

200200
bt = epoch_to_lsf_begin_time(spec.begin_time)
201201
script.add("b", bt)
202-
202+
203203
# Dependencies
204204
if spec.depends_on:
205-
dep_str = " && ".join([f"ended({job_id})" for job_id in spec.depends_on]) if isinstance(spec.depends_on, list) else f"ended({spec.depends_on})"
205+
dep_str = (
206+
" && ".join([f"ended({job_id})" for job_id in spec.depends_on])
207+
if isinstance(spec.depends_on, list)
208+
else f"ended({spec.depends_on})"
209+
)
206210
script.add("w", f'"{dep_str}"')
207211

208212
script.newline()
@@ -212,7 +216,7 @@ def convert(self, spec):
212216
for key, value in spec.environment.items():
213217
script.add_line(f"export {key}='{value}'")
214218
script.newline()
215-
219+
216220
# If spec.script is defined, it takes precedence.
217221
if spec.script:
218222
script.add_lines(spec.script)
@@ -226,7 +230,7 @@ def convert(self, spec):
226230
cmd_parts.append(f"--rs_per_host {spec.num_tasks // spec.num_nodes}")
227231
cmd_parts.append(f"--tasks_per_rs 1")
228232
cmd_parts.append(f"--cpu_per_rs {spec.cpus_per_task}")
229-
233+
230234
if spec.container_image:
231235
cmd_parts.extend(["singularity", "exec", spec.container_image])
232236
if spec.executable:
@@ -255,9 +259,9 @@ def _parse(self, content, return_unhandled=False):
255259
m = bsub_re.match(line)
256260
if m:
257261
key, val = m.groups()
258-
key = key.lstrip('-')
262+
key = key.lstrip("-")
259263
if val:
260-
val = val.split("#", 1)[0] # Remove trailing comments
264+
val = val.split("#", 1)[0] # Remove trailing comments
261265

262266
val = val.strip().strip('"') if val else True
263267

@@ -290,14 +294,14 @@ def _parse(self, content, return_unhandled=False):
290294
elif key == "N":
291295
spec.mail_type.append("END")
292296
elif key == "w":
293-
ended_jobs = re.findall(r'ended\(([^)]+)\)', val)
297+
ended_jobs = re.findall(r"ended\(([^)]+)\)", val)
294298
spec.depends_on = ended_jobs
295299
elif key == "R":
296300
# Parse complex -R string
297-
rusage_match = re.search(r'rusage\[(.*?)\]', val)
298-
span_match = re.search(r'span\[ptile=(\d+)\]', val)
299-
select_match = re.search(r'select\[(.*?)\]', val)
300-
301+
rusage_match = re.search(r"rusage\[(.*?)\]", val)
302+
span_match = re.search(r"span\[ptile=(\d+)\]", val)
303+
select_match = re.search(r"select\[(.*?)\]", val)
304+
301305
if rusage_match:
302306
for part in rusage_match.group(1).split(":"):
303307
k, v = part.split("=", 1)
@@ -310,7 +314,7 @@ def _parse(self, content, return_unhandled=False):
310314
if spec.num_tasks > 0 and tasks_per_node > 0:
311315
spec.num_nodes = spec.num_tasks // tasks_per_node
312316
if select_match:
313-
spec.constraints.extend(select_match.group(1).split(':'))
317+
spec.constraints.extend(select_match.group(1).split(":"))
314318
else:
315319
not_handled.add(key)
316320
continue

0 commit comments

Comments
 (0)