Skip to content

Commit 3788f62

Browse files
committed
tweak: updates to transformers
Signed-off-by: vsoch <[email protected]>
1 parent 371dfae commit 3788f62

File tree

6 files changed

+540
-168
lines changed

6 files changed

+540
-168
lines changed

fractale/transformer/cobalt/transform.py

Lines changed: 122 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,17 @@ def parse_cobalt_command(command_lines, spec):
9292
if not command_lines:
9393
return []
9494

95-
main_command = command_lines[-1]
95+
# Find the first non-empty, non-comment line, which is likely the main command
96+
main_command = ""
97+
for line in command_lines:
98+
line = line.strip()
99+
if line and not line.startswith("#"):
100+
main_command = line
101+
break
102+
103+
if not main_command:
104+
return []
105+
96106
parts = shlex.split(main_command)
97107

98108
# The common launcher on ALCF systems is 'aprun'
@@ -136,13 +146,38 @@ def convert(self, spec):
136146
bt = epoch_to_cobalt_begin_time(spec.begin_time)
137147
if bt:
138148
qsub_cmd.extend(["--at", bt])
139-
140-
# -O sets the prefix for output/error files
149+
150+
# Dependencies are specified with a colon-separated list of job IDs
151+
if spec.depends_on:
152+
dep_str = ":".join(spec.depends_on) if isinstance(spec.depends_on, list) else spec.depends_on
153+
qsub_cmd.extend(["--dependencies", dep_str])
154+
155+
# Node constraints are handled by --attrs
156+
if spec.constraints:
157+
qsub_cmd.extend(["--attrs", ":".join(spec.constraints)])
158+
159+
# -O sets the prefix for output/error files, which is derived from the job name.
141160
qsub_cmd.extend(["-O", job_name])
161+
# If explicit files are given, they override the prefix.
162+
if spec.output_file:
163+
qsub_cmd.extend(["-o", spec.output_file])
164+
if spec.error_file:
165+
qsub_cmd.extend(["-e", spec.error_file])
166+
167+
# Email notifications
168+
if spec.mail_user:
169+
qsub_cmd.extend(["-M", spec.mail_user])
170+
# Cobalt has a simple '--notify user' flag, equivalent to ALL in Slurm.
171+
if spec.mail_type:
172+
qsub_cmd.append("--notify user")
173+
142174

143175
if spec.environment:
144176
for k, v in spec.environment.items():
145177
qsub_cmd.extend(["--env", f"{k}={v}"])
178+
179+
# Note: Cobalt exclusive access is often handled by queue policy or `--mode script`.
180+
# We omit a direct flag to avoid conflicting with system-specific setups.
146181

147182
# Build the script that will be executed on the compute node
148183
exec_script_parts = ["#!/bin/bash", ""]
@@ -151,17 +186,23 @@ def convert(self, spec):
151186
aprun_cmd = ["aprun"]
152187

153188
# Match aprun geometry to qsub submission
189+
# -n total processes, -N processes per node
154190
aprun_cmd.extend(["-n", str(spec.num_tasks)])
155191
aprun_cmd.extend(["-N", str(spec.cpus_per_task)])
156192

157193
if spec.container_image:
158194
aprun_cmd.extend(["singularity", "exec", spec.container_image])
159-
if spec.executable:
160-
aprun_cmd.append(spec.executable)
161-
if spec.arguments:
162-
aprun_cmd.extend(spec.arguments)
195+
196+
# If spec.script is defined, it takes precedence over executable/arguments
197+
if spec.script:
198+
exec_script_parts.extend(spec.script)
199+
else:
200+
if spec.executable:
201+
aprun_cmd.append(spec.executable)
202+
if spec.arguments:
203+
aprun_cmd.extend(spec.arguments)
204+
exec_script_parts.append(" ".join(aprun_cmd))
163205

164-
exec_script_parts.append(" ".join(aprun_cmd))
165206
exec_script = "\n".join(exec_script_parts)
166207

