Commit 62109bc

Authored by Sunil Thaha

Merge pull request #370 from sunya-ch/server-api-rebase

feat: update select logic with spec similarity computation

2 parents cbb0cab + bf7c36d, commit 62109bc

File tree

16 files changed (+534, -49 lines)


.github/workflows/unit-test.yml

Lines changed: 6 additions & 0 deletions

@@ -38,3 +38,9 @@ jobs:
       timeout-minutes: 5
     - name: Test offline trainer
       run: make test-offline-trainer
+    - name: Test model server select
+      run: make test-model-server-select
+      timeout-minutes: 5
+    - name: Test model server select via estimator
+      run: make test-model-server-estimator-select
+      timeout-minutes: 5

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -146,6 +146,7 @@ tests/data/extractor_output
 tests/data/isolator_output
 tests/data/offline_trainer_output
 tests/data/plot_output
+tests/db-models
 model_training/*data*
 model_training/tekton/secret
 local-dev-cluster

Makefile

Lines changed: 46 additions & 2 deletions

@@ -12,6 +12,7 @@ PYTHON = python3.10
 
 DOCKERFILES_PATH := ./dockerfiles
 MODEL_PATH := ${PWD}/tests/models
+MACHINE_SPEC_PATH := ${PWD}/tests/data/machine_spec
 
 build:
 	$(CTR_CMD) build -t $(IMAGE) -f $(DOCKERFILES_PATH)/Dockerfile .

@@ -53,10 +54,15 @@ run-estimator:
 
 run-collector-client:
 	$(CTR_CMD) exec estimator /bin/bash -c \
-	"while [ ! -S "/tmp/estimator.sock" ]; do sleep 1; done; hatch test -vvv -s ./tests/estimator_power_request_test.py"
+	"while [ ! -S "/tmp/estimator.sock" ]; do \
+		sleep 1; \
+	done; \
+	hatch run test -vvv -s ./tests/estimator_power_request_test.py"
 
 clean-estimator:
-	$(CTR_CMD) stop estimator
+	@$(CTR_CMD) logs estimator
+	@$(CTR_CMD) stop estimator
+	@$(CTR_CMD) rm estimator || true
 
 test-estimator: run-estimator run-collector-client clean-estimator
 

@@ -76,7 +82,9 @@ run-estimator-client:
 	hatch run test -vvv -s ./tests/estimator_model_request_test.py
 
 clean-model-server:
+	@$(CTR_CMD) logs model-server
 	@$(CTR_CMD) stop model-server
+	@$(CTR_CMD) rm model-server || true
 
 test-model-server: \
 	run-model-server \

@@ -104,6 +112,42 @@ test-offline-trainer: \
 	run-offline-trainer-client \
 	clean-offline-trainer
 
+# test model server select
+create-container-net:
+	@$(CTR_CMD) network create kepler-model-server-test
+
+run-model-server-with-db:
+	$(CTR_CMD) run -d --platform linux/amd64 \
+		--network kepler-model-server-test \
+		-p 8100:8100 \
+		--name model-server $(TEST_IMAGE) \
+		model-server
+	while ! docker logs model-server 2>&1 | grep -q 'Running on all'; do \
+		echo "... waiting for model-server to serve"; sleep 5; \
+	done
+
+run-estimator-with-model-server:
+	$(CTR_CMD) run -d --platform linux/amd64 \
+		--network kepler-model-server-test \
+		-e "PYTHONUNBUFFERED=1" \
+		-e "MACHINE_ID=test" \
+		-v ${MACHINE_SPEC_PATH}:/etc/kepler/models/machine_spec \
+		-e "MODEL_SERVER_ENABLE=true" \
+		-e "MODEL_SERVER_URL=http://model-server:8100" \
+		--name estimator $(TEST_IMAGE) \
+		estimator
+
+clean-container-net:
+	@$(CTR_CMD) network rm kepler-model-server-test
+
+run-select-client:
+	$(CTR_CMD) exec model-server \
+		hatch run test -vvv -s ./tests/model_select_test.py
+
+test-model-server-select: create-container-net run-model-server-with-db run-select-client clean-model-server clean-container-net
+
+test-model-server-estimator-select: create-container-net run-model-server-with-db run-estimator-with-model-server run-collector-client clean-estimator clean-model-server clean-container-net
+
 test: \
 	build-test \
 	test-pipeline \
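
With run-model-server-with-db publishing port 8100, the new select targets can also be exercised from the host. A minimal smoke test is sketched below; the /best-models route and the source/type query parameters are assumptions read from src/kepler_model/server/model_server.py, not part of this commit.

# Hypothetical smoke test (not part of the repo): once the model-server container
# reports "Running on all", the list endpoint should answer on the published port.
import requests

resp = requests.get("http://localhost:8100/best-models", params={"source": "rapl-sysfs", "type": 1})
print(resp.status_code, resp.json())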
Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 __pycache__
 src/resource/
 src/kepler_model/models/
+src/models/
 tests/models/

Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 __pycache__
 src/resource/
 src/kepler_model/models/
+src/models/
 tests/models/

Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 __pycache__
 src/resource/
 src/kepler_model/models/
+src/models/
 tests/models/

src/kepler_model/estimate/estimator.py

Lines changed: 6 additions & 6 deletions

@@ -70,7 +70,7 @@ def handle_request(data):
         current_trainer = loaded_model[output_type.name][power_request.energy_source].trainer_name
         request_trainer = current_trainer != power_request.trainer_name
         if request_trainer:
-            logger.info("try obtaining the requesting trainer {} (current: {})".format(power_request.trainer_name, current_trainer))
+            logger.info(f"try obtaining the requesting trainer {power_request.trainer_name} (current: {current_trainer})")
     if power_request.energy_source not in loaded_model[output_type.name] or request_trainer:
         output_path = get_download_output_path(download_path, power_request.energy_source, output_type)
         if not os.path.exists(output_path):

@@ -84,20 +84,20 @@ def handle_request(data):
                 logger.error(msg)
                 return {"powers": dict(), "msg": msg}
             else:
-                logger.info("load model from config: ", output_path)
+                logger.info(f"load model from config: {output_path}")
         else:
-            logger.info("load model from model server: %s", output_path)
+            logger.info(f"load model from model server: {output_path}")
         loaded_item = load_downloaded_model(power_request.energy_source, output_type)
         if loaded_item is not None and loaded_item.estimator is not None:
             loaded_model[output_type.name][power_request.energy_source] = loaded_item
-            logger.info("set model {0} for {2} ({1})".format(loaded_item.model_name, output_type.name, power_request.energy_source))
+            logger.info(f"set model {loaded_item.model_name} for {power_request.energy_source} ({output_type.name})")
         # remove loaded model
         shutil.rmtree(output_path)
 
     model = loaded_model[output_type.name][power_request.energy_source]
     powers, msg = model.get_power(power_request.datapoint)
     if msg != "":
-        logger.info("{} fail to predict, removed: {}".format(model.model_name, msg))
+        logger.info(f"{model.model_name} fail to predict, removed: {msg}")
         if output_path != "" and os.path.exists(output_path):
             shutil.rmtree(output_path)
     return {"powers": powers, "msg": msg}

@@ -111,7 +111,7 @@ def start(self):
         s = self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         s.bind(self.socket_path)
         s.listen(1)
-        logger.info("started serving on {}".format(self.socket_path))
+        logger.info(f"started serving on {self.socket_path}")
         try:
             while True:
                 connection, _ = s.accept()

src/kepler_model/estimate/model_server_connector.py

Lines changed: 27 additions & 13 deletions

@@ -8,22 +8,31 @@
 from kepler_model.util.config import is_model_server_enabled, get_model_server_req_endpoint, get_model_server_list_endpoint, download_path
 from kepler_model.util.loader import get_download_output_path
 from kepler_model.util.train_types import ModelOutputType
+from kepler_model.train.profiler.node_type_index import discover_spec_values
 
+machine_spec_mount_path = "/etc/kepler/models/machine_spec"
+machine_id = os.getenv('MACHINE_ID', None)
 
-# discover_spec: determine node spec in json format (refer to NodeTypeSpec)
-def discover_spec():
-    import psutil
-
-    # TODO: reuse node_type_index/generate_spec with loosen selection
-    cores = psutil.cpu_count(logical=True)
-    spec = {"cores": cores}
-    return spec
-
-
-node_spec = discover_spec()
+# get_spec_values: determine node spec in json format (refer to NodeTypeSpec)
+def get_spec_values(machine_id: str | None):
+    if machine_id is not None:
+        spec_file = os.path.join(machine_spec_mount_path, machine_id)
+        try:
+            with open(spec_file) as f:
+                res = json.load(f)
+                return res
+        except:
+            pass
+    return discover_spec_values()
+
+node_spec = None
 
 
 def make_model_request(power_request):
+    global node_spec
+    if node_spec is None:
+        node_spec = get_spec_values(machine_id)
+        print(f"Node spec: {node_spec}")
     return {"metrics": power_request.metrics + power_request.system_features, "output_type": power_request.output_type, "source": power_request.energy_source, "filter": power_request.filter, "trainer_name": power_request.trainer_name, "spec": node_spec}
 
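For context, get_spec_values prefers a JSON spec file named after MACHINE_ID under the mount path and only falls back to discover_spec_values(). A sketch of producing such a file follows; the field names mirror NodeTypeSpec-style attributes and are illustrative guesses, not a schema defined by this commit.

# Illustrative only: write a machine-spec file that get_spec_values() would pick up
# when MACHINE_ID=test is set (the Makefile mounts tests/data/machine_spec at
# /etc/kepler/models/machine_spec). Values here are made up.
import json
import os

spec_dir = "tests/data/machine_spec"
os.makedirs(spec_dir, exist_ok=True)
spec = {"vendor": "intel", "processor": "xeon", "cores": 8, "chips": 1, "memory": 32, "frequency": 2600}
with open(os.path.join(spec_dir, "test"), "w") as f:
    json.dump(spec, f)
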
@@ -62,16 +71,21 @@ def make_request(power_request):
     return unpack(power_request.energy_source, output_type, response)
 
 
-def list_all_models():
+def list_all_models(energy_source=None, node_type=None):
     if not is_model_server_enabled():
         return dict()
     try:
-        response = requests.get(get_model_server_list_endpoint())
+        endpoint = get_model_server_list_endpoint()
+        params = {}
+        if energy_source:
+            params["source"] = energy_source
+        if node_type:
+            params["type"] = node_type
+        response = requests.get(endpoint, params=params)
     except Exception as err:
         print("cannot list model: {}".format(err))
         return dict()
     if response.status_code != 200:
         return dict()
     model_names = json.loads(response.content.decode("utf-8"))
     return model_names
-
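A usage example of the extended signature, assuming MODEL_SERVER_ENABLE=true and a reachable server; under the hood it issues GET <list-endpoint>?source=rapl-sysfs&type=1.

from kepler_model.estimate.model_server_connector import list_all_models

models = list_all_models(energy_source="rapl-sysfs", node_type=1)
print(models)  # e.g. {"AbsPower": {"BPFOnly": "<model_name>"}, ...} when models are available
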
src/kepler_model/server/model_server.py

Lines changed: 49 additions & 11 deletions

@@ -10,7 +10,7 @@
 
 from kepler_model.util.train_types import get_valid_feature_groups, ModelOutputType, FeatureGroups, FeatureGroup, PowerSourceMap, weight_support_trainers
 from kepler_model.util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_urls, download_path
-from kepler_model.util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates
+from kepler_model.util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates, default_pipelines, get_node_type_from_name
 from kepler_model.util.saver import WEIGHT_FILENAME
 from kepler_model.train import NodeTypeSpec, NodeTypeIndexCollection
 
@@ -65,10 +65,26 @@ def __init__(self, metrics, output_type, source="rapl-sysfs", node_type=-1, weig
     """
 
 
-def select_best_model(spec, valid_groupath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False):
-    model_names = [f for f in os.listdir(valid_groupath) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_groupath, f)) and (trainer_name == "" or trainer_name in f)]
+def select_best_model(spec, valid_grouppath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False, loose_node_type=True):
+    # Find initial model list filtered by trainer
+    initial_model_names = [f for f in os.listdir(valid_grouppath) if f != CHECKPOINT_FOLDERNAME and not os.path.isfile(os.path.join(valid_grouppath, f)) and (trainer_name == "" or trainer_name in f)]
+    if pipeline_name == "" and energy_source in default_pipelines:
+        pipeline_name = default_pipelines[energy_source]
+
+    if node_type != any_node_type:
+        model_names = [name for name in initial_model_names if "_{}".format(node_type) in name]
+        if len(model_names) == 0:
+            if not loose_node_type:
+                return None, None
+            logger.warning(f"{valid_grouppath} has no matched model for node type={node_type}, try all available models")
+            model_names = initial_model_names
+    else:
+        model_names = initial_model_names
+
+    # Filter weight models
     if weight:
-        model_names = [name for name in model_names if name.split("_")[0] in weight_support_trainers]
+        candidates = [name for name in model_names if name.split("_")[0] in weight_support_trainers]
+
     # Load metadata of trainers
     best_cadidate = None
     best_response = None
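
The node-type filter above keys on a _<node_type> suffix embedded in model names. A tiny illustration with hypothetical names:

# Hypothetical model names: with node_type=1, only models trained for node type 1
# survive before error-based ranking.
initial_model_names = ["GradientBoostingRegressorTrainer_0", "GradientBoostingRegressorTrainer_1", "SGDRegressorTrainer_1"]
model_names = [name for name in initial_model_names if "_{}".format(1) in name]
print(model_names)  # ['GradientBoostingRegressorTrainer_1', 'SGDRegressorTrainer_1']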
@@ -85,7 +101,7 @@ def select_best_model(spec, valid_groupath, filters, energy_source, pipeline_nam
         logger.warn("no large candidates, select from all availables")
         candidates = model_names
     for model_name in candidates:
-        model_savepath = os.path.join(valid_groupath, model_name)
+        model_savepath = os.path.join(valid_grouppath, model_name)
         metadata = load_json(model_savepath, METADATA_FILENAME)
         if metadata is None or not is_valid_model(metadata, filters) or ERROR_KEY not in metadata:
             # invalid metadata

@@ -98,7 +114,7 @@ def select_best_model(spec, valid_groupath, filters, energy_source, pipeline_nam
             logger.warn("weight failed: %s", model_savepath)
             continue
         else:
-            response = get_archived_file(valid_groupath, model_name)
+            response = get_archived_file(valid_grouppath, model_name)
             if not os.path.exists(response):
                 # archived model file does not exists
                 logger.warn("archive failed: %s", response)
@@ -130,20 +146,36 @@ def get_model():
     output_type = ModelOutputType[req.output_type]
     best_model = None
     best_response = None
+    best_uncertainty = None
+    best_looseness = None
     # find best model comparing best candidate from each valid feature group complied with filtering conditions
     for fg in valid_fgs:
-        valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source])
+        pipeline_name = pipelineName[energy_source]
+        valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipeline_name)
+        node_type = req.node_type
+        if req.node_type == any_node_type and req.spec is not None and not req.spec.is_none() and pipeline_name in nodeCollection:
+            node_type, uncertainty, looseness = nodeCollection[pipeline_name].get_node_type(req.spec, loose_search=True)
+        else:
+            uncertainty = 0
+            looseness = 0
         if os.path.exists(valid_groupath):
-            best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, req.trainer_name, req.node_type, req.weight)
+            best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, req.trainer_name, node_type, req.weight)
             if best_candidate is None:
                 continue
+            if node_type != any_node_type and best_model is not None and get_node_type_from_name(best_model['model_name']) == node_type:
+                if get_node_type_from_name(best_candidate['model_name']) != node_type:
+                    continue
             if best_model is None or best_model[ERROR_KEY] > best_candidate[ERROR_KEY]:
                 best_model = best_candidate
                 best_response = response
+                best_uncertainty = uncertainty
+                best_looseness = looseness
+                logger.info(f"response: model {best_model['model_name']} by {best_model['features']} with {ERROR_KEY}={best_model[ERROR_KEY]} selected with uncertainty={best_uncertainty}, looseness={best_looseness}")
     if best_model is None:
         return make_response("cannot find model for {} at the moment".format(model_request), 400)
     if req.weight:
         try:
+            best_response["model_name"] = best_model['model_name']
             response = app.response_class(response=json.dumps(best_response), status=200, mimetype="application/json")
             return response
         except ValueError as err:
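
The hunk above calls nodeCollection[pipeline_name].get_node_type(req.spec, loose_search=True) but does not show its body. A rough sketch of how such a spec-similarity lookup might work is given below for orientation only; this is an assumption, not the shipped NodeTypeIndexCollection implementation.

# Rough sketch (hypothetical): score each indexed node type by how many attribute
# values match the incoming spec, then report the best match together with an
# uncertainty (how ambiguous the winner is) and a looseness (attributes relaxed).
def get_node_type_sketch(spec, node_profiles):
    scores = {}
    for node_type, profile in node_profiles.items():
        compared = [k for k in profile if spec.get(k) is not None]
        matched = [k for k in compared if spec[k] == profile[k]]
        scores[node_type] = (len(matched), len(compared) - len(matched))
    best = max(scores, key=lambda t: scores[t][0])
    ties = sum(1 for m, _ in scores.values() if m == scores[best][0])
    uncertainty = ties / len(scores)  # closer to 1 -> many equally good candidates
    looseness = scores[best][1]       # attributes that did not match on the winner
    return best, uncertainty, looseness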
@@ -154,13 +186,13 @@ def get_model():
         except ValueError as err:
             return make_response("send archived model error: {}".format(err), 400)
 
-
 # get_available_models: return name list of best-candidate pipelines
 @app.route(MODEL_SERVER_MODEL_LIST_PATH, methods=["GET"])
 def get_available_models():
     fg = request.args.get("fg")
     ot = request.args.get("ot")
     energy_source = request.args.get("source")
+    node_type = request.args.get("type")
     filter = request.args.get("filter")
 
     try:

@@ -181,21 +213,27 @@ def get_available_models():
             filters = dict()
         else:
             filters = parse_filters(filter)
+        if node_type is None:
+            node_type = -1
+        else:
+            node_type = int(node_type)
 
         model_names = dict()
         for output_type in output_types:
+            logger.debug(f"Searching output type {output_type}")
             model_names[output_type.name] = dict()
             for fg in valid_fgs:
+                logger.debug(f"Searching feature group {fg}")
                 valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source])
                 if os.path.exists(valid_groupath):
-                    best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source)
+                    best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source, node_type=node_type, loose_node_type=False)
                     if best_candidate is None:
                         continue
                     model_names[output_type.name][fg.name] = best_candidate["model_name"]
         response = app.response_class(response=json.dumps(model_names), status=200, mimetype="application/json")
         return response
     except (ValueError, Exception) as err:
-        return make_response("failed to get best model list: {}".format(err), 400)
+        return make_response(f"failed to get best model list: {err}", 400)
 
 
 # upack_zip_files: unpack all model.zip files to model folder and copy model.json to model/weight.zip
