Skip to content

Commit fcc6b38

Browse files
committed
feat: add slurm transformer
Signed-off-by: vsoch <[email protected]>
1 parent 9e38a04 commit fcc6b38

File tree

9 files changed

+495
-17
lines changed

9 files changed

+495
-17
lines changed

examples/transform/flux-to-kubernetes/README.md renamed to examples/transform/flux/README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
This is an example of doing a transformation between types. We do a simple mapping of parameters.
44
To start testing, we will assume the job runs on a single node, using an equivalent container. This way we can create a Job in Kubernetes without considering MPI networking.
55

6+
## Flux to Kubernetes
7+
68
```bash
79
# Print pretty
8-
fractale transform --to kubernetes --from flux ./flux_batch.sh --pretty
10+
fractale transform --to kubernetes --from flux ./flux-batch.sh --pretty
911

1012
# Print as raw yaml (to pipe to file)
11-
fractale transform --to kubernetes --from flux ./flux_batch.sh
13+
fractale transform --to kubernetes --from flux ./flux-batch.sh
1214
```
1315
```console
1416
apiVersion: batch/v1
@@ -48,3 +50,10 @@ spec:
4850
cpu: '64'
4951
restartPolicy: Never
5052
```
53+
54+
## Flux to Slurm
55+
56+
```bash
57+
fractale transform --to slurm --from flux ./flux-batch.sh --pretty
58+
fractale transform --to slurm --from flux ./flux-batch.sh
59+
```

fractale/cli/transform.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import sys
55

66
import yaml
7+
from rich import print
78
from rich.pretty import pprint
89

910
from fractale.transformer import get_transformer
@@ -25,7 +26,9 @@ def main(args, extra, **kwargs):
2526
normalized_jobspec = from_transformer.parse(args.jobspec)
2627
final_jobspec = to_transformer.convert(normalized_jobspec)
2728

28-
if args.pretty:
29+
if args.pretty and args.to_transformer in ["slurm"]:
30+
print(final_jobspec)
31+
elif args.pretty:
2932
pprint(final_jobspec, indent_guides=True)
3033
elif args.to_transformer in ["kubernetes"]:
3134
yaml.dump(final_jobspec, sys.stdout, sort_keys=True, default_flow_style=False)

fractale/transformer/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from .flux import Transformer as FluxTransformer
22
from .kubernetes import Transformer as KubernetesTransformer
3+
from .slurm import Transformer as SlurmTransformer
34

45
plugins = {
56
"kubernetes": KubernetesTransformer,
67
"flux": FluxTransformer,
8+
"slurm": SlurmTransformer,
79
}
810

911

fractale/transformer/common.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ class JobSpec:
3434
gpus_per_task: int = 0
3535

3636
# Scheduling and Constraints
37-
wall_time: Optional[str] = None
37+
wall_time: Optional[int] = None
3838
queue: Optional[str] = None
3939
priority: Optional[int] = None
4040
exclusive_access: bool = False
4141
constraints: List[str] = field(default_factory=list)
42-
begin_time: Optional[str] = None
42+
begin_time: Optional[int] = None
4343

4444
# Environment and I/O
4545
environment: Dict[str, str] = field(default_factory=dict)

fractale/transformer/flux/workload.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
import copy
2-
import json
3-
4-
from fractale.logger import LogColors
51
from fractale.logger.generate import JobNamer
62
from fractale.transformer.base import TransformerBase
73
from fractale.transformer.flux.validate import Validator
@@ -34,7 +30,7 @@ def run(self, matches, jobspec):
3430
# We need to artificially parse the match metadata
3531
# This is handled by the solver, because each solver can
3632
# hold and represent metadata differently.
37-
for cluster, subsystems in matches.matches.items():
33+
for _, subsystems in matches.matches.items():
3834

3935
# There are two strategies we could take here. To update the flux
4036
# jobscript to have a batch script (more hardened, but doesn't

fractale/transformer/kubernetes/transform.py

Lines changed: 97 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
#!/usr/bin/env python3
22

3-
import argparse
4-
import os
53
import re
6-
import sys
74

85
import yaml
96

107
from fractale.logger.generate import JobNamer
118
from fractale.transformer.base import TransformerBase
9+
from fractale.transformer.common import JobSpec
1210

1311
# Assume GPUs are NVIDIA
1412
gpu_resource_name = "nvidia.com/gpu"
@@ -39,6 +37,31 @@ def normalize_memory_request(mem_str):
3937
return mem_str
4038

4139

40+
def parse_memory(mem_str: "str | int | None") -> "str | None":
    """
    Convert a Kubernetes memory quantity (e.g., 1Gi) to JobSpec format (e.g., 1G).

    This is a module-level helper, so it takes no ``self``: it is called
    with the memory value as its only argument.

    Parameters:
        mem_str: the memory request, e.g. "1Gi", "512Mi" or "64Ki". A bare
            integer (as YAML loads an unquoted byte count) is also accepted.

    Returns:
        The upper-cased quantity with a trailing binary suffix (GI, MI, KI)
        shortened to G, M or K; any other value is returned upper-cased and
        otherwise unchanged. None is returned for an empty or missing value.
    """
    if not mem_str:
        return None

    # YAML loads an unquoted byte count as an int, which has no .upper()
    normalized = str(mem_str).upper()
    for suffix in ("GI", "MI", "KI"):
        if normalized.endswith(suffix):
            # Only the trailing "I" of the binary suffix is dropped
            return normalized[:-1]
    return normalized
