Skip to content

Commit 9e38a04

Browse files
authored
Merge pull request #4 from compspec/add-k8s-transformer
feat: simple transform utility
2 parents 3032d16 + bd9e724 commit 9e38a04

File tree

16 files changed

+633
-3
lines changed

16 files changed

+633
-3
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ This library is primarily being used for development for the descriptive thrust
99

1010
## Design
1111

12+
### Simple
13+
14+
We provide a simple translation layer between job specifications. We take the assumption that although each manager has many options, the actual options a user would use is a much smaller set, and it's relatively straight forward to translate (and have better accuracy).
15+
16+
See [examples/transform](examples/transform) for an example.
17+
18+
### Complex
19+
1220
We want to:
1321

1422
1. Generate software graphs for some cluster (fluxion JGF) (this is done with [compspec](https://github.com/compspec/compspec)

examples/fractale/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,3 +174,8 @@ Saving to "./examples/fractale/cluster-a-containment.png"
174174
```
175175

176176
<img src="./cluster-a-containment.svg">
177+
178+
179+
## Transform
180+
181+
This mirrors the previous transform functionality for the jobspec library. It's a simple conversion, and no LLMs needed.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Transform
2+
3+
This is an example of doing a transformation between types. We do a simple mapping of parameters.
4+
To start testing, we will assume one node runs, and of the equivalent container. This way we can create a Job in Kubernetes without considering MPI networking.
5+
6+
```bash
7+
# Print pretty
8+
fractale transform --to kubernetes --from flux ./flux_batch.sh --pretty
9+
10+
# Print as raw yaml (to pipe to file)
11+
fractale transform --to kubernetes --from flux ./flux_batch.sh
12+
```
13+
```console
14+
apiVersion: batch/v1
15+
kind: Job
16+
metadata:
17+
name: lammps
18+
spec:
19+
activeDeadlineSeconds: 100
20+
backoffLimit: 4
21+
completions: 1
22+
parallelism: 1
23+
template:
24+
metadata:
25+
labels:
26+
job-name: lammps
27+
spec:
28+
apiVersion: batch/v1
29+
kind: Job
30+
metadata:
31+
name: lammps
32+
spec:
33+
backoffLimit: 0
34+
template:
35+
spec:
36+
containers:
37+
- args:
38+
- lmp -v x 8 -v y 8 -v z 8 -in in.reaxc.hns -nocite
39+
command:
40+
- /bin/bash
41+
- -c
42+
image: ghcr.io/converged-computing/lammps-reax:ubuntu22.04
43+
name: lammps
44+
resources:
45+
limits:
46+
cpu: '64'
47+
requests:
48+
cpu: '64'
49+
restartPolicy: Never
50+
```
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/bash
2+
#FLUX: -N 1
3+
#FLUX: -n 64
4+
#FLUX: -t 100s
5+
#FLUX: -o cpu-affinity=per-task
6+
#FLUX: --queue=pbatch
7+
#FLUX: --setattr=container_image=ghcr.io/converged-computing/lammps-reax:ubuntu22.04
8+
#FLUX: --output=job.{id}.out
9+
#FLUX: --error=job.{id}.err
10+
#FLUX: --job-name=lammps
11+
12+
lmp -v x 8 -v y 8 -v z 8 -in in.reaxc.hns -nocite

fractale/cli/__init__.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,29 @@ def get_parser():
7171
)
7272
generate.add_argument("-c", "--cluster", help="cluster name")
7373

74+
# Transform jobspecs from flux to Kubernetes (starting specific)
75+
transform = subparsers.add_parser(
76+
"transform",
77+
formatter_class=argparse.RawTextHelpFormatter,
78+
description="transform a jobspec",
79+
)
80+
transform.add_argument(
81+
"-t",
82+
"--to",
83+
dest="to_transformer",
84+
help="transform into this jobspec",
85+
default="kubernetes",
86+
)
87+
transform.add_argument(
88+
"-f", "--from", dest="from_transformer", help="transform from this jobspec", default="flux"
89+
)
90+
transform.add_argument(
91+
"--pretty",
92+
help="pretty print in the terminal",
93+
default=False,
94+
action="store_true",
95+
)
96+
7497
# run.add_argument("-t", "--transform", help="transformer to use", default="flux")
7598
save = subparsers.add_parser(
7699
"save",
@@ -101,7 +124,7 @@ def get_parser():
101124
script.add_argument(
102125
"--transformer", help="transformer to use", default="flux", choices=["flux"]
103126
)
104-
for cmd in [satisfy, script]:
127+
for cmd in [satisfy, script, transform]:
105128
cmd.add_argument("jobspec", help="jobspec yaml or json file")
106129

107130
for cmd in [satisfy, save, script]:
@@ -166,6 +189,8 @@ def help(return_code=0):
166189
from .script import main
167190
elif args.command == "save":
168191
from .save import main
192+
elif args.command == "transform":
193+
from .transform import main
169194
else:
170195
help(1)
171196
global registry

fractale/cli/script.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#!/usr/bin/env python
2+
3+
import json
4+
import sys
5+
6+
from rich import print, print_json
7+
from rich.json import JSON
8+
from rich.pretty import pprint
9+
10+
from fractale.store import FractaleStore
11+
from fractale.subsystem import get_subsystem_solver
12+
from fractale.transformer import get_transformer
13+
14+
15+
def main(args, extra, **kwargs):
16+
"""
17+
Save a cluster and subsystem graph.
18+
"""
19+
store = FractaleStore(args.config_dir)
20+
21+
# Prepare selector and transformer
22+
solver = get_subsystem_solver(store.clusters_root, args.solver)
23+
# This is probably overloaded, but we need to be able to look up
24+
# the templating logic for a subsystem
25+
transformer = get_transformer(args.transformer, args.selector, solver)
26+
matches = solver.satisfied(args.jobspec, return_results=True)
27+
if matches.count == 0:
28+
sys.exit(-1)
29+
30+
# Select the results to generate. This consolidates matches (which might include different nodes)
31+
# into clusters and groups of subsystems
32+
for script in transformer.render(matches, args.jobspec):
33+
# Generate batch script or jobspec and print
34+
pprint(script)

fractale/cli/transform.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/usr/bin/env python
2+
3+
import os
4+
import sys
5+
6+
import yaml
7+
from rich.pretty import pprint
8+
9+
from fractale.transformer import get_transformer
10+
11+
12+
def main(args, extra, **kwargs):
13+
"""
14+
Transform between jobspecs.
15+
"""
16+
# The jobspec needs to exist as a file here
17+
if not os.path.exists(args.jobspec):
18+
sys.exit(f"JobSpec {args.jobspec} does not exist.")
19+
20+
# No selector or solver, just manual transform
21+
from_transformer = get_transformer(args.from_transformer)
22+
to_transformer = get_transformer(args.to_transformer)
23+
24+
# Parse the jobspec to transform from
25+
normalized_jobspec = from_transformer.parse(args.jobspec)
26+
final_jobspec = to_transformer.convert(normalized_jobspec)
27+
28+
if args.pretty:
29+
pprint(final_jobspec, indent_guides=True)
30+
elif args.to_transformer in ["kubernetes"]:
31+
yaml.dump(final_jobspec, sys.stdout, sort_keys=True, default_flow_style=False)
32+
else:
33+
print(final_jobspec)

fractale/transformer/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from .flux import Transformer as FluxTransformer
2+
from .kubernetes import Transformer as KubernetesTransformer
23

34
plugins = {
5+
"kubernetes": KubernetesTransformer,
46
"flux": FluxTransformer,
57
}
68

79

8-
def get_transformer(name, selector, solver):
10+
def get_transformer(name, selector="random", solver=None):
911
if name not in plugins:
1012
raise ValueError(f"{name} is not a valid transformer.")
1113
return plugins[name](selector, solver)

fractale/transformer/base.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,18 @@ def __init__(self, selector, solver):
2121
self.selector = get_selector(selector)
2222
self.solver = solver
2323

24+
def parse(self, *args, **kwargs):
25+
"""
26+
Parse converts the native jobspec to the standard JobSpec
27+
"""
28+
raise NotImplementedError
29+
30+
def convert(self, *args, **kwargs):
31+
"""
32+
Convert a normalized jobspec to the format here.
33+
"""
34+
raise NotImplementedError
35+
2436
def render(self, matches, jobspec):
2537
"""
2638
Run the transformer:

fractale/transformer/common.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import sys
2+
from dataclasses import dataclass, field
3+
from typing import Dict, List, Optional, Union
4+
5+
# Requires Python 3.8+ for dataclass
6+
7+
8+
@dataclass
9+
class JobSpec:
10+
"""
11+
A scheduler-agnostic data structure for defining a computational job.
12+
Version 2: Now includes accounting, priority, environment, and more constraints.
13+
"""
14+
15+
# Job Identity & Accounting
16+
job_name: Optional[str] = None
17+
account: Optional[str] = None
18+
19+
# What to Run
20+
executable: Optional[str] = None
21+
arguments: List[str] = field(default_factory=list)
22+
container_image: Optional[str] = None
23+
working_directory: Optional[str] = None
24+
25+
# Custom attributes or options
26+
attrs: Optional[dict] = field(default_factory=dict)
27+
options: Optional[dict] = field(default_factory=dict)
28+
29+
# Resource Requests ---
30+
num_tasks: int = 1
31+
num_nodes: int = 1
32+
cpus_per_task: int = 1
33+
mem_per_task: Optional[str] = None
34+
gpus_per_task: int = 0
35+
36+
# Scheduling and Constraints
37+
wall_time: Optional[str] = None
38+
queue: Optional[str] = None
39+
priority: Optional[int] = None
40+
exclusive_access: bool = False
41+
constraints: List[str] = field(default_factory=list)
42+
begin_time: Optional[str] = None
43+
44+
# Environment and I/O
45+
environment: Dict[str, str] = field(default_factory=dict)
46+
input_file: Optional[str] = None
47+
output_file: Optional[str] = None
48+
error_file: Optional[str] = None
49+
50+
# Dependencies
51+
depends_on: Optional[Union[str, List[str]]] = None

0 commit comments

Comments
 (0)