Skip to content

Commit 8749334

Browse files
committed
feat: add moab
Signed-off-by: vsoch <[email protected]>
1 parent 3e6e760 commit 8749334

File tree

3 files changed

+310
-0
lines changed

3 files changed

+310
-0
lines changed

fractale/transformer/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from .oar import Transformer as OARTransformer
88
from .pbs import Transformer as PBSTransformer
99
from .slurm import Transformer as SlurmTransformer
10+
from .moab import Transformer as MoabTransformer
1011

1112
plugins = {
1213
"kubernetes": KubernetesTransformer,
@@ -16,6 +17,7 @@
1617
"lsf": LSFTransformer,
1718
"oar": OARTransformer,
1819
"cobalt": CobaltTransformer,
20+
"moab": MoabTransformer,
1921
}
2022

2123

@@ -32,6 +34,8 @@ def detect_transformer(jobspec):
3234
content = utils.read_file(jobspec)
3335
if "#FLUX" in content and "FLUX_CAPACITOR" not in content:
3436
return "flux"
37+
if "#MSUB " in content:
38+
return "moab"
3539
if "#SBATCH " in content:
3640
return "slurm"
3741
if "kind:" in content and "Job" in content:

fractale/transformer/moab/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .transform import MoabTransformer as Transformer
2+
3+
assert Transformer
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
import re
2+
import shlex
3+
from datetime import timedelta
4+
5+
import fractale.utils as utils
6+
from fractale.logger.generate import JobNamer
7+
from fractale.transformer.base import Script, TransformerBase
8+
from fractale.transformer.common import JobSpec
9+
10+
11+
class MoabScript(Script):
12+
"""
13+
A helper class to build a Moab (#MSUB) batch script line by line.
14+
"""
15+
def __init__(self):
16+
self.script_lines = ["#!/bin/bash"]
17+
self.directive = "#MSUB"
18+
19+
def add(self, flag, value=None):
20+
"""
21+
Add a Moab directive, e.g., #MSUB -N my-job.
22+
Format is #MSUB -<flag> <value>
23+
"""
24+
if value is None:
25+
return
26+
self.script_lines.append(f"{self.directive} -{flag} {str(value)}")
27+
28+
29+
def seconds_to_moab_walltime(seconds):
30+
"""
31+
Converts an integer number of seconds into Moab's HH:MM:SS walltime format.
32+
"""
33+
# This shouldn't happen, but we return 0 so we use the default.
34+
if not seconds or seconds <= 0:
35+
return None
36+
37+
# Moab walltime does not typically include days.
38+
hours, seconds_rem = divmod(seconds, 3600)
39+
minutes, seconds = divmod(seconds_rem, 60)
40+
41+
return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
42+
43+
44+
def epoch_to_moab_begin_time(epoch_seconds: int) -> str:
45+
"""
46+
Converts a Unix epoch timestamp into Moab's required epoch integer string
47+
for the '-S' (starttime) flag.
48+
"""
49+
if not isinstance(epoch_seconds, int) or epoch_seconds < 0:
50+
raise ValueError("begin_time must be a positive integer (Unix epoch seconds).")
51+
52+
return str(epoch_seconds)
53+
54+
55+
def moab_walltime_to_seconds(time_str):
56+
if not time_str:
57+
return None
58+
59+
h = 0
60+
m = 0
61+
s = 0
62+
parts = time_str.split(":")
63+
if len(parts) == 3:
64+
h, m, s = map(int, parts)
65+
elif len(parts) == 2:
66+
m, s = map(int, parts)
67+
elif len(parts) == 1:
68+
m = int(parts[0])
69+
return int(timedelta(hours=h, minutes=m, seconds=s).total_seconds())
70+
71+
72+
def parse_moab_command(command_lines, spec):
73+
"""
74+
Parses a moab command into parts.
75+
"""
76+
# We use the last command line as the primary execution logic
77+
main_command = command_lines[-1]
78+
parts = shlex.split(main_command)
79+
80+
# Unwrap common launchers like mpiexec
81+
if parts and parts[0] in ('mpiexec', 'mpirun'):
82+
parts = parts[1:]
83+
84+
if parts and parts[0] in ("singularity", "apptainer") and parts[1] == "exec":
85+
spec.container_image = parts[2]
86+
# The rest is the command inside the container
87+
parts = parts[3:]
88+
89+
# Handle input redirection
90+
if "<" in parts:
91+
try:
92+
idx = parts.index("<")
93+
spec.input_file = parts[idx + 1]
94+
# Remove '<' and the filename from the arguments
95+
parts.pop(idx)
96+
parts.pop(idx)
97+
except (ValueError, IndexError):
98+
pass
99+
return parts
100+
101+
102+
def moab_begin_time_to_epoch(time_str):
103+
"""
104+
Converts a Moab begin time string (epoch) to an integer.
105+
"""
106+
if not time_str:
107+
return None
108+
try:
109+
return int(time_str)
110+
except ValueError:
111+
return None
112+
113+
114+
def priority_to_moab_priority(priority):
115+
"""
116+
Maps a semantic priority string ("high") to a Moab priority value (-1024 to 1023).
117+
"""
118+
# Higher value means HIGHER priority in Moab.
119+
mapping = {
120+
"low": -500,
121+
"normal": 0,
122+
"high": 500,
123+
"urgent": 1000,
124+
}
125+
# Default to 'normal' (0) if the string is None or not in the map
126+
return mapping.get(priority, 0)
127+
128+
129+
def moab_priority_to_priority(moab_priority):
130+
"""
131+
Maps a Moab priority value back to a semantic string ("high").
132+
"""
133+
if moab_priority is None or moab_priority == 0:
134+
return "normal"
135+
if moab_priority < 0:
136+
return "low"
137+
if 0 < moab_priority < 1000:
138+
return "high"
139+
return "urgent" # for priority >= 1000
140+
141+
142+
class MoabTransformer(TransformerBase):
143+
"""
144+
A Moab Transformer for converting a generic JobSpec into a Moab batch script.
145+
146+
This transformer maps the fields of the JobSpec to their corresponding #MSUB
147+
directives and constructs a runnable script.
148+
"""
149+
150+
def convert(self, spec) -> str:
151+
"""
152+
Convert a normalized jobspec to a Moab batch script.
153+
"""
154+
script = MoabScript()
155+
156+
# If we don't have a job name, generate one
157+
job_name = spec.job_name or JobNamer().generate()
158+
script.add("N", job_name)
159+
160+
# Job Identity & Accounting
161+
script.add("A", spec.account)
162+
163+
# I/O
164+
script.add("o", spec.output_file)
165+
script.add("e", spec.error_file)
166+
# Moab does not have direct mail-type equivalents like Slurm, -m is simpler
167+
if spec.mail_user:
168+
script.add("m", spec.mail_user)
169+
170+
# Resource Requests
171+
resource_parts = []
172+
# Moab's `-l` is key: nodes=X:ppn=Y where ppn is procs per node
173+
if spec.num_nodes and spec.cpus_per_task:
174+
resource_parts.append(f"nodes={spec.num_nodes}:ppn={spec.cpus_per_task}")
175+
176+
if spec.gpus_per_task and spec.gpus_per_task > 0:
177+
resource_parts.append(f"gpus={spec.gpus_per_task}")
178+
179+
if spec.mem_per_task:
180+
resource_parts.append(f"mem={spec.mem_per_task}")
181+
182+
if spec.exclusive_access:
183+
resource_parts.append("naccesspolicy=singlejob")
184+
185+
# Scheduling and Constraints
186+
wt = seconds_to_moab_walltime(spec.wall_time)
187+
if wt:
188+
script.add("l", f"walltime={wt}")
189+
190+
if resource_parts:
191+
script.add("l", " ".join(resource_parts))
192+
193+
script.add("q", spec.queue)
194+
195+
moab_prio = priority_to_moab_priority(spec.priority)
196+
if moab_prio != 0:
197+
script.add("p", moab_prio)
198+
199+
if spec.begin_time:
200+
script.add("S", epoch_to_moab_begin_time(spec.begin_time))
201+
202+
script.add("d", spec.working_directory)
203+
204+
# Dependencies: `-l depend=...`
205+
if spec.depends_on:
206+
if isinstance(spec.depends_on, list):
207+
dependency_str = ":".join(spec.depends_on)
208+
script.add("l", f"depend=afterok:{dependency_str}")
209+
else:
210+
script.add("l", f"depend={spec.depends_on}")
211+
212+
# I am just adding this for readability
213+
script.newline()
214+
215+
# Environment Variables
216+
if spec.environment:
217+
for key, value in spec.environment.items():
218+
script.add_line(f"export {key}='{value}'")
219+
script.newline()
220+
221+
# Execution logic
222+
container_exec = []
223+
if spec.container_image:
224+
container_exec = ["singularity", "exec", spec.container_image]
225+
226+
command_parts = container_exec + spec.script
227+
228+
if spec.input_file:
229+
command_parts.append(f"< {spec.input_file}")
230+
for line in command_parts:
231+
script.add_line(line)
232+
script.newline()
233+
return script.render()
234+
235+
def _parse(self, filename, return_unhandled=False):
236+
"""
237+
Parses the content of a Moab batch script into a JobSpec object.
238+
"""
239+
spec = JobSpec()
240+
command_lines = []
241+
not_handled = set()
242+
243+
# Regex to capture #MSUB directives (simple -f <val> format)
244+
msub_re = re.compile(r"#MSUB\s+-(\w+)(?:\s+(.+))?")
245+
script_content = utils.read_file(filename)
246+
247+
for line in script_content.splitlines():
248+
if not line.strip():
249+
continue
250+
251+
match = msub_re.match(line)
252+
if match:
253+
key, value = match.groups()
254+
value = value.strip() if value else ""
255+
256+
if key == 'N': spec.job_name = value
257+
elif key == 'A': spec.account = value
258+
elif key == 'o': spec.output_file = value
259+
elif key == 'e': spec.error_file = value
260+
elif key == 'm': spec.mail_user = value
261+
elif key == 'q': spec.queue = value
262+
elif key == 'd': spec.working_directory = value
263+
elif key == 'S': spec.begin_time = moab_begin_time_to_epoch(value)
264+
elif key == 'p': spec.priority = moab_priority_to_priority(int(value))
265+
elif key == 'l':
266+
# The -l line can contain walltime and other resources
267+
# e.g., -l walltime=01:00:00 -l nodes=4:ppn=8
268+
if 'walltime' in value:
269+
spec.wall_time = moab_walltime_to_seconds(value.split('=', 1)[1])
270+
elif 'nodes' in value:
271+
for part in value.split(':'):
272+
k, v = part.split('=', 1)
273+
if k == 'nodes': spec.num_nodes = int(v)
274+
elif k == 'ppn': spec.cpus_per_task = int(v)
275+
elif 'mem' in value:
276+
spec.mem_per_task = value.split('=', 1)[1]
277+
elif 'gpus' in value:
278+
spec.gpus_per_task = int(value.split('=', 1)[1])
279+
elif 'depend' in value:
280+
spec.depends_on = value.split('=', 1)[1]
281+
elif 'naccesspolicy' in value and 'singlejob' in value:
282+
spec.exclusive_access = True
283+
else:
284+
not_handled.add(key)
285+
continue
286+
287+
if line.lower().startswith("export "):
288+
env_match = re.match(r"export\s+([^=]+)=(.*)", line)
289+
if env_match:
290+
env_key, env_val = env_match.groups()
291+
spec.environment[env_key] = env_val.strip("'\"")
292+
293+
if line.startswith("#"):
294+
continue
295+
296+
command_lines.append(line)
297+
298+
if command_lines:
299+
spec.script = command_lines
300+
301+
if return_unhandled:
302+
return not_handled
303+
return spec

0 commit comments

Comments
 (0)