
Commit 1c90468

Merge pull request #9 from compspec/add-features
feat: support for job manager features;
2 parents 8be8e1a + b1ae51a commit 1c90468

10 files changed: +141 -51 lines


fractale/transformer/base.py

Lines changed: 2 additions & 2 deletions
@@ -78,13 +78,13 @@ def add(self, name: str, value=None):
 
         # Determine if it's a short (-n) or long (--tasks) option
         prefix = "-" if len(name) == 1 else "--"
-        self.script_lines.append(f"{self.directive}: {prefix}{name}={value}")
+        self.script_lines.append(f"{self.directive} {prefix}{name}={value}")
 
     def add_flag(self, name: str):
         """
        Add a boolean flag (e.g., --exclusive).
         """
-        self.script_lines.append(f"{self.directive}: --{name}")
+        self.script_lines.append(f"{self.directive} --{name}")
 
     def render(self) -> str:
         """

fractale/transformer/cobalt/transform.py

Lines changed: 11 additions & 4 deletions
@@ -154,9 +154,12 @@ def convert(self, spec):
             )
             qsub_cmd.extend(["--dependencies", dep_str])
 
-        # Node constraints are handled by --attrs
-        if spec.constraints:
-            qsub_cmd.extend(["--attrs", ":".join(spec.constraints)])
+        # Node constraints and GPU type are handled by --attrs
+        attrs = list(spec.constraints)
+        if spec.gpu_type:
+            attrs.append(f"gpu_type={spec.gpu_type}")
+        if attrs:
+            qsub_cmd.extend(["--attrs", ":".join(attrs)])
 
         # -O sets the prefix for output/error files, which is derived from the job name.
         qsub_cmd.extend(["-O", job_name])
@@ -284,7 +287,11 @@ def _parse(self, content, return_unhandled=False):
             elif key == "dependencies":
                 spec.depends_on = value.split(":")
             elif key == "attrs":
-                spec.constraints = value.split(":")
+                for attr in value.split(":"):
+                    if attr.startswith("gpu_type="):
+                        spec.gpu_type = attr.split("=", 1)[1]
+                    else:
+                        spec.constraints.append(attr)
             elif key == "M":
                 spec.mail_user = value
             elif key == "notify" and value == "user":
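
Taken together, the two hunks make the Cobalt --attrs handling round-trip: convert folds gpu_type into the colon-delimited attribute string, and _parse splits it back out. A standalone sketch of that encoding and decoding, with assumed example values:

    constraints, gpu_type = ["ssd"], "a100"          # assumed example values
    attrs = list(constraints)
    if gpu_type:
        attrs.append(f"gpu_type={gpu_type}")
    encoded = ":".join(attrs)                        # "ssd:gpu_type=a100"

    # Parsing reverses the encoding
    parsed_constraints, parsed_gpu_type = [], None
    for attr in encoded.split(":"):
        if attr.startswith("gpu_type="):
            parsed_gpu_type = attr.split("=", 1)[1]  # "a100"
        else:
            parsed_constraints.append(attr)          # ["ssd"]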

fractale/transformer/common.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,3 @@
-import sys
 from dataclasses import dataclass, field
 from typing import Dict, List, Optional, Union
 
@@ -32,6 +31,7 @@ class JobSpec:
     cpus_per_task: int = 1
     mem_per_task: Optional[str] = None
     gpus_per_task: int = 0
+    gpu_type: Optional[str] = None
 
     # Scheduling and Constraints
     wall_time: Optional[int] = None
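
With the new field, a caller can request a specific accelerator model alongside a GPU count. A minimal sketch of the relevant JobSpec subset (other fields omitted; defaults as shown in the diff):

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class JobSpec:
        cpus_per_task: int = 1
        mem_per_task: Optional[str] = None
        gpus_per_task: int = 0
        gpu_type: Optional[str] = None  # e.g. "a100"; None means any GPU

    spec = JobSpec(gpus_per_task=2, gpu_type="a100")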

fractale/transformer/flux/transform.py

Lines changed: 12 additions & 11 deletions
@@ -1,3 +1,7 @@
+import re
+import shlex
+from typing import Optional
+
 from fractale.logger.generate import JobNamer
 from fractale.transformer.base import Script, TransformerBase
 from fractale.transformer.flux.validate import Validator
@@ -15,22 +19,15 @@ def priority_to_flux_priority(class_name):
     numerical priority value. This is the reverse of map_numeric_priority_to_class_name.
     """
     # Define the mapping from the string class back to a representative number.
-    mapping = {
-        "low": 15,
-        "normal": 16,
-        "high": 50,
-        "urgent": 100,
-    }
+    mapping = {"low": 15, "normal": 16, "high": 50, "urgent": 100}
+
     # If we don't get it, default to Flux's default
     return mapping.get(class_name, 16)
 
 
 class FluxTransformer(TransformerBase):
     """
-    A Flux Transformer is a very manual way to transform a subsystem into
-    a batch script. I am not even using jinja templates, I'm just
-    parsing the subsystems in a sort of manual way. This a filler,
-    and assuming that we will have an LLM that can replace this.
+    A Flux Transformer for converting a generic JobSpec into a Flux batch script.
     """
 
     def _parse(self, jobspec, return_unhandled=False):
@@ -97,6 +94,10 @@ def convert(self, spec):
         script.add("c", spec.cpus_per_task if spec.cpus_per_task > 1 else None)
         script.add("gpus-per-task", spec.gpus_per_task if spec.gpus_per_task > 0 else None)
 
+        # Add a constraint for the specific GPU type, if provided
+        # We could probably add gpu_type to requires if an admin configures it,
+        # but it's too risky.
+
         # Scheduling Directives
         if spec.exclusive_access:
             script.add_flag("exclusive")
@@ -106,7 +107,7 @@
             script.add("t", spec.wall_time)
 
         flux_prio = priority_to_flux_priority(spec.priority)
-        if flux_prio != 0:
+        if flux_prio != 16:
             script.add("urgency", flux_prio)
         script.newline()
 
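
The urgency change matters because priority_to_flux_priority falls back to 16, Flux's default, so the old != 0 test emitted an urgency directive for every job. A sketch of the corrected behavior; rendering the directive as "# flux:" is an assumption about how Script formats output:

    def priority_to_flux_priority(class_name):
        mapping = {"low": 15, "normal": 16, "high": 50, "urgent": 100}
        # If we don't get it, default to Flux's default
        return mapping.get(class_name, 16)

    for priority in ("normal", "urgent"):
        flux_prio = priority_to_flux_priority(priority)
        if flux_prio != 16:  # only non-default urgencies produce a directive
            print(f"# flux: --urgency={flux_prio}")  # printed for "urgent" only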

fractale/transformer/kubernetes/transform.py

Lines changed: 12 additions & 4 deletions
@@ -10,6 +10,7 @@
 
 # Assume GPUs are NVIDIA
 gpu_resource_name = "nvidia.com/gpu"
+gpu_product_label = "nvidia.com/gpu.product"
 
 
 def normalize_cpu_request(cpus: int) -> str:
@@ -37,7 +38,7 @@ def normalize_memory_request(mem_str):
     return mem_str
 
 
-def parse_memory(self, mem_str: str) -> str:
+def parse_memory(mem_str: str) -> str:
     """
     Converts K8s memory (e.g., 1Gi) to JobSpec format (e.g., 1G).
     """
@@ -53,7 +54,7 @@ def parse_memory(self, mem_str: str) -> str:
     return mem_str
 
 
-def parse_cpu(self, cpu_str: str) -> int:
+def parse_cpu(cpu_str: str) -> int:
     """
     Converts K8s CPU string to an integer. Assumes no millicores.
     """
@@ -125,12 +126,14 @@ def convert(self, spec):
         if spec.environment:
             container["env"] = [{"name": k, "value": v} for k, v in spec.environment.items()]
 
+        # This is the spec for the pod template
+        template_pod_spec = {"containers": [container], "restartPolicy": "Never"}
         pod_spec = {
             "apiVersion": "batch/v1",
             "kind": "Job",
             "metadata": {"name": job_name},
             "spec": {
-                "template": {"spec": {"containers": [container], "restartPolicy": "Never"}},
+                "template": {"spec": template_pod_spec},
                 "backoffLimit": 0,
             },
         }
@@ -158,7 +161,7 @@ def convert(self, spec):
         job_spec = {
             "parallelism": spec.num_nodes,
             "completions": spec.num_nodes,
-            "backoffLimit": 4,  # A sensible default
+            "backoffLimit": 4,
             "template": {"metadata": {"labels": {"job-name": spec.job_name}}, "spec": pod_spec},
         }
 
@@ -241,6 +244,11 @@ def parse(self, job_manifest):
         if cpu_val == 1:
             spec.cpus_per_task = 1
 
+        # GPU Type from Node Selector
+        node_selector = pod_spec.get("nodeSelector", {})
+        if gpu_product_label in node_selector:
+            spec.gpu_type = node_selector[gpu_product_label]
+
         # Scheduling
         if pod_spec.get("priorityClassName"):
            try:
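
The Kubernetes side expresses gpu_type through a node selector rather than a resource request; nvidia.com/gpu.product is the label NVIDIA GPU feature discovery typically applies, though its value strings vary by cluster. A sketch of the parse direction, with an assumed manifest fragment:

    gpu_product_label = "nvidia.com/gpu.product"
    pod_spec = {  # hypothetical pod template spec from a Job manifest
        "containers": [{"name": "task", "image": "busybox"}],
        "nodeSelector": {gpu_product_label: "NVIDIA-A100-SXM4-40GB"},
    }
    node_selector = pod_spec.get("nodeSelector", {})
    if gpu_product_label in node_selector:
        gpu_type = node_selector[gpu_product_label]  # "NVIDIA-A100-SXM4-40GB"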

fractale/transformer/lsf/transform.py

Lines changed: 19 additions & 3 deletions
@@ -160,8 +160,15 @@ def convert(self, spec):
 
         # Build the complex -R "select[...] span[...] rusage[...]" string
         r_parts = []
-        if spec.constraints:
-            r_parts.append(f'select[{":".join(spec.constraints)}]')
+
+        # Handle select criteria, including GPU type
+        select_criteria = list(spec.constraints)
+
+        # I'm not sure this would actually work
+        if spec.gpu_type:
+            select_criteria.append(spec.gpu_type)
+        if select_criteria:
+            r_parts.append(f'select[{":".join(select_criteria)}]')
 
         if spec.num_nodes > 1 and spec.num_tasks > 0:
             tasks_per_node = spec.num_tasks // spec.num_nodes
@@ -252,6 +259,9 @@ def _parse(self, content, return_unhandled=False):
         command_lines = []
         not_handled = set()
 
+        # Heuristic list of common GPU names to identify as gpu_type
+        known_gpu_types = {"a100", "v100", "h100", "a30", "a40", "mi250"}
+
         for line in content.splitlines():
             if not line.strip():
                 continue
@@ -314,7 +324,13 @@ def _parse(self, content, return_unhandled=False):
                 if spec.num_tasks > 0 and tasks_per_node > 0:
                     spec.num_nodes = spec.num_tasks // tasks_per_node
                 if select_match:
-                    spec.constraints.extend(select_match.group(1).split(":"))
+                    criteria = select_match.group(1).split(":")
+                    for criterion in criteria:
+                        # If a criterion is a known GPU type, set it and move on
+                        if criterion.lower() in known_gpu_types:
+                            spec.gpu_type = criterion
+                        else:
+                            spec.constraints.append(criterion)
             else:
                 not_handled.add(key)
                 continue
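
Because LSF's select[...] string carries bare tokens, the parser cannot tell a GPU model apart from any other constraint, hence the heuristic set of known names. A sketch of both directions with assumed values; note the commit itself flags the convert side as untested:

    known_gpu_types = {"a100", "v100", "h100", "a30", "a40", "mi250"}

    # Convert: fold the GPU type into the select criteria
    select_criteria = ["mem512", "v100"]               # constraints + gpu_type
    r_string = f'select[{":".join(select_criteria)}]'  # select[mem512:v100]

    # Parse: anything matching a known GPU name becomes gpu_type
    gpu_type, constraints = None, []
    for criterion in "mem512:v100".split(":"):
        if criterion.lower() in known_gpu_types:
            gpu_type = criterion                       # "v100"
        else:
            constraints.append(criterion)              # ["mem512"]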

fractale/transformer/moab/transform.py

Lines changed: 27 additions & 5 deletions
@@ -1,6 +1,6 @@
 import re
 import shlex
-from datetime import timedelta
+from datetime import datetime, timedelta
 
 import fractale.utils as utils
 from fractale.logger.generate import JobNamer
@@ -169,8 +169,19 @@ def convert(self, spec) -> str:
 
         # Resource Requests
         resource_parts = []
-        if spec.num_nodes and spec.cpus_per_task:
-            resource_parts.append(f"nodes={spec.num_nodes}:ppn={spec.cpus_per_task}")
+        node_spec = []
+        if spec.num_nodes:
+            node_spec.append(f"nodes={spec.num_nodes}")
+        if spec.cpus_per_task:
+            node_spec.append(f"ppn={spec.cpus_per_task}")
+        if spec.gpus_per_task > 0:
+            node_spec.append(f"gpus={spec.gpus_per_task}")
+        if spec.gpu_type:
+            # Add gpu type as a feature request
+            node_spec.append(spec.gpu_type)
+
+        if node_spec:
+            resource_parts.append(":".join(node_spec))
 
         if spec.generic_resources:
             resource_parts.append(f"gres={spec.generic_resources}")
@@ -307,15 +318,26 @@ def _parse(self, filename, return_unhandled=False):
                 for part in shlex.split(full_l_string):
 
                     # Split combined node:ppn requests first
-                    if "nodes" in part and ":" in part:
+                    if ":" in part:
+                        node_features = []
                         for subpart in part.split(":"):
                             if "=" not in subpart:
+                                # This is a feature request, like "gtx1080"
+                                node_features.append(subpart)
                                 continue
+
                             k, v = subpart.split("=", 1)
                             if k == "nodes":
                                 spec.num_nodes = int(v)
                             elif k == "ppn":
                                 spec.cpus_per_task = int(v)
+                            elif k == "gpus":
+                                spec.gpus_per_task = int(v)
+
+                        # Heuristic: If we found GPUs and other features, assume the first
+                        # other feature is the gpu_type.
+                        if spec.gpus_per_task > 0 and node_features:
+                            spec.gpu_type = node_features[0]
 
                     elif "=" in part:
                         k, v = part.split("=", 1)
@@ -329,7 +351,7 @@ def _parse(self, filename, return_unhandled=False):
                         spec.num_tasks = int(v)
                     elif k == "mem":
                         spec.mem_per_task = v
-                    elif k == "gres" or k == "gpus":
+                    elif k == "gres":
                         spec.generic_resources = v
                     elif k == "depend":
                         spec.depends_on = v
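
In TORQUE/Moab -l syntax, a bare token inside a nodes=... request is a node feature, so the GPU model rides along unlabeled and the parser has to guess; the commit assumes the first feature seen next to a gpus= count is the model. A sketch with assumed values:

    # Convert side builds: nodes=2:ppn=8:gpus=1:gtx1080
    part = ":".join(["nodes=2", "ppn=8", "gpus=1", "gtx1080"])

    # Parse side: labeled subparts set counts, bare ones collect as features
    num_nodes = cpus = gpus = 0
    node_features = []
    for subpart in part.split(":"):
        if "=" not in subpart:
            node_features.append(subpart)
            continue
        k, v = subpart.split("=", 1)
        if k == "nodes":
            num_nodes = int(v)
        elif k == "ppn":
            cpus = int(v)
        elif k == "gpus":
            gpus = int(v)
    gpu_type = node_features[0] if gpus > 0 and node_features else None  # "gtx1080"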

fractale/transformer/oar/transform.py

Lines changed: 6 additions & 0 deletions
@@ -182,6 +182,10 @@ def convert(self, spec):
             # This requests nodes that *each* have at least this many GPUs.
             l_parts.append(f"/gpunum={spec.gpus_per_task}")
 
+        # Add the specific GPU type as a resource property
+        if spec.gpu_type:
+            l_parts.append(f"/gpu_model='{spec.gpu_type}'")
+
         resource_str = "".join(l_parts)
 
         # Node constraints are added as properties to the resource string.
@@ -311,6 +315,8 @@ def _parse(self, content, return_unhandled=False):
                 spec.cpus_per_task = int(v)
             elif k == "gpunum":
                 spec.gpus_per_task = int(v)
+            elif k == "gpu_model":
+                spec.gpu_type = v.strip("'")
             else:
                 # Assume parts without '=' are constraints
                 spec.constraints.append(part.strip().strip("'"))

fractale/transformer/pbs/transform.py

Lines changed: 18 additions & 11 deletions
@@ -148,16 +148,21 @@ def convert(self, spec):
         # Resource Selection (-l)
         select_parts = []
         if spec.num_nodes > 0:
-            select_parts.append(f"select={spec.num_nodes}")
-
-        # mpiprocs is often used to specify total tasks, which works well with our spec
-        if spec.num_tasks > 1:
-            select_parts.append(f"mpiprocs={spec.num_tasks}")
+            # Build the select statement parts
+            node_spec = [f"select={spec.num_nodes}"]
+            if spec.cpus_per_task > 1:
+                node_spec.append(f"ncpus={spec.cpus_per_task}")
+            if spec.gpus_per_task > 0:
+                node_spec.append(f"ngpus={spec.gpus_per_task}")
+            # I am not clear difference between gpus and accelerators
+            # but this seems supported - would need to test
+            if spec.gpu_type:
+                node_spec.append(f"accelerator_type={spec.gpu_type}")
+            # mpiprocs is often used to specify total tasks, which works well with our spec
+            if spec.num_tasks > 1:
+                node_spec.append(f"mpiprocs={spec.num_tasks}")
 
-        if spec.cpus_per_task > 1:
-            select_parts.append(f"ncpus={spec.cpus_per_task}")
-        if spec.gpus_per_task > 0:
-            select_parts.append(f"ngpus={spec.gpus_per_task}")
+            select_parts.append(":".join(node_spec))
 
         # PBS memory format often includes units like gb or mb
         if spec.mem_per_task:
@@ -167,7 +172,7 @@ def convert(self, spec):
                 mem_val += "b"
             select_parts.append(f"mem={mem_val}")
 
-        resource_str = ":".join(select_parts)
+        resource_str = ",".join(select_parts)
 
         wt = seconds_to_pbs(spec.wall_time)
         if wt:
@@ -289,7 +294,7 @@ def _parse(self, content, return_unhandled=False):
             if k == "walltime":
                 spec.wall_time = pbs_time_to_seconds(v)
             elif k == "select":
-                # select=N:ncpus=C:mpiprocs=T...
+                # select=N:ncpus=C:mpiprocs=T:gpu_type=a100...
                 select_parts = v.split(":")
                 spec.num_nodes = int(select_parts[0])
                 for sp in select_parts[1:]:
@@ -298,6 +303,8 @@ def _parse(self, content, return_unhandled=False):
                         spec.cpus_per_task = int(sv)
                     elif sk == "ngpus":
                         spec.gpus_per_task = int(sv)
+                    elif sk == "gpu_type":
+                        spec.gpu_type = sv
                     elif sk == "mem":
                         spec.mem_per_task = sv.upper().replace("B", "")
                     elif sk == "mpiprocs":