54+
55+
56+
def parse_cpu(cpu_str: "str | int | float | None") -> int:
    """
    Convert a Kubernetes CPU quantity to a whole number of CPUs.

    This is a module-level helper, so it takes no ``self``: it is called
    with the CPU value as its only argument.

    Parameters:
        cpu_str: the CPU request, e.g. "4", 4, "0.5" or "500m" (millicores).

    Returns:
        The number of CPUs as an integer. Fractional and millicore requests
        are rounded up so the job is never given less than it asked for, and
        the result is never below 1. An empty or missing value yields 1.

    Raises:
        ValueError: if the value is not a valid number.
    """
    import math

    if not cpu_str:
        return 1

    value = str(cpu_str).strip()
    if value.endswith("m"):
        # Millicores: 1000m is one full CPU
        cores = int(value[:-1]) / 1000
    else:
        cores = float(value)
    return max(1, math.ceil(cores))
63+
64+
4265
def get_resources(spec):
4366
"""
4467
Get Kubernetes resources from standard jobspec
@@ -68,10 +91,8 @@ def get_resources(spec):
6891

6992
class KubernetesTransformer(TransformerBase):
7093
"""
71-
A Flux Transformer is a very manual way to transform a subsystem into
72-
a batch script. I am not even using jinja templates, I'm just
73-
parsing the subsystems in a sort of manual way. This a filler,
74-
and assuming that we will have an LLM that can replace this.
94+
A Kubernetes Transformer is a very manual transformation to convert
95+
a standard JobSpec to a Kubernetes Job.
7596
"""
7697

7798
def convert(self, spec):
@@ -159,3 +180,72 @@ def convert(self, spec):
159180
job["metadata"].setdefault("labels", {})
160181
job["metadata"]["labels"]["account"] = spec.account
161182
return job
183+
184+
def parse(self, job_manifest):
    """
    Parse a Kubernetes Job manifest (dict or YAML string) into a JobSpec.

    Only the first container of the pod template is used; a warning is
    printed when the manifest declares more than one.

    Parameters:
        job_manifest: the manifest as an already-loaded dict, or as a YAML
            string that is loaded with yaml.safe_load.

    Returns:
        A JobSpec populated from the manifest metadata, job spec, and the
        first container (image, command, args, env, resources, priority).

    Raises:
        ValueError: if the manifest is not a mapping, or has no containers.
    """
    if isinstance(job_manifest, str):
        manifest = yaml.safe_load(job_manifest)
    else:
        manifest = job_manifest

    # An empty YAML document loads as None and a scalar document as a str;
    # fail clearly instead of with an AttributeError further down.
    if not isinstance(manifest, dict):
        raise ValueError("Kubernetes manifest must be a mapping.")

    spec = JobSpec()

    # Metadata. The `or {}` guards handle keys that are present but null.
    metadata = manifest.get("metadata") or {}
    spec.job_name = metadata.get("name")
    spec.account = (metadata.get("labels") or {}).get("account")

    # Job Spec and template
    job_spec = manifest.get("spec") or {}
    spec.num_nodes = job_spec.get("parallelism", 1)
    spec.wall_time = job_spec.get("activeDeadlineSeconds")
    pod_template = job_spec.get("template") or {}
    pod_spec = pod_template.get("spec") or {}

    containers = pod_spec.get("containers")
    if not containers:
        raise ValueError("Kubernetes manifest has no containers to parse.")
    if len(containers) > 1:
        print("Warning: job has >1 container, will use first.")

    container = containers[0]
    spec.container_image = container.get("image")
    # NOTE(review): a container "command" is a list of strings in a Job
    # manifest; confirm JobSpec.executable is meant to hold the whole list.
    spec.executable = container.get("command")
    spec.arguments = container.get("args") or []
    spec.working_directory = container.get("workingDir")

    # Environment. Entries populated through valueFrom (secrets, config
    # maps, field refs) have no literal "value" and are skipped.
    environment = {}
    for item in container.get("env") or []:
        if "name" in item and "value" in item:
            environment[item["name"]] = item["value"]
    if environment:
        spec.environment = environment

    # Resources
    resources = container.get("resources") or {}
    limits = resources.get("limits") or {}
    requests = resources.get("requests") or {}

    if gpu_resource_name in limits:
        spec.gpus_per_task = int(limits[gpu_resource_name])

    if "memory" in requests:
        spec.mem_per_task = parse_memory(requests["memory"])

    if "cpu" in requests:
        cpu_val = parse_cpu(requests["cpu"])
        # convert uses num_tasks for the CPU request if it's > 1,
        # otherwise it uses cpus_per_task. We map it back to num_tasks.
        spec.num_tasks = cpu_val
        if cpu_val == 1:
            spec.cpus_per_task = 1

    # Scheduling
    priority_class = pod_spec.get("priorityClassName")
    if priority_class:
        try:
            spec.priority = int(priority_class)
        except (ValueError, TypeError):
            # Ignore if not a valid integer string
            spec.priority = None

    return spec
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .transform import SlurmTransformer as Transformer
2+
3+
assert Transformer

0 commit comments

Comments
 (0)