167208
# Combine into a self-submitting script using a "here document"
@@ -198,49 +239,87 @@ def _parse(self, content, return_unhandled=False):
198239
i = 0
199240
while i < len(args):
200241
arg = args[i]
201-
val = args[i + 1] if i + 1 < len(args) else ""
202-
203-
if arg == "-A":
204-
spec.account = val
205-
i += 2
206-
elif arg == "-q":
207-
spec.queue = val
208-
i += 2
209-
elif arg == "-n":
210-
spec.num_nodes = int(val)
211-
i += 2
212-
elif arg == "-t":
213-
spec.wall_time = cobalt_walltime_to_seconds(val)
214-
i += 2
215-
elif arg == "--proccount":
216-
spec.num_tasks = int(val)
217-
i += 2
218-
elif arg == "-O":
219-
spec.job_name = val
220-
i += 2
221-
elif arg == "--at":
222-
spec.begin_time = cobalt_begin_time_to_epoch(val)
223-
i += 2
224-
else:
225-
not_handled.add(arg)
242+
243+
# Logic to handle both `--key value` and `--key=value`
244+
key, value = None, None
245+
if "=" in arg:
246+
key, value = arg.split("=", 1)
226247
i += 1
227-
248+
else:
249+
key = arg
250+
# Check if next part is a value or another flag
251+
if i + 1 < len(args) and not args[i + 1].startswith("-"):
252+
value = args[i + 1]
253+
i += 2
254+
else: # It's a boolean flag
255+
value = True
256+
i += 1
257+
258+
key = key.lstrip("-")
259+
260+
if key == "A":
261+
spec.account = value
262+
elif key == "q":
263+
spec.queue = value
264+
elif key == "n":
265+
spec.num_nodes = int(value)
266+
elif key == "t":
267+
spec.wall_time = cobalt_walltime_to_seconds(value)
268+
elif key == "proccount":
269+
spec.num_tasks = int(value)
270+
elif key == "O":
271+
# This sets the job name AND the output file prefix
272+
spec.job_name = value
273+
if not spec.output_file: spec.output_file = f"{value}.output"
274+
if not spec.error_file: spec.error_file = f"{value}.error"
275+
elif key == "o":
276+
spec.output_file = value
277+
elif key == "e":
278+
spec.error_file = value
279+
elif key == "at":
280+
spec.begin_time = cobalt_begin_time_to_epoch(value)
281+
elif key == "dependencies":
282+
spec.depends_on = value.split(":")
283+
elif key == "attrs":
284+
spec.constraints = value.split(":")
285+
elif key == "M":
286+
spec.mail_user = value
287+
elif key == "notify" and value == "user":
288+
spec.mail_type = ["ALL"] # Simple mapping
289+
elif key == "env":
290+
env_key, env_val = value.split("=", 1)
291+
spec.environment[env_key] = env_val
292+
else:
293+
not_handled.add(key)
294+
228295
# We again assume a block of text here.
229-
spec.script = script_body
296+
if script_body:
297+
spec.script = script_body
230298

231299
# Parse the execution command from the script body
232300
parts = parse_cobalt_command(spec.script, spec)
233301
if parts:
234302
# Need to parse aprun args to get cpus_per_task
235303
temp_args = parts.copy()
236-
if "-N" in temp_args:
237-
idx = temp_args.index("-N")
238-
spec.cpus_per_task = int(temp_args[idx + 1])
239-
temp_args.pop(idx)
240-
temp_args.pop(idx)
241-
242-
spec.executable = temp_args[0]
243-
spec.arguments = temp_args[1:]
304+
# This is a bit simplistic, but covers the common case
305+
try:
306+
if "-N" in temp_args:
307+
idx = temp_args.index("-N")
308+
spec.cpus_per_task = int(temp_args[idx + 1])
309+
temp_args.pop(idx)
310+
temp_args.pop(idx)
311+
# Also handle -n for total tasks if --proccount wasn't used
312+
if "-n" in temp_args and spec.num_tasks == 1:
313+
idx = temp_args.index("-n")
314+
spec.num_tasks = int(temp_args[idx + 1])
315+
temp_args.pop(idx)
316+
temp_args.pop(idx)
317+
except (ValueError, IndexError):
318+
pass # Ignore if parsing aprun fails
319+
320+
if temp_args:
321+
spec.executable = temp_args[0]
322+
spec.arguments = temp_args[1:]
244323

245324
if return_unhandled:
246325
return not_handled

fractale/transformer/common.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,12 @@ class JobSpec:
5050
# Dependencies and script
5151
depends_on: Optional[Union[str, List[str]]] = None
5252
script: List[str] = field(default_factory=list)
53+
54+
array_spec: Optional[str] = None
55+
generic_resources: Optional[str] = None
56+
mail_user: Optional[str] = None
57+
mail_type: List[str] = field(default_factory=list)
58+
requeue: Optional[bool] = None
59+
nodelist: Optional[str] = None
60+
exclude_nodes: Optional[str] = None
61+
licenses: Optional[str] = None

0 commit comments

Comments
 (0)