
Commit 9567ebe

Merge pull request #401 from sunya-ch/server-api-rebase-patch-2
feat: add compute similarity and loose selection
2 parents 5f75c6f + ffdaee8 commit 9567ebe

12 files changed: +470 −24 lines changed

.github/workflows/unit-test.yml

Lines changed: 6 additions & 0 deletions
@@ -38,3 +38,9 @@ jobs:
       timeout-minutes: 5
     - name: Test offline trainer
       run: make test-offline-trainer
+    - name: Test model server select
+      run: make test-model-server-select
+      timeout-minutes: 5
+    - name: Test model server select via estimator
+      run: make test-model-server-estimator-select
+      timeout-minutes: 5

.gitignore

Lines changed: 2 additions & 0 deletions
@@ -150,3 +150,5 @@ model_training/*data*
 model_training/tekton/secret
 local-dev-cluster
 tmp
+tests/db-models
+db-models

Makefile

Lines changed: 36 additions & 1 deletion
@@ -119,12 +119,47 @@ test-offline-trainer: \
 	run-offline-trainer-client \
 	clean-offline-trainer
 
+# test model server select
+create-container-net:
+	@$(CTR_CMD) network create kepler-model-server-test
+
+run-model-server-with-db:
+	$(CTR_CMD) run -d --platform linux/amd64 \
+		--network kepler-model-server-test \
+		-p 8100:8100 \
+		--name model-server $(TEST_IMAGE) \
+		model-server
+	while ! docker logs model-server 2>&1 | grep -q 'Running on all'; do \
+		echo "... waiting for model-server to serve"; sleep 5; \
+	done
+
+run-estimator-with-model-server:
+	$(CTR_CMD) run -d --platform linux/amd64 \
+		--network kepler-model-server-test \
+		-e "MODEL_SERVER_ENABLE=true" \
+		-e "MODEL_SERVER_URL=http://model-server:8100" \
+		--name estimator $(TEST_IMAGE) \
+		/bin/bash -c "estimator --machine-spec tests/data/machine/spec.json"
+
+clean-container-net:
+	@$(CTR_CMD) network rm kepler-model-server-test
+
+run-select-client:
+	$(CTR_CMD) exec model-server \
+		hatch run test -vvv -s ./tests/model_select_test.py
+
+test-model-server-select: create-container-net run-model-server-with-db run-select-client clean-model-server clean-container-net
+
+test-model-server-estimator-select: create-container-net run-model-server-with-db run-estimator-with-model-server run-collector-client clean-estimator clean-model-server clean-container-net
+
 test: \
 	build-test \
 	test-pipeline \
 	test-estimator \
 	test-model-server \
-	test-offline-trainer
+	test-offline-trainer \
+	test-model-server-select \
+	test-model-server-estimator-select
 
 # set image
 set-image:
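Note: for local debugging outside the Makefile, the readiness wait that run-model-server-with-db performs with `docker logs ... | grep 'Running on all'` can be approximated in Python by polling the published port (8100, from the -p mapping above). This is an illustrative sketch only; the helper name and timeout values are not part of this commit.

import socket
import time


def wait_for_model_server(host: str = "localhost", port: int = 8100, timeout_s: int = 120) -> bool:
    # Poll the port published by run-model-server-with-db until it accepts
    # connections, mirroring the Makefile's wait loop (hypothetical helper).
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return True
        except OSError:
            print("... waiting for model-server to serve")
            time.sleep(5)
    return False


if __name__ == "__main__":
    assert wait_for_model_server(), "model-server did not become ready in time"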

src/kepler_model/server/model_server.py

Lines changed: 41 additions & 6 deletions
@@ -23,9 +23,11 @@
     CHECKPOINT_FOLDERNAME,
     METADATA_FILENAME,
     any_node_type,
+    default_pipelines,
     get_archived_file,
     get_largest_candidates,
     get_model_group_path,
+    get_node_type_from_name,
     get_pipeline_path,
     is_matched_type,
     is_valid_model,
@@ -51,11 +53,11 @@


 class ModelRequest:
-    def __init__(self, metrics, output_type, source="rapl-sysfs", node_type=-1, weight=False, trainer_name="", filter="", pipeline_name="", spec=None):
+    def __init__(self, metrics, output_type, source="rapl-sysfs", node_type=-1, weight=False, trainer_name="", filter="", pipeline_name="", spec=None, loose_node_type=True):
         # target source of power metric to be predicted (e.g., rapl-sysfs, acpi)
         self.source = convert_enery_source(source)
         # type of node to select a model learned from similar nodes (default: -1, applied universal model learned by all node_type (TODO))
-        self.node_type = node_type
+        self.node_type = int(node_type) if node_type or node_type == 0 else -1
         # list of available resource usage metrics to find applicable models (using a valid feature group that can be obtained from the list)
         self.metrics = metrics
         # specific trainer name (default: empty, selecting any of the best trainer)
@@ -72,6 +74,7 @@ def __init__(self, metrics, output_type, source="rapl-sysfs", node_type=-1, weig
         self.spec = NodeTypeSpec()
         if spec is not None:
             self.spec = NodeTypeSpec(**spec)
+        self.loose_node_type = loose_node_type

 # ModelListParams defines parameters for /best-models API
 class ModelListParam(enum.Enum):
@@ -102,10 +105,27 @@ class ModelListParam(enum.Enum):
     """


-def select_best_model(spec, valid_group_path: str, filters: dict, energy_source: str, pipeline_name: str="", trainer_name: str="", node_type: int=any_node_type, weight: bool=False):
-    model_names = [f for f in os.listdir(valid_group_path) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_group_path, f)) and (trainer_name == "" or trainer_name in f)]
+def select_best_model(spec, valid_group_path: str, filters: dict, energy_source: str, pipeline_name: str="", trainer_name: str="", node_type: int=any_node_type, weight: bool=False, loose_node_type: bool=True):
+    # Set default pipeline if not specified
+    if pipeline_name == "" and energy_source in default_pipelines:
+        pipeline_name = default_pipelines[energy_source]
+
+    # Find initial model list filtered by trainer
+    initial_model_names = [f for f in os.listdir(valid_group_path) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_group_path, f)) and os.path.exists(os.path.join(valid_group_path, f, METADATA_FILENAME + ".json")) and (trainer_name == "" or trainer_name in f)]
+    if node_type != any_node_type:
+        model_names = [name for name in initial_model_names if f"_{node_type}" in name]
+        if len(model_names) == 0:
+            if not loose_node_type:
+                return None, None
+            logger.warning(f"{valid_group_path} has no matched model for node type={node_type}, try all available models")
+            model_names = initial_model_names
+    else:
+        model_names = initial_model_names
+
+    # Filter weight models
     if weight:
         model_names = [name for name in model_names if name.split("_")[0] in weight_support_trainers]
+
     # Load metadata of trainers
     best_cadidate = None
     best_response = None
@@ -167,18 +187,33 @@ def get_model():
     output_type = ModelOutputType[req.output_type]
     best_model = None
     best_response = None
+    best_uncertainty = None
+    best_looseness = None
     # find best model comparing best candidate from each valid feature group complied with filtering conditions
     for fg in valid_fgs:
+        pipeline_name = pipelineName[energy_source]
         valid_group_path = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source])
+        node_type = req.node_type
+        if req.node_type == any_node_type and req.spec is not None and not req.spec.is_none() and pipeline_name in nodeCollection:
+            node_type, uncertainty, looseness = nodeCollection[pipeline_name].get_node_type(req.spec, loose_search=True)
+        else:
+            uncertainty = 0
+            looseness = 0
         if os.path.exists(valid_group_path):
-            best_candidate, response = select_best_model(req.spec, valid_group_path, filters, energy_source, req.pipeline_name, req.trainer_name, req.node_type, req.weight)
+            best_candidate, response = select_best_model(req.spec, valid_group_path, filters, energy_source, req.pipeline_name, req.trainer_name, node_type, req.weight, loose_node_type=req.loose_node_type)
             if best_candidate is None:
                 continue
+            if node_type != any_node_type and best_model is not None and get_node_type_from_name(best_model['model_name']) == node_type:
+                if get_node_type_from_name(best_candidate['model_name']) != node_type:
+                    continue
            if best_model is None or best_model[ERROR_KEY] > best_candidate[ERROR_KEY]:
                 best_model = best_candidate
                 best_response = response
+                best_uncertainty = uncertainty
+                best_looseness = looseness
     if best_model is None:
         return make_response(f"cannot find model for {model_request} at the moment", 400)
+    logger.info(f"response: model {best_model['model_name']} by {best_model['features']} with {ERROR_KEY}={best_model[ERROR_KEY]} selected with uncertainty={best_uncertainty}, looseness={best_looseness}")
     if req.weight:
         try:
             response = app.response_class(response=json.dumps(best_response), status=200, mimetype="application/json")
@@ -234,7 +269,7 @@ def get_available_models():
             logger.debug(f"Searching feature group {fg}")
             valid_group_path = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source])
             if os.path.exists(valid_group_path):
-                best_candidate, _ = select_best_model(None, valid_group_path, filters, energy_source, node_type=node_type)
+                best_candidate, _ = select_best_model(None, valid_group_path, filters, energy_source, node_type=node_type, loose_node_type=False)
                 if best_candidate is None:
                     continue
                 model_names[output_type.name][fg.name] = best_candidate["model_name"]
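Note: from a client's perspective, the new fields flow through the model request roughly as sketched below. This is a hedged example: the /model route, the spec key names, and the metric name are assumptions for illustration; only node_type, spec, and loose_node_type come from this diff, and port 8100 comes from the Makefile changes above.

import requests

# Hypothetical request payload mirroring ModelRequest; field values are placeholders.
payload = {
    "metrics": ["bpf_cpu_time_ms"],   # placeholder metric list
    "output_type": "AbsPower",
    "source": "rapl-sysfs",
    "node_type": -1,                  # any_node_type: let the server match via spec
    "loose_node_type": True,          # new flag: allow fallback when no exact node-type model exists
    "spec": {                         # assumed NodeTypeSpec-style keys
        "processor": "intel_xeon_platinum_8259cl",
        "cores": 8,
        "chips": 1,
        "memory": 32,
        "frequency": 2500,
    },
}

resp = requests.post("http://localhost:8100/model", json=payload, timeout=30)
print(resp.status_code)

With loose_node_type=True (the default), select_best_model falls back to all available models when none match the inferred node type; with loose_node_type=False it returns no candidate for that group, and the server answers 400 if no feature group yields a model.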

src/kepler_model/train/profiler/node_type_index.py

Lines changed: 120 additions & 15 deletions
@@ -5,7 +5,6 @@
 # node_type = index_collection.index_train_machine(machine_id, new_spec)
 # index_collection.save()

-import enum
 import logging
 import os
 import re
@@ -17,6 +16,8 @@

 from kepler_model.util.loader import load_json, load_node_type_index
 from kepler_model.util.saver import save_machine_spec, save_node_type_index
+from kepler_model.util.similarity import compute_jaccard_similarity, compute_looseness, compute_similarity, compute_uncertainty, find_best_candidate, get_candidate_score, get_num_of_none, get_similarity_weight
+from kepler_model.util.train_types import NodeAttribute

 logger = logging.getLogger(__name__)

@@ -100,13 +101,6 @@ def get_machine_spec(cmd_machine_spec_file: str):
         return spec
     return discover_spec_values()

-class NodeAttribute(str, enum.Enum):
-    PROCESSOR = "processor"
-    CORES = "cores"
-    CHIPS = "chips"
-    MEMORY = "memory"
-    FREQ = "frequency"
-
 def load_node_type_spec(node_type_index_json):
     node_type_spec_index = dict()
     if node_type_index_json is not None:
@@ -181,6 +175,34 @@ def cover(self, compare_spec):
                 return False
         return True

+    def get_uncertain_attribute_freq(self, compare_spec):
+        uncertain_attribute_freq = dict()
+        if not self.cover(compare_spec):
+            # not covered
+            return None
+        size = self.get_size()
+        for attr in NodeAttribute:
+            if compare_spec.attrs[attr] is None:
+                uncertain_attribute_freq[attr] = size
+        return uncertain_attribute_freq
+
+    def get_similarity(self, compare_spec, debug=False):
+        total_similarity = 0
+        for attr in NodeAttribute:
+            similarity = 0
+            # compare similar string
+            if compare_spec.attrs[attr] is not None and attr in [NodeAttribute.PROCESSOR]:
+                similarity = compute_jaccard_similarity(self.attrs[attr], compare_spec.attrs[attr])
+            # compare number
+            elif compare_spec.attrs[attr] is not None:
+                similarity = compute_similarity(self.attrs[attr], compare_spec.attrs[attr])
+            if debug:
+                print(attr, self.attrs[attr], compare_spec.attrs[attr], similarity, get_similarity_weight(attr))
+            total_similarity += (similarity*get_similarity_weight(attr))
+        if total_similarity > 1:
+            total_similarity = 1
+        return total_similarity
+
     def __str__(self):
         out_str = ""
         for attr in NodeAttribute:
@@ -218,7 +240,7 @@ def index_train_machine(self, machine_id, new_spec):
         if not new_spec.complete_info():
             print("Machine info not completed: ", str(new_spec))
             return -1
-        covered_index = self.get_node_type(new_spec)
+        covered_index, _, _ = self.get_node_type(new_spec)
         if covered_index == -1:
             covered_index = 0
             if len(self.node_type_index.keys()) > 0:
@@ -227,13 +249,31 @@ def index_train_machine(self, machine_id, new_spec):
         self.node_type_index[covered_index].add_member(machine_id)
         return covered_index

-    def get_node_type(self, compare_spec):
+    def get_node_type(self, in_spec: NodeTypeSpec, loose_search: bool=False):
         if len(self.node_type_index) == 0:
-            return -1
-        for index, node_type_spec in self.node_type_index.items():
-            if node_type_spec.cover(compare_spec):
-                return index
-        return -1
+            return -1, -1, -1
+        compare_spec = in_spec.copy()
+        num_of_none = get_num_of_none(compare_spec)
+        similarity_map, max_similarity, most_similar_index, has_candidate, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total = self._find_candidates(in_spec, loose_search)
+        if max_similarity == 1:
+            return most_similar_index, 0, 0
+        if has_candidate:
+            # covered
+            candidate_score = get_candidate_score(candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total)
+            best_candidate_index, max_score = find_best_candidate(candidate_score)
+            uncertainty = compute_uncertainty(max_score, num_of_none)
+            return best_candidate_index, uncertainty, 0
+        elif loose_search:
+            if most_similar_index != -1:
+                candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total, num_of_none = self._loose_search(compare_spec, similarity_map, max_similarity, most_similar_index, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total)
+                candidate_score = get_candidate_score(candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total)
+                logger.debug(f"candidate score: {candidate_score}")
+                most_similar_score = candidate_score[most_similar_index]
+                uncertainty = compute_uncertainty(most_similar_score, num_of_none)
+                if max_similarity != -1:
+                    looseness = compute_looseness(max_similarity)
+                    return most_similar_index, uncertainty, looseness
+        return -1, -1, -1

     def get_json(self):
         json_obj = dict()
@@ -251,3 +291,68 @@ def copy(self):
         for node_type in removed_items:
             del node_collection.node_type_index[node_type]
         return node_collection
+
+    def _find_candidates(self, compare_spec, loose_search=False):
+        """
+        This function returns the most similar node_type index.
+        - a similarity value of the compare_spec to each node_type in the collection index is computed
+        - among candidates with a similarity value, the most frequently-found node_type is selected
+        - the loose_search flag allows adding a candidate even if the compare_spec is not covered
+        """
+        candidate_uncertain_attribute_freq = dict()
+        candidate_uncertain_attribute_total = dict()
+        most_similar_index = -1
+        max_similarity = -1
+        most_similar_freq = -1
+        completed_info = compare_spec.complete_info()
+        has_candidate = False
+        similarity_map = dict()
+        for attr in NodeAttribute:
+            candidate_uncertain_attribute_freq[attr] = []
+            candidate_uncertain_attribute_total[attr] = 0
+        for index, node_type_spec in self.node_type_index.items():
+            freq = node_type_spec.get_size()
+            if loose_search:
+                similarity = node_type_spec.get_similarity(compare_spec)
+                similarity_map[index] = similarity
+                if similarity > max_similarity or (similarity == max_similarity and most_similar_freq < freq):
+                    most_similar_index = index
+                    max_similarity = similarity
+                    most_similar_freq = freq
+                logger.debug(f"{index} - {node_type_spec}: {similarity}")
+            if node_type_spec.cover(compare_spec):
+                if completed_info:
+                    return similarity_map, 1, index, has_candidate, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total
+                else:
+                    for attr in NodeAttribute:
+                        if compare_spec.attrs[attr] is None:
+                            candidate_uncertain_attribute_freq[attr] += [(index, freq)]
+                            candidate_uncertain_attribute_total[attr] += freq
+                    has_candidate = True
+        return similarity_map, max_similarity, most_similar_index, has_candidate, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total
+
+    def _loose_search(self, compare_spec, similarity_map, max_similarity, most_similar_index, candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total):
+        """
+        This function tries loosening the attributes that do not match the spec with maximum similarity and recomputes the uncertainty value of the selection.
+        """
+        num_of_none = get_num_of_none(compare_spec)
+        most_similar_spec = self.node_type_index[most_similar_index]
+        # remove uncovered spec
+        for attr in NodeAttribute:
+            if compare_spec.attrs[attr] != most_similar_spec.attrs[attr]:
+                logger.debug(f"Loosen {attr} ({compare_spec.attrs[attr]}-->{most_similar_spec.attrs[attr]})")
+                compare_spec.attrs[attr] = None
+                num_of_none += 1
+        # find uncertainty
+        for index, node_type_spec in self.node_type_index.items():
+            if node_type_spec.cover(compare_spec):
+                similarity = similarity_map[index]
+                freq = node_type_spec.get_size()
+                if similarity == max_similarity and freq > self.node_type_index[most_similar_index].get_size():
+                    logger.debug(f"change most similar index from {most_similar_index} to {index}")
+                    most_similar_index = index
+                for attr in NodeAttribute:
+                    if compare_spec.attrs[attr] is None:
+                        candidate_uncertain_attribute_freq[attr] += [(index, freq)]
+                        candidate_uncertain_attribute_total[attr] += freq
+        return candidate_uncertain_attribute_freq, candidate_uncertain_attribute_total, num_of_none
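Note: the compute_jaccard_similarity / compute_similarity helpers imported above live in kepler_model/util/similarity.py and are not part of this hunk. A simplified, standalone sketch of the kind of per-attribute scoring that get_similarity aggregates (token-set Jaccard for the processor string, ratio-based similarity for numeric attributes) follows; the exact formulas are assumptions for illustration, not the actual implementation.

import re


def jaccard_similarity(a: str, b: str) -> float:
    # token-set Jaccard index over lowercase alphanumeric tokens (illustrative)
    ta = set(re.split(r"[^a-z0-9]+", a.lower())) - {""}
    tb = set(re.split(r"[^a-z0-9]+", b.lower())) - {""}
    if not ta or not tb:
        return 0.0
    return len(ta & tb) / len(ta | tb)


def numeric_similarity(a: float, b: float) -> float:
    # ratio of the smaller to the larger value; 1.0 when equal (illustrative)
    if a == b:
        return 1.0
    if a == 0 or b == 0:
        return 0.0
    return min(a, b) / max(a, b)


print(jaccard_similarity("intel xeon platinum 8259cl", "intel xeon 8259cl"))  # 0.75
print(numeric_similarity(16, 8))                                              # 0.5

NodeTypeSpec.get_similarity then weights each per-attribute score with get_similarity_weight(attr) and caps the total at 1.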

src/kepler_model/util/config.py

Lines changed: 2 additions & 1 deletion
@@ -13,9 +13,10 @@
 #################################################

 import os
+
 import requests

-from .loader import base_model_url, default_pipelines, default_train_output_pipeline, get_pipeline_url, get_url, default_init_model_name
+from .loader import base_model_url, default_init_model_name, default_pipelines, default_train_output_pipeline, get_pipeline_url, get_url
 from .train_types import FeatureGroup, ModelOutputType, is_output_type_supported

 # must be writable (for shared volume mount)

src/kepler_model/util/loader.py

Lines changed: 13 additions & 1 deletion
@@ -112,6 +112,18 @@ def load_remote_pkl(url_path):
         logger.error(f"failed to load pkl url {url_path}: {e}")
         return None

+def load_remote_json(url_path):
+    if ".json" not in url_path:
+        url_path = url_path + ".json"
+    try:
+        response = urlopen(url_path)
+        response_data = response.read().decode('utf-8')
+        json_data = json.loads(response_data)
+        return json_data
+    except Exception as e:
+        logger.error(f"failed to load json url {url_path}: {e}")
+        return None
+
 def load_machine_spec(data_path, machine_id):
     machine_spec_path = os.path.join(data_path, MACHINE_SPEC_PATH)
     return load_json(machine_spec_path, machine_id)

@@ -216,7 +228,7 @@ def is_matched_type(nodeCollection, spec, pipeline_name, model_name, node_type,
         return True
     return False

-
+# get_largest_candidates return list of model_names that have maximum number of cores
 def get_largest_candidates(model_names, pipeline_name, nodeCollection, energy_source):
     pipeline_name = assure_pipeline_name(pipeline_name, energy_source, nodeCollection)
     if pipeline_name not in nodeCollection:
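Note: a brief usage sketch for the new load_remote_json helper; the URL below is a placeholder, not a real model-database path.

# Hypothetical usage: fetch a remote JSON document (".json" is appended when
# missing) and handle the None that load_remote_json returns on any error.
metadata = load_remote_json("https://example.com/models/rapl-sysfs/metadata")
if metadata is None:
    print("remote metadata unavailable, falling back to local files")
else:
    print(metadata.get("model_name"))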
