Skip to content

Commit 64e301f

Browse files
committed
feat: support for heuristic for containment resources
Signed-off-by: vsoch <[email protected]>
1 parent f5dfdf5 commit 64e301f

File tree

4 files changed

+85
-3
lines changed

4 files changed

+85
-3
lines changed

examples/fractale/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,10 @@ And right now the search is just over attributes to find matching clusters. E.g.
7272
}
7373
```
7474

75+
Here is a jobspec that can't be satisfied because we ask for too many resources given the cluster [containment-subsystem.json]([containment-subsystem.json).
76+
77+
```bash
78+
fractale satisfy ./examples/fractale/jobspec-containment-unsatisfied.yaml
79+
```
80+
7581
We likely want to have a more structured query syntax that can handle AND, OR, and other specifics. The actual search should remain general to support any generic key/value pair of attributes. My database structure and queries are also bad.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
version: 1
2+
resources:
3+
- type: slot
4+
count: 1
5+
with:
6+
- type: core
7+
count: 100
8+
label: task
9+
tasks:
10+
- command:
11+
- gmx
12+
slot: task
13+
count:
14+
per_slot: 1
15+
attributes:
16+
system:
17+
duration: 0
18+
requires:
19+
software:
20+
- name: curl
21+
type: binary

fractale/jobspec.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import copy
2+
3+
4+
def flatten_jobspec_resources(jobspec):
5+
"""
6+
Given a jobspec, turn the required resources into a flattened version.
7+
"""
8+
resources = {}
9+
resource_list = copy.deepcopy(jobspec["resources"])
10+
multiplier = 1
11+
while resource_list:
12+
requires = resource_list.pop(0)
13+
resource_type = requires["type"]
14+
resource_count = requires.get("count")
15+
if resource_type == "slot":
16+
multiplier = resource_count or 1
17+
else:
18+
if resource_type not in resources:
19+
resources[resource_type] = 0
20+
resources[resource_type] += resource_count * multiplier
21+
resource_list += requires.get("with") or []
22+
return resources

fractale/subsystem/solver/database.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import copy
12
import os
23
import sqlite3
34

5+
import fractale.jobspec as jspec
46
import fractale.subsystem.queries as queries
57
import fractale.utils as utils
68
from fractale.logger import LogColors, logger
@@ -9,12 +11,10 @@
911
class DatabaseSolver:
1012
"""
1113
A database solver solves for a cluster based on a simple database.
12-
13-
TODO: we need to have counters or another strategy for containment.
1414
"""
1515

1616
def __init__(self, path):
17-
self.systems = {}
17+
self.subsystems = {}
1818
self.conn = sqlite3.connect(":memory:")
1919
self.create_tables()
2020
self.load(path)
@@ -104,9 +104,18 @@ def load_subsystem(self, subsystem):
104104
# self.conn.commit()
105105
attr_fields = '("cluster", "subsystem", "node", "name", "value")'
106106

107+
# Keep track of counts of all types
108+
counts = {}
109+
107110
# Now all attributes, and also include type because I'm lazy
108111
for nid, node in subsystem.iter_nodes():
109112
typ = node["metadata"]["type"]
113+
114+
# Assume a node is a count of 1
115+
if typ not in counts:
116+
counts[typ] = 0
117+
counts[typ] += 1
118+
110119
attr_values = f"('{subsystem.cluster}', '{subsystem.name}', '{nid}', 'type', '{typ}')"
111120
statement = f"INSERT INTO attributes {attr_fields} VALUES {attr_values}"
112121
cursor.execute(statement)
@@ -119,6 +128,7 @@ def load_subsystem(self, subsystem):
119128

120129
# Note that we aren't doing anything with edges currently.
121130
self.conn.commit()
131+
self.subsystems[subsystem.name] = counts
122132

123133
def get_subsystem_nodes(self, cluster, subsystem):
124134
"""
@@ -196,6 +206,10 @@ def satisfied(self, jobspec):
196206
if not requires:
197207
logger.exit("Jobspec has no system requirements.")
198208

209+
# Special case: containment - try matching resources if we have one
210+
if "containment" in self.subsystems:
211+
requires["containment"] = jspec.flatten_jobspec_resources(js)
212+
199213
# These clusters will satisfy the request
200214
matches = set()
201215

@@ -214,6 +228,12 @@ def satisfied(self, jobspec):
214228
for subsystem in subsystems:
215229
name, cluster, subsystem_type = subsystem
216230

231+
# If subsystem is containment and we don't have enough totals, fail
232+
if "containment" in self.subsystems:
233+
if not self.assess_containment(requires["containment"]):
234+
print(f"{LogColors.RED}=> No Matches due to containment{LogColors.ENDC}")
235+
return False
236+
217237
# "Get nodes in subsystem X" if we have a query syntax we could limit to a type, etc.
218238
# In this case, the subsystem is the name (e.g., spack) since we might have multiple for
219239
# a type (e.g., software). This returns labels we can associate with attributes.
@@ -234,6 +254,19 @@ def satisfied(self, jobspec):
234254
print(f"{LogColors.RED}=> No Matches{LogColors.ENDC}")
235255
return False
236256

257+
def assess_containment(self, requires):
258+
"""
259+
A rough heuirstic to see if the cluster has enough resources
260+
of specific types.
261+
"""
262+
for typ, count in requires.items():
263+
if typ not in self.subsystems.get("containment", {}):
264+
return False
265+
have_count = self.subsystems["containment"][typ]
266+
if have_count < count:
267+
return False
268+
return True
269+
237270
def get_subsystem_by_type(self, subsystem_type, ignore_missing=True):
238271
"""
239272
Get subsystems based on a type. This will return one or more clusters

0 commit comments

Comments
 (0)