Skip to content

Commit 4e01087

Browse files
committed
feat: add machine_spec to metadata
Signed-off-by: Sunyanan Choochotkaew <[email protected]>
1 parent 9567ebe commit 4e01087

File tree

9 files changed

+105
-23
lines changed

9 files changed

+105
-23
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ run-estimator-with-test-spec:
5757
$(CTR_CMD) run --rm -d --platform linux/amd64 \
5858
--name estimator \
5959
$(TEST_IMAGE) \
60-
/bin/bash -c "estimator --machine-spec tests/data/machine/spec.json"
60+
/bin/bash -c "estimator --machine-spec tests/data/machine/spec.json --log-level debug"
6161

6262
run-collector-client:
6363
$(CTR_CMD) exec estimator /bin/bash -c \
@@ -129,7 +129,7 @@ run-model-server-with-db:
129129
-p 8100:8100 \
130130
--name model-server $(TEST_IMAGE) \
131131
model-server
132-
while ! docker logs model-server 2>&1 | grep -q 'Running on all'; do \
132+
while ! $(CTR_CMD) logs model-server 2>&1 | grep -q 'Running on all'; do \
133133
echo "... waiting for model-server to serve"; sleep 5; \
134134
done
135135

src/kepler_model/estimate/estimator.py

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
from kepler_model.estimate.archived_model import get_achived_model
1313
from kepler_model.estimate.model.model import load_downloaded_model
1414
from kepler_model.estimate.model_server_connector import is_model_server_enabled, make_request
15-
from kepler_model.train.profiler.node_type_index import get_machine_spec
15+
from kepler_model.train.profiler.node_type_index import NodeTypeSpec, discover_spec_values, get_machine_spec
1616
from kepler_model.util.config import SERVE_SOCKET, download_path, set_env_from_model_config
17-
from kepler_model.util.loader import get_download_output_path
17+
from kepler_model.util.loader import get_download_output_path, load_metadata
1818
from kepler_model.util.train_types import ModelOutputType, convert_enery_source, is_output_type_supported
1919

