Skip to content

Commit f5dfdf5

Browse files
committed
wip: creation of subsystem solver as a plugin
We want solvers for subsystems (the component that determines whether something is a valid match) to be an interface. E.g., the first approach I took was a database, but I also want to try a Python (C++-backed) library that creates a graph, mostly because that would be really fun. Signed-off-by: vsoch <[email protected]>
1 parent eea6fbd commit f5dfdf5

File tree

8 files changed

+323
-231
lines changed

8 files changed

+323
-231
lines changed

examples/fractale/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ fractale generate --cluster A spack /home/vanessa/Desktop/Code/spack
2525

2626
## Satisfy Request
2727

28+
Satisfy asks two questions:
29+
30+
1. Which clusters have the subsystem resources that I need?
31+
2. Which clusters have the job resources that I need?
32+
2833
This is the step where we want to say "Run gromacs on 2-4 nodes with these requirements." Since we haven't formalized a way to do that, I'm going to start with a flux jobspec, and then add attributes that can be used to search our subsystems. For example, I generated [software-gromacs.json](software-gromacs.json) with:
2934

3035
```bash

fractale/cli/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from compspec.plugin.registry import PluginRegistry
99

1010
import fractale
11+
import fractale.defaults as defaults
1112
from fractale.logger import setup_logger
1213

1314
# Generate the plugin registry to add parsers
@@ -80,6 +81,12 @@ def get_parser():
8081
)
8182
for cmd in [satisfy]:
8283
cmd.add_argument("jobspec", help="jobspec yaml or json file")
84+
cmd.add_argument(
85+
"--backend",
86+
help="subsystem solved backend",
87+
default=defaults.solver_backend_default,
88+
choices=defaults.solver_backends,
89+
)
8390

8491
extractors = generate.add_subparsers(
8592
title="generate",

fractale/cli/satisfy.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import sys
44

55
from fractale.store import FractaleStore
6-
from fractale.subsystem import get_subsystem_registry
6+
from fractale.subsystem import get_subsystem_solver
77

88

99
def main(args, extra, **kwargs):
@@ -12,6 +12,6 @@ def main(args, extra, **kwargs):
1212
This is a fairly simple (flat) check.
1313
"""
1414
store = FractaleStore(args.config_dir)
15-
registry = get_subsystem_registry(store.clusters_root)
16-
is_satisfied = registry.satisfied(args.jobspec)
15+
solver = get_subsystem_solver(store.clusters_root, args.backend)
16+
is_satisfied = solver.satisfied(args.jobspec)
1717
sys.exit(0 if is_satisfied else -1)

fractale/defaults.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
fractale_dir = ".fractale"
22
valid_settings = {"sharedfs", "stage"}
33
sharedfs = True
4+
solver_backends = ["database", "graph"]
5+
solver_backend_default = "database"

fractale/subsystem/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import os
22

3-
from .subsystem import SubsystemRegistry
3+
from .subsystem import SubsystemSolver
44

55

6-
def get_subsystem_registry(path):
6+
def get_subsystem_solver(path, backend="database"):
77
"""
88
Generate a user subsystem solver, where the structure is expected
99
to be a set of <cluster>/<subsystem>. For the FractaleStore, this
@@ -13,4 +13,4 @@ def get_subsystem_registry(path):
1313
raise ValueError(f"Cluster subsystem root {path} does not exist")
1414

1515
# Generate the subsystem solver
16-
return SubsystemRegistry(path)
16+
return SubsystemSolver(path, backend)

fractale/subsystem/solver/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import os
2+
3+
from .database import DatabaseSolver
4+
5+
6+
def load_solver(backend, path):
    """
    Return a solver instance for the named backend.

    Only the "database" backend is handled here, and it is constructed
    with the given path. Any other backend name raises a ValueError.
    """
    # Guard clause: reject anything we do not know how to build
    if backend != "database":
        raise ValueError(f"Unsupported backend {backend}")
    return DatabaseSolver(path)

fractale/subsystem/solver/database.py

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
import os
2+
import sqlite3
3+
4+
import fractale.subsystem.queries as queries
5+
import fractale.utils as utils
6+
from fractale.logger import LogColors, logger
7+
8+
9+
class DatabaseSolver:
    """
    A database solver solves for a cluster based on a simple database.

    Subsystem graphs found under a root directory are flattened into an
    in-memory sqlite3 database (clusters, subsystems, attributes) that is
    then queried to decide if a jobspec can be satisfied. The solver can
    be used as a context manager so the connection is always closed.

    TODO: we need to have counters or another strategy for containment.
    """

    def __init__(self, path):
        """
        Create the in-memory database and load all subsystems under path.

        Raises whatever create_tables or load raise (e.g., ValueError for a
        missing or empty root); the connection is closed before re-raising.
        """
        self.systems = {}
        self.conn = sqlite3.connect(":memory:")
        try:
            self.create_tables()
            self.load(path)
        except Exception:
            # Don't leak the connection if we cannot finish setup
            self.conn.close()
            raise

    def __enter__(self):
        """
        Enter a with block, returning the solver itself.
        """
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """
        Close the database on leaving a with block.

        The arguments default to None so a direct call without arguments
        keeps working. Returns None, so exceptions are never suppressed.
        """
        self.close()

    def close(self):
        """
        Close the database connection.
        """
        self.conn.close()

    def create_tables(self):
        """
        Create tables for subsystems, nodes, edges.

        Note that I'm flattening the graph, so edges become attributes for
        nodes so it's easy to query. This is a reasonable first shot over
        implementing an actual graph database.
        """
        cursor = self.conn.cursor()

        # Only save metadata we absolutely need
        # Note I'm not saving edges because we don't use
        # them for anything - we are going to parse them
        # into node attributes instead.
        create_sql = [
            queries.create_subsystem_sql,
            queries.create_clusters_sql,
            queries.create_nodes_sql,
            queries.create_attributes_sql,
        ]
        for sql in create_sql:
            cursor.execute(sql)
        self.conn.commit()

    def load(self, path):
        """
        Load a group of subsystem files, typically json JGF.

        Raises ValueError if path does not exist, or if no graph.json
        files are found under it.
        """
        from fractale.subsystem.subsystem import Subsystem

        if not os.path.exists(path):
            raise ValueError(f"User subsystem directory {path} does not exist.")

        # Materialize the result so the emptiness check below also works
        # if recursive_find is a generator (a generator is always truthy)
        files = list(utils.recursive_find(path, "graph[.]json"))
        if not files:
            raise ValueError(f"There are no cluster subsystems defined under root {path}")
        for filename in files:
            new_subsystem = Subsystem(filename)
            self.load_subsystem(new_subsystem)

    def load_subsystem(self, subsystem):
        """
        Load a new subsystem to the memory database

        The subsystem needs cluster, name, and type attributes, along with
        iter_nodes() that yields (label, node) pairs. Values are bound as
        query parameters (not formatted into the SQL) so quotes in names
        or attribute values cannot break or alter the statement.
        """
        cursor = self.conn.cursor()

        # Create the cluster if it doesn't exist
        statement = 'INSERT OR IGNORE INTO clusters ("name") VALUES (?)'
        logger.debug(statement)
        cursor.execute(statement, (str(subsystem.cluster),))
        self.conn.commit()

        # Create the subsystem - it should error if already exists
        statement = 'INSERT INTO subsystems ("name", "cluster", "type") VALUES (?, ?, ?)'
        logger.debug(statement)
        cursor.execute(
            statement,
            (str(subsystem.name), str(subsystem.cluster), str(subsystem.type)),
        )
        self.conn.commit()

        # Note that the nodes table is not populated right now - we only
        # store node attributes, and include type because I'm lazy.
        # Everything is stored as a string, as it was when the values
        # were formatted directly into the statement.
        statement = (
            'INSERT INTO attributes ("cluster", "subsystem", "node", "name", "value") '
            "VALUES (?, ?, ?, ?, ?)"
        )
        cluster = str(subsystem.cluster)
        name = str(subsystem.name)
        rows = []
        for nid, node in subsystem.iter_nodes():
            metadata = node["metadata"]
            rows.append((cluster, name, str(nid), "type", str(metadata["type"])))
            for key, value in metadata.get("attributes", {}).items():
                rows.append((cluster, name, str(nid), str(key), str(value)))
        cursor.executemany(statement, rows)

        # Note that we aren't doing anything with edges currently.
        self.conn.commit()

    def get_subsystem_nodes(self, cluster, subsystem):
        """
        Get nodes of a subsystem and cluster

        Technically we could skip labels, but I'm assuming we eventually want
        nodes in this query somewhere. Each returned label is wrapped in
        single quotes.
        """
        statement = "SELECT label from nodes WHERE subsystem = ? AND cluster = ?;"
        labels = self.query(statement, (subsystem, cluster))
        return [f"'{x[0]}'" for x in labels]

    def find_nodes(self, cluster, name, items):
        """
        Find nodes with attributes that have specific key/value pairs.

        Each item is a set of requirements (attribute name -> value) for
        one NODE, and a node must match every pair of the item. The value
        is matched with LIKE, so SQL wildcards in the value are honored.

        Returns the set of node labels that satisfy the items, or None
        as soon as one item cannot be satisfied (the cluster is not a match).
        """
        # Ask for the node column by name instead of relying on where
        # it happens to fall in a SELECT * row
        statement = (
            "SELECT node from attributes WHERE cluster = ? AND subsystem = ? "
            "AND name = ? AND value like ?;"
        )

        # Final nodes that satisfy all item requirements
        satisfy = set()

        # If we cannot satisfy one software requirement the cluster does not match.
        for item in items:
            nodes = None
            for key, value in item.items():
                result = self.query(statement, (cluster, name, str(key), str(value)))
                found = {row[0] for row in result}

                # We don't have any nodes yet, all are contenders
                nodes = found if nodes is None else nodes & found

            # If we don't have nodes left (or the item had no requirements),
            # the cluster isn't a match
            if not nodes:
                return None

            # If we get down here, we found a matching node for one item requirement
            satisfy.update(nodes)
        return satisfy

    def query(self, statement, params=()):
        """
        Issue a query to the database, returning fetchall.

        params are optional values for "?" placeholders in the statement.
        The (truncated) statement and number of results are logged.
        """
        cursor = self.conn.cursor()

        # Show bound values too, otherwise the log only has placeholders
        printed = f"{statement} {tuple(params)}" if params else statement

        # Don't overwhelm the output!
        if len(printed) > 150:
            printed = printed[:150] + "..."
        printed = f"{LogColors.OKCYAN}{printed}{LogColors.ENDC}"
        cursor.execute(statement, params)
        self.conn.commit()

        # Get results, show query and number of results
        results = cursor.fetchall()
        count = (f"{LogColors.PURPLE}({len(results)}){LogColors.ENDC} ").rjust(20)
        logger.info(count + printed)
        return results

    def satisfied(self, jobspec):
        """
        Determine if a jobspec is satisfied by user-space subsystems.

        The requirements are read from attributes -> system -> requires,
        keyed by subsystem type. Matches are printed, and we return True
        if at least one (cluster, subsystem) matched, otherwise False.
        """
        # This handles json or yaml
        js = utils.load_jobspec(jobspec)

        requires = js["attributes"].get("system", {}).get("requires")
        if not requires:
            # NOTE(review): presumably logger.exit ends the program - confirm
            logger.exit("Jobspec has no system requirements.")

        # These clusters will satisfy the request
        matches = set()

        # We don't care about the association with tasks - the requires are matching clusters to entire jobs
        # We could optimize this to be fewer queries, but it's likely trivial for now
        for subsystem_type, items in requires.items():

            # Get one or more matching subsystems (top level) for some number of clusters
            # The subsystem type is like the category (e.g., software)
            subsystems = self.get_subsystem_by_type(subsystem_type)
            if not subsystems:
                continue

            # For each subsystem, since we don't have a query syntax developed, we just look for nodes
            # that have matching attributes. Each here is a tuple, (name, cluster, type)
            for name, cluster, _ in subsystems:

                # In this case, the subsystem is the name (e.g., spack) since we might have multiple for
                # a type (e.g., software). "Get attribute key values associated with our search."
                # This is done very stupidly now
                nodes = self.find_nodes(cluster, name, items)
                if not nodes:
                    continue
                matches.add((cluster, name))

        if not matches:
            print(f"{LogColors.RED}=> No Matches{LogColors.ENDC}")
            return False

        print(f"\n{LogColors.OKBLUE}({len(matches)}) Matches {LogColors.ENDC}")
        for match in matches:
            print(f"cluster ({match[0]}) subsystem ({match[1]})")
        return True

    def get_subsystem_by_type(self, subsystem_type, ignore_missing=True):
        """
        Get subsystems based on a type. This will return one or more clusters
        that will be contenders for matching.

        Rows are returned as (name, cluster, type). Note that ignore_missing
        is accepted but not currently used.
        """
        # Check 2: the subsystem exists in our database
        # Columns are named so the order of the tuple is guaranteed
        statement = "SELECT name, cluster, type from subsystems WHERE type = ?;"
        return self.query(statement, (subsystem_type,))

0 commit comments

Comments
 (0)