
Commit 5abd240

add node type indexing

Signed-off-by: Sunyanan Choochotkaew <[email protected]>

1 parent: 3930957

40 files changed: +789 −314

.github/workflows/collect-data-self-hosted.yml
Lines changed: 6 additions & 2 deletions

@@ -147,7 +147,7 @@ jobs:
        working-directory: model_training/tekton
        run: |
          kubectl apply -f tasks
-         kubectl apply -f tasks/s3-pusher
+         kubectl apply -f tasks/s3
          kubectl apply -f pipelines
      - name: Run Tekton Pipeline with S3Push
        run: |
@@ -166,7 +166,7 @@ jobs:
              claimName: task-pvc
            params:
            - name: MODEL_SERVER_IMAGE
-             value: ${{ inputs.model_server_image }}}
+             value: ${{ inputs.model_server_image }}
            - name: COS_PROVIDER
              value: aws
            - name: COS_SECRET_NAME
@@ -177,6 +177,10 @@ jobs:
              name: collect-data-pipeline
          EOF

+     - name: Wait for PipelineRun
+       run: |
+         ./hack/k8s_helper.sh wait_for_pipelinerun self-hosted-aws-collect
+
  destroy-runner:
    if: always()
    needs: [setup-runner, collect-data]
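
The new "Wait for PipelineRun" step delegates to hack/k8s_helper.sh, which is not part of this diff. As a rough mental model only, a minimal Python sketch of an equivalent wait loop (the function name, polling interval, and timeout are illustrative assumptions, not the script's actual contents):

# Hypothetical stand-in for `k8s_helper.sh wait_for_pipelinerun`; the real
# helper is a shell script that is not shown in this commit.
import json
import subprocess
import time

def wait_for_pipelinerun(name, namespace="default", timeout_s=6 * 3600):
    """Poll a Tekton PipelineRun until its Succeeded condition settles."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        out = subprocess.run(
            ["kubectl", "get", "pipelinerun", name, "-n", namespace, "-o", "json"],
            capture_output=True, text=True, check=True,
        ).stdout
        conditions = json.loads(out).get("status", {}).get("conditions", [])
        for cond in conditions:
            if cond.get("type") == "Succeeded" and cond.get("status") != "Unknown":
                return cond["status"] == "True"  # success or terminal failure
        time.sleep(30)  # still running; poll again
    raise TimeoutError("PipelineRun {} did not complete in time".format(name))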

.github/workflows/collect-train.yml
Lines changed: 1 addition & 0 deletions

@@ -22,6 +22,7 @@ jobs:
      aws_region: ${{ secrets.AWS_REGION }}

  train-model:
+   needs: [collect-data]
    strategy:
      matrix:
        instance_type: [i3.metal]

.github/workflows/tekton-test.yml
Lines changed: 37 additions & 1 deletion

@@ -31,6 +31,11 @@ on:
        required: false
        type: string
        default: AbsPower
+     trainers:
+       description: 'Model trainer list (delimit: ,)'
+       required: false
+       type: string
+       default: XgboostFitTrainer

env:
  BASE_IMAGE: ${{ inputs.image_repo }}/kepler_model_server_base:${{ inputs.image_tag }}
@@ -171,4 +176,35 @@ jobs:
          pipelineRef:
            name: single-retrain-pipeline
          EOF
-         ./hack/k8s_helper.sh wait_for_pipelinerun test-retrain
+         ./hack/k8s_helper.sh wait_for_pipelinerun test-retrain
+
+     - name: Run Tekton Complete Retrain
+       run: |
+         cat <<EOF | kubectl apply -f -
+         apiVersion: tekton.dev/v1
+         kind: PipelineRun
+         metadata:
+           name: test-complete-retrain
+         spec:
+           timeouts:
+             pipeline: "6h"
+             tasks: "5h50m"
+           workspaces:
+           - name: mnt
+             persistentVolumeClaim:
+               claimName: task-pvc
+           params:
+           - name: MODEL_SERVER_IMAGE
+             value: $IMAGE
+           - name: PIPELINE_NAME
+             value: ${{ inputs.pipeline_name }}
+           - name: MACHINE_ID
+             value: test
+           - name: ABS_TRAINERS
+             value: ${{ inputs.trainers }}
+           - name: DYN_TRAINERS
+             value: ${{ inputs.trainers }}
+           pipelineRef:
+             name: complete-retrain-pipeline
+         EOF
+         ./hack/k8s_helper.sh wait_for_pipelinerun test-complete-retrain

.github/workflows/train-model.yml
Lines changed: 1 addition & 1 deletion

@@ -171,7 +171,7 @@ jobs:
            claimName: task-pvc
          params:
          - name: MODEL_SERVER_IMAGE
-           value: ${{ inputs.model_server_image }}}
+           value: ${{ inputs.model_server_image }}
          - name: PIPELINE_NAME
            value: ${{ inputs.pipeline_name }}
          - name: COS_PROVIDER

cmd/cmd_util.py
Lines changed: 6 additions & 8 deletions

@@ -12,7 +12,7 @@

 from util.prom_types import node_info_column, prom_responses_to_results, SOURCE_COL, energy_component_to_query
 from util.train_types import ModelOutputType, FeatureGroup, PowerSourceMap
-from util.loader import load_json, get_pipeline_path
+from util.loader import load_json, get_pipeline_path, default_node_type
 from util.saver import assure_path, save_csv

 def print_file_to_stdout(data_path, args):
@@ -58,7 +58,6 @@ def summary_validation(validate_df):
        # CPU instruction is mainly used for ratio.
        # reference: https://github.com/sustainable-computing-io/kepler/blob/0b328cf7c79db9a11426fb80a1a922383e40197c/pkg/config/config.go#L92
        "hwc": "kepler_container_cpu_instructions_total",
-       "kubelet": "kepler_container_kubelet_cpu_usage_total",
        "bpf": "kepler_container_bpf_cpu_time_us_total",
    }
    metric_to_validate_power = {
@@ -245,15 +244,14 @@ def assert_train(trainer, data, energy_components):
    import pandas as pd
    node_types = pd.unique(data[node_info_column])
    for node_type in node_types:
-       node_type_str = int(node_type)
        node_type_filtered_data = data[data[node_info_column] == node_type]
        X_values = node_type_filtered_data[trainer.features].values
        for component in energy_components:
-           output = trainer.predict(node_type_str, component, X_values)
+           output = trainer.predict(node_type, component, X_values)
            if output is not None:
                assert len(output) == len(X_values), "length of predicted values != features ({}!={})".format(len(output), len(X_values))

-def get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_hints, abs_pipeline_name):
+def get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_hints, abs_pipeline_name, replace_node_type=default_node_type):
    pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)
    from train import MinIdleIsolator, NoneIsolator, DefaultProfiler, ProfileBackgroundIsolator, TrainIsolator, generate_profiles
    supported_isolator = {
@@ -278,7 +276,7 @@ def get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_h
        if idle_data is None:
            print("failed to read idle data")
            return None
-       profile_map = DefaultProfiler.process(idle_data, profile_top_path=pipeline_path)
+       profile_map = DefaultProfiler.process(idle_data, profile_top_path=pipeline_path, replace_node_type=replace_node_type)
        profiles = generate_profiles(profile_map)
        profile_isolator = ProfileBackgroundIsolator(profiles, idle_data)
        supported_isolator[profile_isolator.get_name()] = profile_isolator
@@ -306,9 +304,9 @@ def get_extractor(extractor):
        return None
    return supported_extractor[extractor]

-def get_pipeline(data_path, pipeline_name, extractor, profile, target_hints, bg_hints, abs_pipeline_name, isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups):
+def get_pipeline(data_path, pipeline_name, extractor, profile, target_hints, bg_hints, abs_pipeline_name, isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups, replace_node_type=default_node_type):
    from train import NewPipeline
-   isolator = get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_hints, abs_pipeline_name)
+   isolator = get_isolator(data_path, isolator, profile, pipeline_name, target_hints, bg_hints, abs_pipeline_name, replace_node_type=replace_node_type)
    extractor = get_extractor(extractor)
    pipeline = NewPipeline(pipeline_name, abs_trainer_names, dyn_trainer_names, extractor=extractor, isolator=isolator, target_energy_sources=energy_sources, valid_feature_groups=valid_feature_groups)
    return pipeline
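
With this change, replace_node_type defaults to util.loader.default_node_type and is threaded from get_pipeline through get_isolator into DefaultProfiler.process. A hedged usage sketch, assuming the signature shown above (every argument value below is a placeholder chosen for illustration, not a repo default):

# Illustrative call only; all values here are placeholders.
from util.train_types import FeatureGroup
from cmd_util import get_pipeline

pipeline = get_pipeline(
    data_path="/data",
    pipeline_name="my-pipeline",             # hypothetical pipeline name
    extractor="DefaultExtractor",
    profile=None,
    target_hints=None,
    bg_hints=None,
    abs_pipeline_name="",
    isolator="min_idle",                     # assumed key in supported_isolator
    abs_trainer_names=["XgboostFitTrainer"],
    dyn_trainer_names=["XgboostFitTrainer"],
    energy_sources=["rapl"],
    valid_feature_groups=[FeatureGroup.BPFOnly],
    replace_node_type=7,                     # index resolved by NodeTypeIndexCollection
)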

cmd/main.py
Lines changed: 69 additions & 49 deletions

@@ -6,28 +6,26 @@

 data_path = "/data"
 default_output_filename = "output"
-default_trainer_names = [ 'PolynomialRegressionTrainer', 'GradientBoostingRegressorTrainer', 'SGDRegressorTrainer', 'KNeighborsRegressorTrainer', 'LinearRegressionTrainer','SVRRegressorTrainer', 'XgboostFitTrainer']
-default_trainers = ",".join(default_trainer_names)
-default_version = "v0.7"

-data_path = os.getenv("CPE_DATAPATH", data_path)
+data_path = os.getenv("DATAPATH", data_path)

 cur_path = os.path.join(os.path.dirname(__file__), '.')
 sys.path.append(cur_path)
 src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
 sys.path.append(src_path)

 from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
-from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics
+from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics, node_info_column
 from util.extract_types import get_expected_power_columns
-from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures, all_feature_groups
-from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename
+from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, all_feature_groups, default_trainers
+from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_preprocess_folder, get_general_filename, load_machine_spec
 from util.saver import save_json, save_csv, save_train_args
-from util.config import ERROR_KEY
+from util.config import ERROR_KEY, model_toppath
 from util import get_valid_feature_group_from_queries, PowerSourceMap
 from train.prom.prom_query import _range_queries
 from train.exporter import exporter
 from train import load_class
+from train.profiler.node_type_index import NodeTypeIndexCollection, NodeTypeSpec, generate_spec

 from cmd_plot import ts_plot, feature_power_plot, summary_plot
 from cmd_util import extract_time, save_query_results, get_validate_df, summary_validation, get_extractor, check_ot_fg, get_pipeline, assert_train, get_isolator, UTC_OFFSET_TIMEDELTA
@@ -54,9 +52,15 @@
 - --interval : last interval in second
 * The priority is input > start-time,end-time > interval
 - --to-csv : to save converted query result in csv format
+- --id : specify machine ID
 """

 def query(args):
+    if not args.id:
+        args.id = "unknown"
+        print("Machine ID is not defined by --id; using `unknown`")
+    machine_id = args.id
+    generate_spec(data_path, machine_id)
     from prometheus_api_client import PrometheusConnect
     prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE)
     start = None
@@ -288,6 +292,23 @@ def train_from_data(args):

     energy_components = PowerSourceMap[args.energy_source]

+    node_type = None
+    node_collection = None
+    if args.id:
+        machine_id = args.id
+        pipeline_path = get_pipeline_path(model_toppath, pipeline_name=args.pipeline_name)
+        node_collection = NodeTypeIndexCollection(pipeline_path)
+        machine_spec_json = load_machine_spec(data_path, machine_id)
+        if machine_spec_json is not None:
+            new_spec = NodeTypeSpec()
+            new_spec.load(machine_spec_json)
+            node_type = node_collection.index_train_machine(machine_id, new_spec)
+            print("Replace {} with {}".format(node_info_column, node_type))
+            data[node_info_column] = int(node_type)
+
+    if node_type is None:
+        print("Machine ID is not defined by --id or machine spec is not available; node_type will not be auto-replaced")
+
     trainers = args.trainers.split(",")
     metadata_list = []
     for trainer in trainers:
@@ -300,6 +321,10 @@ def train_from_data(args):
     metadata_df = pd.concat(metadata_list)
     print_cols = ["model_name", "mae", "mape"]
     print(metadata_df[print_cols])
+
+    if node_collection is not None:
+        print("Save node index")
+        node_collection.save()

 """
 train
@@ -322,6 +347,7 @@ def train_from_data(args):
 - --dyn-trainers : specify a list of trainers for training DynPower models (use comma(,) as delimiter) - default: apply all available trainers
 - --energy-source : specify target energy sources (use comma(,) as delimiter)
 - --thirdparty-metrics : specify list of third party metric to export (required only for ThirdParty feature group)
+- --id : specify machine ID
 """

 def train(args):
@@ -366,14 +392,28 @@ def train(args):

     abs_trainer_names = args.abs_trainers.split(",")
     dyn_trainer_names = args.dyn_trainers.split(",")
     pipeline = get_pipeline(data_path, pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.abs_pipeline_name, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups)
+
+    node_type = None
+    if args.id:
+        machine_id = args.id
+        machine_spec_json = load_machine_spec(data_path, machine_id)
+        if machine_spec_json is not None:
+            new_spec = NodeTypeSpec()
+            new_spec.load(machine_spec_json)
+            node_type = int(pipeline.node_collection.index_train_machine(machine_id, new_spec))
+
+    if node_type is None:
+        print("Machine ID is not defined by --id or machine spec is not available; node_type will not be auto-replaced")
+
     if pipeline is None:
         print("cannot get pipeline")
         exit()
     for energy_source in energy_sources:
         energy_components = PowerSourceMap[energy_source]
         for feature_group in valid_feature_groups:
-            success, abs_data, dyn_data = pipeline.process_multiple_query(input_query_results_list, energy_components, energy_source, feature_group=feature_group.name)
+            success, abs_data, dyn_data = pipeline.process_multiple_query(input_query_results_list, energy_components, energy_source, feature_group=feature_group.name, replace_node_type=node_type)
             assert success, "failed to process pipeline {}".format(pipeline.name)
             for trainer in pipeline.trainers:
                 if trainer.feature_group == feature_group and trainer.energy_source == energy_source:
@@ -396,6 +436,8 @@ def train(args):
     print("Train args:", argparse_dict)
     # save metadata
     pipeline.save_metadata()
+    # save node collection
+    pipeline.node_collection.save()
     # save pipeline
     pipeline.archive_pipeline()
     print_cols = ["feature_group", "model_name", "mae", "mape"]
@@ -650,19 +692,17 @@ def plot(args):
 export preprocessed data and trained models to the kepler-model-db path

 arguments:
-- --id : specify machine ID
 - --pipeline-name : specify pipeline name that contains the trained models
 - --output : specify kepler-model-db/models in local path
 - --publisher : specify publisher (github) account
 - --benchmark : specify benchmark file that contains data of start time and end time which could be either of the following format
   - CPE benchmark resource in json if you run workload with CPE-operator (https://github.com/IBM/cpe-operator)
   - custom benchmark in json with `startTimeUTC` and `endTimeUTC` data
+- --collect-date : specify collection time manually in UTC
+- --input : specify kepler query response file (output of `query` function) - optional
 """

 def export(args):
-    if not args.id:
-        print("need to specify --id")
-        exit()

     if not args.pipeline_name:
         print("need to specify pipeline name via -p or --pipeline-name")
@@ -677,43 +717,22 @@ def export(args):
         print("need to specify --publisher")
         exit()

-    if not args.benchmark:
-        print("need to specify --benchmark to extract collection time")
+    if args.benchmark:
+        collect_date, _ = extract_time(data_path, args.benchmark)
+    elif args.collect_date:
+        collect_date = args.collect_date
+    else:
+        print("need to specify --benchmark or --collect-date")
         exit()

+    inputs = []
+    if args.input:
+        inputs = args.input.split(",")
+
     pipeline_name = args.pipeline_name
-    machine_id = args.id
     pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)
-    machine_path = get_machine_path(output_path, args.version, machine_id)
-
-    collect_date, _ = extract_time(data_path, args.benchmark)
-    exporter.export(data_path, pipeline_path, machine_path, machine_id=machine_id, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw)
-
-    args.energy_source = ",".join(PowerSourceMap.keys())
-    out_pipeline_path = os.path.join(machine_path, pipeline_name)
-
-    for ot in ModelOutputType:
-        args.output_type = ot.name
-
-        # plot preprocess data
-        args.target_data = "preprocess"
-        args.output = get_preprocess_folder(out_pipeline_path)
-        plot(args)
-
-        # plot error
-        args.target_data = "error"
-        args.output = os.path.join(out_pipeline_path, "error_summary")
-        plot(args)
-
-    args.target_data = "estimate"
-    args.output = os.path.join(out_pipeline_path, "best_estimation")
-    for ot in ModelOutputType:
-        args.output_type = ot.name
-        # plot estimate
-        for feature_group in SingleSourceFeatures:
-            args.feature_group = feature_group
-            plot(args)
+    exporter.export(data_path, pipeline_path, output_path, publisher=args.publisher, collect_date=collect_date, inputs=inputs)

 """
 plot_scenario
@@ -847,10 +866,11 @@ def plot_scenario(args):
     parser.add_argument("--scenario", type=str, help="Specify scenario")

     # Export arguments
-    parser.add_argument("--id", type=str, help="specify machine id")
-    parser.add_argument("--version", type=str, help="Specify model server version.", default=default_version)
     parser.add_argument("--publisher", type=str, help="Specify github account of model publisher")
     parser.add_argument("--include-raw", type=bool, help="Include raw query data")
+    parser.add_argument("--collect-date", type=str, help="Specify collect date directly")
+
+    parser.add_argument("--id", type=str, help="specify machine id")

     # Parse the command-line arguments
     args = parser.parse_args()
@@ -867,7 +887,7 @@ def plot_scenario(args):
         os.makedirs(data_path)
         print("create new folder for data: {}".format(data_path))
     else:
-        print("{} must be mount, add -v \"$(pwd)\":{} .".format(data_path, data_path))
+        print("{0} does not exist. For docker run, mount it with -v \"$(pwd)\":{0}; for a native run, set DATAPATH.".format(data_path))
         exit()
     getattr(sys.modules[__name__], args.command)(args)
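
Taken together, the cmd/main.py changes wire machine specs into training: query() records a spec per machine ID via generate_spec, and train()/train_from_data() index that spec to a node type and overwrite node_info_column before training. A condensed sketch of the flow, assuming only the interfaces this diff's call sites imply (the helper function itself is illustrative, not part of the commit):

# Condensed from the train_from_data() changes above; behavior of
# NodeTypeSpec/NodeTypeIndexCollection is inferred from their call sites.
from train.profiler.node_type_index import NodeTypeIndexCollection, NodeTypeSpec
from util.loader import get_pipeline_path, load_machine_spec
from util.prom_types import node_info_column

def resolve_node_type(data, data_path, model_toppath, machine_id, pipeline_name):
    """Index this machine's spec and overwrite node_info_column in the data."""
    pipeline_path = get_pipeline_path(model_toppath, pipeline_name=pipeline_name)
    node_collection = NodeTypeIndexCollection(pipeline_path)
    machine_spec_json = load_machine_spec(data_path, machine_id)
    if machine_spec_json is None:
        print("machine spec is not available; node_type will not be auto-replaced")
        return None
    new_spec = NodeTypeSpec()
    new_spec.load(machine_spec_json)
    # Reuse an existing node-type index for a matching spec, or mint a new one.
    node_type = node_collection.index_train_machine(machine_id, new_spec)
    data[node_info_column] = int(node_type)
    node_collection.save()  # persist the updated index alongside the pipeline
    return node_type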
