Commit 4db2cdf

feat: graph solver backend
Signed-off-by: vsoch <[email protected]>
1 parent 64e301f commit 4db2cdf

File tree

15 files changed: +3550 -98 lines

README.md

Lines changed: 12 additions & 0 deletions

````diff
@@ -18,6 +18,18 @@ We want to:
 
 See [examples/fractale](examples/fractale) for a detailed walk-through of the above.
 
+For the graph backend, install graph-tool:
+
+```bash
+conda install -c conda-forge graph-tool
+```
+
+## Questions
+
+- Should other subsystem types have edges? How would they be used?
+- Should we try to map them to nodes in the graph, or use another means (or assume global across cluster nodes, as we do now)?
+- Can we simplify the spack subsystem graph? (It's really big...)
+
 <!-- ⭐️ [Documentation](https://compspec.github.io/fractale) ⭐️ -->
 
 ## License
````
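A quick way to confirm the graph-tool install above is visible to Python (a minimal sketch, assuming the same interpreter that will run fractale):

```python
# Sanity check: graph-tool from conda-forge is importable.
# If fractale runs under a different interpreter, PYTHONPATH may need to
# point at the conda site-packages (see the export in the example walk-through).
import graph_tool

print(graph_tool.__version__)
```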

examples/fractale/README.md

Lines changed: 58 additions & 2 deletions

````diff
@@ -28,7 +28,7 @@ fractale generate --cluster A spack /home/vanessa/Desktop/Code/spack
 Satisfy asks two questions:
 
 1. Which clusters have the subsystem resources that I need?
-2. Which clusters have the job resources that I need?
+2. Which clusters have the job resources that I need (containment subsystem)?
 
 This is the step where we want to say "Run gromacs on 2-4 nodes with these requirements." Since we haven't formalized a way to do that, I'm going to start with a flux jobspec, and then add attributes that can be used to search our subsystems. For example, I generated [software-gromacs.json](software-gromacs.json) with:
 
````

````diff
@@ -52,6 +52,40 @@ fractale satisfy ./examples/fractale/software-curl.yaml
 fractale satisfy ./examples/fractale/software-curl.json
 ```
 
+To try the graph backend, install [graph-tool](https://graph-tool.skewed.de/installation.html) and then:
+
+```bash
+export PYTHONPATH=/home/vanessa/anaconda3/lib/python3.12/site-packages
+fractale satisfy --solver graph ./examples/fractale/software-curl.yaml
+fractale satisfy --solver graph ./examples/fractale/software-curl.json
+```
+```console
+=> 🍇 Loading cluster "a" subsystem "containment"
+=> 🍇 Loading cluster "a" subsystem "modules"
+=> 🍇 Loading cluster "a" subsystem "spack"
+=> Exploring cluster "a" containment subsystem
+(1/1) satisfied resource core
+Cluster "a" is a match
+```
+
+Here is an example with a nested slot:
+
+```bash
+fractale satisfy --solver graph ./examples/fractale/software-nested-slot.yaml
+```
+```console
+=> 🍇 Loading cluster "a" subsystem "containment"
+=> 🍇 Loading cluster "a" subsystem "modules"
+=> 🍇 Loading cluster "a" subsystem "spack"
+=> Exploring cluster "a" containment subsystem
+(1/1) satisfied resource socket
+(1/4) found resource core
+(2/4) found resource core
+(3/4) found resource core
+(4/4) satisfied resource core
+Cluster "a" is a match
+```
+
 By default, the above assumes subsystems located in the fractale home. If you want to adjust that, set `fractale --config-dir=<path> satisfy ...` (and note you will need to have generated the tree there). What we basically do with satisfy is build a database with tables for:
 
 - clusters
````

````diff
@@ -77,5 +111,27 @@ Here is a jobspec that can't be satisfied because we ask for too many resources
 ```bash
 fractale satisfy ./examples/fractale/jobspec-containment-unsatisfied.yaml
 ```
-
+```console
+=> 🍇 Loading cluster "a" subsystem "containment"
+=> 🍇 Loading cluster "a" subsystem "modules"
+=> 🍇 Loading cluster "a" subsystem "spack"
+(2) SELECT * from subsystems WHERE type = 'software';
+=> No Matches due to containment
+```
 We likely want to have a more structured query syntax that can handle AND, OR, and other specifics. The actual search should remain general to support any generic key/value pair of attributes. My database structure and queries are also bad.
+
+## Save
+
+We can save an image of our subsystem for a cluster. E.g.,
+
+```bash
+$ fractale save a --out ./examples/fractale/cluster-a-containment.png
+```
+```console
+=> 🍇 Loading cluster "a" subsystem "containment"
+=> 🍇 Loading cluster "a" subsystem "modules"
+=> 🍇 Loading cluster "a" subsystem "spack"
+Saving to "./examples/fractale/cluster-a-containment.png"
+```
+
+<img src="./cluster-a-containment.svg">
````
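On the structured query idea mentioned in the diff above: purely as illustration (a toy schema, not the actual fractale database layout), an AND over subsystem attributes could look something like this:

```python
# Toy example of an AND query over subsystem attributes; the schema here
# is hypothetical and only meant to show the shape of such a query.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE subsystems (id INTEGER PRIMARY KEY, cluster TEXT, type TEXT);
    CREATE TABLE attributes (subsystem_id INTEGER, name TEXT, value TEXT);
    INSERT INTO subsystems VALUES (1, 'a', 'software');
    INSERT INTO attributes VALUES (1, 'name', 'curl'), (1, 'type', 'binary');
    """
)

# "software named curl AND provided as a binary" -- OR works the same way
query = """
SELECT DISTINCT s.cluster FROM subsystems s
JOIN attributes a1 ON a1.subsystem_id = s.id AND a1.name = 'name' AND a1.value = 'curl'
JOIN attributes a2 ON a2.subsystem_id = s.id AND a2.name = 'type' AND a2.value = 'binary'
WHERE s.type = 'software'
"""
print(conn.execute(query).fetchall())  # [('a',)]
```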

examples/fractale/cluster-a-containment-graph.svg

Lines changed: 2780 additions & 0 deletions (111 KB image)
Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@

```yaml
version: 1
resources:
- type: slot
  count: 2
  with:
  - type: socket
    count: 1
    with:
    - type: core
      count: 4
  label: task
tasks:
- command:
  - gmx
  slot: task
  count:
    per_slot: 1
attributes:
  system:
    duration: 0
requires:
  software:
  - name: curl
    type: binary
```
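For reference, the flat view used by the database solver (via `flatten_jobspec_resources` in fractale/jobspec.py, further down in this commit) would look like this for a spec shaped like the one above, with counts under the slot multiplied by the slot count:

```python
from fractale.jobspec import flatten_jobspec_resources

# Resources shaped like the jobspec above: 2 slots, each 1 socket with 4 cores.
jobspec = {
    "resources": [
        {
            "type": "slot",
            "count": 2,
            "with": [
                {"type": "socket", "count": 1, "with": [{"type": "core", "count": 4}]}
            ],
        }
    ]
}
print(flatten_jobspec_resources(jobspec))  # {'socket': 2, 'core': 8}
```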

fractale/cli/__init__.py

Lines changed: 15 additions & 3 deletions

```diff
@@ -72,17 +72,27 @@ def get_parser():
     generate.add_argument("-c", "--cluster", help="cluster name")
 
     # run.add_argument("-t", "--transform", help="transformer to use", default="flux")
+    save = subparsers.add_parser(
+        "save",
+        formatter_class=argparse.RawTextHelpFormatter,
+        description="save a picture of a subsystem graph",
+    )
+    save.add_argument("cluster", help="cluster to save")
+    save.add_argument(
+        "--subsystem", help="subsystem to save (defaults to containment)", default="containment"
+    )
+    save.add_argument("--out", help="output file name")
 
     # This does just the user space subsystem match
     satisfy = subparsers.add_parser(
         "satisfy",
         formatter_class=argparse.RawTextHelpFormatter,
         description="determine clusters that satisfy a jobspec based on user subsystems",
     )
-    for cmd in [satisfy]:
-        cmd.add_argument("jobspec", help="jobspec yaml or json file")
+    satisfy.add_argument("jobspec", help="jobspec yaml or json file")
+    for cmd in [satisfy, save]:
         cmd.add_argument(
-            "--backend",
+            "--solver",
             help="subsystem solver backend",
             default=defaults.solver_backend_default,
             choices=defaults.solver_backends,
```

```diff
@@ -138,6 +148,8 @@ def help(return_code=0):
         from .generate_subsystem import main
     elif args.command == "satisfy":
         from .satisfy import main
+    elif args.command == "save":
+        from .save import main
     else:
         help(1)
     global registry
```
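A minimal check that the new save arguments parse as expected (a sketch; assumes get_parser is importable as defined above and has no other required arguments beyond the subcommand):

```python
from fractale.cli import get_parser

# Hypothetical invocation: save the spack subsystem of cluster "a" with the graph solver.
args = get_parser().parse_args(["save", "a", "--subsystem", "spack", "--solver", "graph"])
print(args.cluster, args.subsystem, args.solver, args.out)  # a spack graph None
```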

fractale/cli/satisfy.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -12,6 +12,6 @@ def main(args, extra, **kwargs):
     This is a fairly simple (flat) check.
     """
     store = FractaleStore(args.config_dir)
-    solver = get_subsystem_solver(store.clusters_root, args.backend)
+    solver = get_subsystem_solver(store.clusters_root, args.solver)
     is_satisfied = solver.satisfied(args.jobspec)
     sys.exit(0 if is_satisfied else -1)
```

fractale/cli/save.py

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@

```python
#!/usr/bin/env python

import sys

from fractale.store import FractaleStore
from fractale.subsystem import get_subsystem_solver


def main(args, extra, **kwargs):
    """
    Save a cluster and subsystem graph.
    """
    store = FractaleStore(args.config_dir)
    solver = get_subsystem_solver(store.clusters_root, args.solver)
    outfile = args.out
    if not outfile:
        outfile = f"cluster-{args.cluster}-{args.subsystem}-{args.solver}.svg"
    solver.save(args.cluster, args.subsystem, outfile)
```
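If `--out` is not provided, the default file name is `cluster-<cluster>-<subsystem>-<solver>.svg`.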

fractale/defaults.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -2,4 +2,4 @@
 valid_settings = {"sharedfs", "stage"}
 sharedfs = True
 solver_backends = ["database", "graph"]
-solver_backend_default = "database"
+solver_backend_default = "graph"
```
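Since graph is now the default, the `--solver graph` flag in the examples above can be omitted.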

fractale/jobspec.py

Lines changed: 153 additions & 0 deletions

```diff
@@ -1,17 +1,27 @@
 import copy
+from contextlib import contextmanager
+
+import fractale.utils as utils
 
 
 def flatten_jobspec_resources(jobspec):
     """
     Given a jobspec, turn the required resources into a flattened version.
+
+    This is intended for subsystem matching that doesn't assume any structure.
+    E.g., the database solver backend, which was first just for prototyping.
     """
     resources = {}
     resource_list = copy.deepcopy(jobspec["resources"])
     multiplier = 1
+
+    # Resource lists are nested, under "with"
     while resource_list:
         requires = resource_list.pop(0)
         resource_type = requires["type"]
         resource_count = requires.get("count")
+
+        # The slot is a marker for the set we care about matching
         if resource_type == "slot":
             multiplier = resource_count or 1
         else:
```
```diff
@@ -20,3 +30,146 @@ def flatten_jobspec_resources(jobspec):
             resources[resource_type] += resource_count * multiplier
         resource_list += requires.get("with") or []
     return resources
+
+
+def extract_slot(jobspec):
+    """
+    Given a jobspec, parse into slots and return counts and requirements.
+    This is intended for the containment subsystem.
+    """
+    js = utils.load_jobspec(jobspec)
+    slots = []
+
+    # Tasks will have per slot (this is a list of one item, required)
+    # In the future Flux could theoretically support more than one task
+    # (and slot) in which case we would need to select the label
+    task = js["tasks"][0]
+    slot_name = task["slot"]
+
+    # I'm not sure I've ever seen anything other than "per_slot"
+    # Might as well raise an error and alert if that happens :)
+    slot_count = task.get("count", {})
+    if "per_slot" not in slot_count:
+        raise ValueError(f"Unexpected value for tasks->slot->count: {slot_count}")
+
+    # Assume a default of one per slot
+    slot_count = slot_count.get("per_slot") or 1
+
+    def check_resource(resource, is_slot=False):
+        """
+        Recursive function to dig into resources under "with".
+        Once we find the slot, we save the entire thing (with nesting)
+        as a Slot class, which better exposes the counts, etc.
+        """
+        if "with" in resource:
+            for item in resource["with"]:
+                if is_slot:
+                    new_slot = copy.deepcopy(item)
+                    total = item["count"] * slot_count
+                    slots.append(Slot(new_slot, total=total))
+
+                # Again, we can eventually support multiple slots by
+                # checking the label. Flux right now only expects one.
+                check_resource(item, item["type"] == "slot")
+
+    # Kick it off, y'all
+    for resource in js["resources"]:
+        check_resource(resource, resource["type"] == "slot")
+    return slots
+
+
+class Slot:
+    """
+    A slot is like a wedge we stick in the containment graph, and say
+    "We are defining our needs for the resources below this wedge, and we need
+    this many of this wedge." Since we need to evaluate the resource counts
+    dynamically, we provide a context-based function to do that. It will restore
+    back to the original state when the context is left. The expected usage is:
+
+    # Temporary view of the slot to evaluate with (make changes to)
+    with slot.evaluate() as evaluator:
+        # This is the current spot we are at in the slot
+        requires = evaluator.next_requirement()
+        v_type, count = next(requires)
+
+    Check for a StopIteration exception to know when we are done.
+    """
+
+    def __init__(self, spec, total=1):
+        # This is a nested structure, e.g.,
+        # {'type': 'socket', 'count': 1, 'with': [{'type': 'core', 'count': 4}]}
+        self.spec = spec
+        self.total = total
+        # This holds state for what we find during an evaluation
+        self._found = {}
+
+    @property
+    def start_type(self):
+        """
+        The start type identifies the top of the slot
+        """
+        return self.spec["type"]
+
+    @contextmanager
+    def evaluate(self):
+        """
+        Yield a temporary copy of the spec in case it is mutated.
+        Also init and reset found, so we can evaluate multiple spots
+        for satisfying the same slot instance.
+        """
+        spec = copy.deepcopy(self.spec)
+        self._found = {}
+        try:
+            yield self
+        # Restore the original state, you mangy animal
+        finally:
+            self.spec = spec
+            self._found = {}
+
+    def next_requirement(self):
+        """
+        Yield requirements. We are strict for now, requiring that we see all levels.
+        """
+        resources = [copy.deepcopy(self.spec)]
+        while resources:
+            resource = resources.pop(0)
+            yield resource["type"], resource.get("count") or 1
+            if "with" in resource:
+                resources += resource["with"]
+
+    def count(self, v_type):
+        """
+        Get the count for a resource type that has been found.
+        """
+        return self._found.get(v_type) or 0
+
+    def found(self, v_type, count=1, needed=None):
+        """
+        Indicate that a type was found.
+        """
+        if v_type not in self._found:
+            self._found[v_type] = 0
+        self._found[v_type] += 1
+
+        # This tells the caller if we are done with the type.
+        if needed is not None and self._found[v_type] == needed:
+            return True
+        return False
+
+    def satisfied(self):
+        """
+        Determine if the slot is satisfied, meaning all needed counts
+        have been met. This should be run in the context of evaluate.
+        """
+        # These are all the requirements
+        needed = {}
+        for requires_type, requires_count in self.next_requirement():
+            needed[requires_type] = requires_count
+
+        updated = copy.deepcopy(needed)
+        for found_type, found_count in self._found.items():
+            if found_count >= needed[found_type]:
+                del updated[found_type]
+
+        # If updated is empty, we got all needed
+        return not updated
```
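To make the evaluate / next_requirement / found flow concrete, here is a rough walk of a slot following the docstring above (a sketch: it pretends every requested resource exists rather than consulting a real containment graph, and assumes extract_slot can take the path to the yaml, which it hands to utils.load_jobspec):

```python
from fractale.jobspec import extract_slot

# One slot per jobspec for now (see extract_slot above).
slot = extract_slot("examples/fractale/software-nested-slot.yaml")[0]
print(slot.start_type, slot.total)

with slot.evaluate() as evaluator:
    # Walk the requirements and pretend each one is found in the
    # containment graph the needed number of times.
    for v_type, count in evaluator.next_requirement():
        for _ in range(count):
            done = evaluator.found(v_type, needed=count)
        print(f"found {count} x {v_type} (complete: {done})")
    print("satisfied:", evaluator.satisfied())
```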