2020
###############################################
@@ -42,7 +42,7 @@ def __init__(self, metrics, values, output_type, source, system_features, system
4242

4343
loaded_model = dict()
4444

45-
def handle_request(data: str, machine_spec=None) -> dict:
45+
def handle_request(data: str, machine_spec=None, discovered_core=None) -> dict:
4646
try:
4747
power_request = json.loads(data, object_hook=lambda d: PowerRequest(**d))
4848
except Exception as e:
@@ -94,22 +94,41 @@ def handle_request(data: str, machine_spec=None) -> dict:
9494
if loaded_item is not None and loaded_item.estimator is not None:
9595
loaded_model[output_type.name][power_request.energy_source] = loaded_item
9696
logger.info(f"set model {loaded_item.model_name} for {output_type.name} ({power_request.energy_source})")
97+
else:
98+
msg = f"load item for {power_request.energy_source} is none"
99+
logger.error(msg)
100+
return {"powers": dict(), "msg": msg}
97101

98102
model = loaded_model[output_type.name][power_request.energy_source]
99103
powers, msg = model.get_power(power_request.datapoint)
100104
if msg != "":
101105
logger.info(f"{model.model_name} failed to predict; removed: {msg}")
102106
if output_path != "" and os.path.exists(output_path):
103107
shutil.rmtree(output_path)
104-
105-
return {"powers": powers, "msg": msg}
106-
108+
response = {"powers": powers, "msg": msg}
109+
# add core_ratio if applicable
110+
core_ratio = 1
111+
if discovered_core is not None and discovered_core > 0:
112+
metadata = load_metadata(output_path)
113+
if metadata is not None and "machine_spec" in metadata:
114+
model_spec = NodeTypeSpec(**metadata["machine_spec"])
115+
model_cores = model_spec.get_cores()
116+
if model_cores > 0:
117+
core_ratio = discovered_core/model_cores
118+
logger.debug(f"model cores: {model_cores}")
119+
logger.debug(f"metadata: {metadata}")
120+
response["core_ratio"] = core_ratio
121+
122+
return response
107123

108124
class EstimatorServer:
109125
def __init__(self, socket_path, machine_spec):
110126
self.socket_path = socket_path
111127
self.machine_spec = machine_spec
112-
logger.info(f"initialize EstimatorServer with spec={machine_spec}")
128+
spec_values = discover_spec_values()
129+
discovered_spec = NodeTypeSpec(**spec_values)
130+
self.discovered_core = discovered_spec.get_cores()
131+
logger.info(f"initialize EstimatorServer with spec={machine_spec}, discovered_core={self.discovered_core}")
113132

114133
def start(self):
115134
s = self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -135,11 +154,10 @@ def accepted(self, connection):
135154
if shunk is None or shunk.decode()[-1] == "}":
136155
break
137156
decoded_data = data.decode()
138-
y = handle_request(decoded_data, self.machine_spec)
157+
y = handle_request(decoded_data, self.machine_spec, self.discovered_core)
139158
response = json.dumps(y)
140159
connection.send(response.encode())
141160

142-
143161
def clean_socket():
144162
logger.info("clean socket")
145163
if os.path.exists(SERVE_SOCKET):

src/kepler_model/estimate/model/model.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ def load_model(model_path):
165165
return None
166166

167167
metadata["model_path"] = model_path
168+
logger.info(f"load metadata {metadata}")
169+
# need to delete machine_spec before loading the model
170+
if "machine_spec" in metadata:
171+
del metadata["machine_spec"]
168172
metadata_str = json.dumps(metadata)
169173
try:
170174
model = json.loads(metadata_str, object_hook=lambda d: Model(**d))

src/kepler_model/server/model_server.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@
3232
is_matched_type,
3333
is_valid_model,
3434
load_json,
35+
load_metadata,
3536
load_weight,
3637
parse_filters,
3738
)
38-
from kepler_model.util.saver import WEIGHT_FILENAME
39+
from kepler_model.util.saver import WEIGHT_FILENAME, save_metadata
3940
from kepler_model.util.train_types import (
4041
FeatureGroup,
4142
FeatureGroups,
@@ -216,6 +217,13 @@ def get_model():
216217
logger.info(f"response: model {best_model['model_name']} by {best_model['features']} with {ERROR_KEY}={best_model[ERROR_KEY]} selected with uncertainty={best_uncertainty}, looseness={best_looseness}")
217218
if req.weight:
218219
try:
220+
# add this condition to provide compatibility to old version
221+
# the old version always set default node_type
222+
if req.node_type == any_node_type:
223+
best_response["model_name"] = best_model["model_name"]
224+
if "machine_spec" in best_model:
225+
best_response["machine_spec"] = best_model["machine_spec"]
226+
best_response[ERROR_KEY] = best_model[ERROR_KEY]
219227
response = app.response_class(response=json.dumps(best_response), status=200, mimetype="application/json")
220228
return response
221229
except ValueError as err:
@@ -344,7 +352,30 @@ def load_init_pipeline():
344352
# remove downloaded zip
345353
os.remove(tmp_filepath)
346354
set_pipelines()
347-
355+
fill_machine_spec()
356+
357+
def fill_machine_spec():
358+
for energy_source in PowerSourceMap.keys():
359+
if energy_source in pipelineName:
360+
pipeline_name = pipelineName[energy_source]
361+
if pipeline_name in nodeCollection:
362+
node_collection = nodeCollection[pipeline_name]
363+
for output_type in ModelOutputType:
364+
for feature_group in FeatureGroup:
365+
valid_group_path = get_model_group_path(model_toppath, output_type, feature_group, energy_source, pipeline_name=pipeline_name)
366+
for f in os.listdir(valid_group_path):
367+
path = os.path.join(valid_group_path, f)
368+
if not os.path.isfile(path):
369+
metadata = load_metadata(path)
370+
if metadata is not None:
371+
if "machine_spec" not in metadata and "model_name" in metadata:
372+
model_name = metadata["model_name"]
373+
node_type = get_node_type_from_name(model_name)
374+
if node_type in node_collection.node_type_index:
375+
metadata["machine_spec"] = node_collection.node_type_index[node_type].get_json()["attrs"]
376+
save_metadata(path, metadata)
377+
save_path = os.path.join(valid_group_path, model_name)
378+
shutil.make_archive(save_path, "zip", save_path)
348379

349380
@click.command()
350381
@click.option(

src/kepler_model/train/pipeline.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ def __init__(self, name, trainers, extractor, isolator):
4444
self.metadata["abs_trainers"] = [trainer.__class__.__name__ for trainer in trainers if trainer.node_level]
4545
self.metadata["dyn_trainers"] = [trainer.__class__.__name__ for trainer in trainers if not trainer.node_level]
4646
self.metadata["init_time"] = time_to_str(datetime.datetime.utcnow())
47+
for trainer in trainers:
48+
trainer.set_node_type_index(self.node_collection.node_type_index)
4749

4850
def get_abs_data(self, query_results, energy_components, feature_group, energy_source, aggr):
4951
extracted_data, power_labels, _, _ = self.extractor.extract(query_results, energy_components, feature_group, energy_source, node_level=True, aggr=aggr)

src/kepler_model/train/profiler/node_type_index.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,10 @@ def add_member(self, machine_id):
156156
def get_size(self):
157157
return len(self.members)
158158

159-
def get_cores(self):
160-
return self.attrs[NodeAttribute.CORES]
159+
def get_cores(self) -> int:
160+
if attr_has_value(self.attrs, NodeAttribute.CORES):
161+
return int(self.attrs[NodeAttribute.CORES])
162+
return 0
161163

162164
# check the comparing node-type spec is covered by this node-type spec
163165
def cover(self, compare_spec):

src/kepler_model/train/trainer/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ def __init__(self, model_class, energy_components, feature_group, energy_source,
6161
self.node_models = dict()
6262
self.node_scalers = dict()
6363
self.scaler_type = scaler_type
64+
self.node_type_index = dict()
65+
66+
def set_node_type_index(self, node_type_index):
67+
self.node_type_index = node_type_index
6468

6569
def _get_save_path(self, node_type):
6670
save_path = get_save_path(self.group_path, self.trainer_name, node_type=node_type)
@@ -214,13 +218,17 @@ def save_metadata(self, node_type, mae, mae_map, mape, mape_map, item):
214218
save_path = self._get_save_path(node_type)
215219
model_name, model_file = self._model_filename(node_type)
216220
item["model_name"] = model_name
221+
item["trainer"] = self.trainer_name
217222
item["model_class"] = self.model_class
218223
item["model_file"] = model_file
219224
item["features"] = self.features
220225
item["fe_files"] = [] if not hasattr(self, "fe_files") else self.fe_files
221226
item["output_type"] = self.output_type.name
222227
item["mae"] = mae
223228
item["mape"] = mape
229+
if node_type in self.node_type_index:
230+
item["node_type"] = node_type
231+
item["machine_spec"] = self.node_type_index[node_type].get_json()["attrs"]
224232
item.update(mae_map)
225233
item.update(mape_map)
226234
self.metadata = item

tests/estimator_model_request_test.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
# - kepler-model-server is not connected, but some archived models can be downloaded via URL.
88
# - set sample model and make a dummy valid PowerRequest and another invalid PowerRequest
99
#
10+
# Requires
11+
# - run `model-server`
12+
# - run `pytest tests/pipeline_test.py` (run once to get models)
13+
# - run `MODEL_PATH=$(pwd)/src/kepler_model/models python tests/http_server.py`
14+
#
1015
#########################
1116
# import external modules
1217
import json
@@ -66,7 +71,7 @@ def test_model_request():
6671
if url != "":
6772
print("Download: ", url)
6873
response = requests.get(url)
69-
assert response.status_code == 200, "init url must be set and valid"
74+
assert response.status_code == 200, f"init url {url} must be set and valid"
7075
output_path = get_download_output_path(download_path, energy_source, output_type)
7176
if output_type_name in loaded_model and energy_source in loaded_model[output_type.name]:
7277
del loaded_model[output_type_name][energy_source]

tests/model_server_test.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
import requests
88

99
from kepler_model.server.model_server import MODEL_SERVER_PORT
10-
from kepler_model.train.profiler.node_type_index import NodeAttribute, attr_has_value
11-
from kepler_model.util.config import download_path
10+
from kepler_model.train.profiler.node_type_index import NodeAttribute, NodeTypeSpec, attr_has_value
11+
from kepler_model.util.config import ERROR_KEY, download_path
12+
from kepler_model.util.loader import any_node_type
1213
from kepler_model.util.train_types import FeatureGroup, FeatureGroups, ModelOutputType
1314

1415
TMP_FILE = "tmp.zip"
@@ -18,17 +19,28 @@ def get_model_request_json(metrics, output_type, node_type, weight, trainer_name
1819
return {"metrics": metrics, "output_type": output_type.name, "node_type": node_type, "weight": weight, "trainer_name": trainer_name, "source": energy_source}
1920

2021

21-
def make_request(metrics, output_type, node_type=-1, weight=False, trainer_name="", energy_source="rapl-sysfs"):
22+
def make_request(metrics, output_type, node_type=any_node_type, weight=False, trainer_name="", energy_source="rapl-sysfs"):
2223
model_request = get_model_request_json(metrics, output_type, node_type, weight, trainer_name, energy_source)
2324
response = requests.post(f"http://localhost:{MODEL_SERVER_PORT}/model", json=model_request)
2425
assert response.status_code == 200, response.text
2526
if weight:
2627
weight_dict = json.loads(response.text)
2728
assert len(weight_dict) > 0, "weight dict must contain one or more than one component"
28-
for weight_values in weight_dict.values():
29-
weight_length = len(weight_values["All_Weights"]["Numerical_Variables"])
30-
expected_length = len(metrics)
31-
assert weight_length <= expected_length, f"weight metrics should covered by the requested {weight_length} > {expected_length}"
29+
if node_type == any_node_type:
30+
assert "model_name" in weight_dict
31+
assert "machine_spec" in weight_dict
32+
assert ERROR_KEY in weight_dict
33+
assert len(weight_dict["model_name"]) > 0
34+
spec_values = weight_dict["machine_spec"]
35+
spec = NodeTypeSpec(**spec_values)
36+
assert spec.get_cores() > 0
37+
38+
for key, values in weight_dict.items():
39+
if key not in ["model_name", "machine_spec", ERROR_KEY]:
40+
if "All_Weights" in values:
41+
weight_length = len(values["All_Weights"]["Numerical_Variables"])
42+
expected_length = len(metrics)
43+
assert weight_length <= expected_length, f"weight metrics should covered by the requested {weight_length} > {expected_length}"
3244
else:
3345
output_path = os.path.join(download_path, output_type.name)
3446
if os.path.exists(output_path):

0 commit comments

Comments
 (0)