 
 data_path = "/data"
 default_output_filename = "output"
-default_trainer_names = ['PolynomialRegressionTrainer', 'GradientBoostingRegressorTrainer', 'SGDRegressorTrainer', 'KNeighborsRegressorTrainer', 'LinearRegressionTrainer', 'SVRRegressorTrainer', 'XgboostFitTrainer']
-default_trainers = ",".join(default_trainer_names)
-default_version = "v0.7"
 
-data_path = os.getenv("CPE_DATAPATH", data_path)
+data_path = os.getenv("DATAPATH", data_path)
 
 cur_path = os.path.join(os.path.dirname(__file__), '.')
 sys.path.append(cur_path)
 src_path = os.path.join(os.path.dirname(__file__), '..', 'src')
 sys.path.append(src_path)
 
 from util.prom_types import PROM_SERVER, PROM_QUERY_INTERVAL, PROM_QUERY_STEP, PROM_QUERY_START_TIME, PROM_QUERY_END_TIME, PROM_HEADERS, PROM_SSL_DISABLE, PROM_THIRDPARTY_METRICS
-from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics
+from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics, node_info_column
 from util.extract_types import get_expected_power_columns
-from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, SingleSourceFeatures, all_feature_groups
-from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_machine_path, get_preprocess_folder, get_general_filename
+from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, all_feature_groups, default_trainers
+from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_preprocess_folder, get_general_filename, load_machine_spec
 from util.saver import save_json, save_csv, save_train_args
-from util.config import ERROR_KEY
+from util.config import ERROR_KEY, model_toppath
 from util import get_valid_feature_group_from_queries, PowerSourceMap
 from train.prom.prom_query import _range_queries
 from train.exporter import exporter
 from train import load_class
+from train.profiler.node_type_index import NodeTypeIndexCollection, NodeTypeSpec, generate_spec
 
 from cmd_plot import ts_plot, feature_power_plot, summary_plot
 from cmd_util import extract_time, save_query_results, get_validate_df, summary_validation, get_extractor, check_ot_fg, get_pipeline, assert_train, get_isolator, UTC_OFFSET_TIMEDELTA
 - --interval : last interval in seconds
   * The priority is input > start-time,end-time > interval
 - --to-csv : to save converted query result in csv format
+- --id : specify machine ID
 """
 
 def query(args):
+    if not args.id:
+        args.id = "unknown"
+        print("Machine ID is not defined by --id; using `unknown`")
+    machine_id = args.id
+    generate_spec(data_path, machine_id)
     from prometheus_api_client import PrometheusConnect
     prom = PrometheusConnect(url=args.server, headers=PROM_HEADERS, disable_ssl=PROM_SSL_DISABLE)
     start = None
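
As the docstring above notes, `query` resolves its time range with the priority input > start-time,end-time > interval. A minimal, self-contained sketch of that rule; the function name and default are illustrative, not part of the CLI:

from datetime import datetime, timedelta

def resolve_range(start_time=None, end_time=None, interval=300):
    # an explicit start/end pair takes precedence over a trailing interval
    if start_time is not None and end_time is not None:
        return start_time, end_time
    end = datetime.utcnow()
    return end - timedelta(seconds=interval), end

start, end = resolve_range(interval=600)  # query the last 10 minutes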
@@ -288,6 +292,23 @@ def train_from_data(args):
 
     energy_components = PowerSourceMap[args.energy_source]
 
+    node_type = None
+    node_collection = None
+    if args.id:
+        machine_id = args.id
+        pipeline_path = get_pipeline_path(model_toppath, pipeline_name=args.pipeline_name)
+        node_collection = NodeTypeIndexCollection(pipeline_path)
+        machine_spec_json = load_machine_spec(data_path, machine_id)
+        if machine_spec_json is not None:
+            new_spec = NodeTypeSpec()
+            new_spec.load(machine_spec_json)
+            node_type = node_collection.index_train_machine(machine_id, new_spec)
+            print("Replace {} with {}".format(node_info_column, node_type))
+            data[node_info_column] = int(node_type)
+
+    if node_type is None:
+        print("Machine ID is not defined by --id or the machine spec is unavailable; node_type is not auto-replaced")
+
     trainers = args.trainers.split(",")
     metadata_list = []
     for trainer in trainers:
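
The added block above is the full machine-spec lifecycle: open the node-type index stored under the pipeline path, register this machine's spec, and (at the end of training) persist the updated index. A condensed sketch, with a hypothetical machine ID and pipeline name:

pipeline_path = get_pipeline_path(model_toppath, pipeline_name="std_v0.7")  # "std_v0.7" is illustrative
collection = NodeTypeIndexCollection(pipeline_path)
spec = NodeTypeSpec()
spec.load(load_machine_spec(data_path, "machine-01"))  # assumes a spec JSON exists for "machine-01"
node_type = int(collection.index_train_machine("machine-01", spec))
collection.save()  # persists the updated index under pipeline_path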
@@ -300,6 +321,10 @@ def train_from_data(args):
     metadata_df = pd.concat(metadata_list)
     print_cols = ["model_name", "mae", "mape"]
     print(metadata_df[print_cols])
+
+    if node_collection is not None:
+        print("Save node index")
+        node_collection.save()
 
 """
 train
@@ -322,6 +347,7 @@ def train_from_data(args):
 - --dyn-trainers : specify a list of trainers for training DynPower models (use comma(,) as delimiter) - default: apply all available trainers
 - --energy-source : specify target energy sources (use comma(,) as delimiter)
 - --thirdparty-metrics : specify a list of third-party metrics to export (required only for ThirdParty feature group)
+- --id : specify machine ID
 """
 
 def train(args):
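
Both trainer arguments are plain comma-delimited strings, matching the `default_trainers` value now imported from util.train_types. For example, using names from the removed default list above:

args_abs_trainers = "PolynomialRegressionTrainer,SGDRegressorTrainer"
abs_trainer_names = args_abs_trainers.split(",")
# -> ["PolynomialRegressionTrainer", "SGDRegressorTrainer"]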
@@ -366,14 +392,28 @@ def train(args):
 
     abs_trainer_names = args.abs_trainers.split(",")
     dyn_trainer_names = args.dyn_trainers.split(",")
-    pipeline = get_pipeline(data_path, pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.abs_pipeline_name, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups)
+
+    node_type = None
+    pipeline = get_pipeline(data_path, pipeline_name, args.extractor, args.profile, args.target_hints, args.bg_hints, args.abs_pipeline_name, args.isolator, abs_trainer_names, dyn_trainer_names, energy_sources, valid_feature_groups)
+    if args.id:
+        machine_id = args.id
+        machine_spec_json = load_machine_spec(data_path, machine_id)
+        if machine_spec_json is not None:
+            new_spec = NodeTypeSpec()
+            new_spec.load(machine_spec_json)
+            node_type = pipeline.node_collection.index_train_machine(machine_id, new_spec)
+            node_type = int(node_type)
+
+    if node_type is None:
+        print("Machine ID is not defined by --id or the machine spec is unavailable; node_type is not auto-replaced")
+
     if pipeline is None:
         print("cannot get pipeline")
         exit()
     for energy_source in energy_sources:
         energy_components = PowerSourceMap[energy_source]
         for feature_group in valid_feature_groups:
-            success, abs_data, dyn_data = pipeline.process_multiple_query(input_query_results_list, energy_components, energy_source, feature_group=feature_group.name)
+            success, abs_data, dyn_data = pipeline.process_multiple_query(input_query_results_list, energy_components, energy_source, feature_group=feature_group.name, replace_node_type=node_type)
             assert success, "failed to process pipeline {}".format(pipeline.name)
             for trainer in pipeline.trainers:
                 if trainer.feature_group == feature_group and trainer.energy_source == energy_source:
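
Passing replace_node_type down to process_multiple_query mirrors what train_from_data does directly: overwrite the node-info column of the extracted data with the indexed node type. A small pandas sketch of that replacement, with an illustrative column name and values:

import pandas as pd

data = pd.DataFrame({"node_type": [0, 0], "power": [10.5, 11.2]})  # toy extracted data
node_type = 3  # index returned by index_train_machine
data["node_type"] = int(node_type)  # same effect as data[node_info_column] = int(node_type)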
@@ -396,6 +436,8 @@ def train(args):
     print("Train args:", argparse_dict)
     # save metadata
     pipeline.save_metadata()
+    # save node collection
+    pipeline.node_collection.save()
     # save pipeline
     pipeline.archive_pipeline()
     print_cols = ["feature_group", "model_name", "mae", "mape"]
@@ -650,19 +692,17 @@ def plot(args):
 export preprocessed data and trained models to the kepler-model-db path
 
 arguments:
-- --id : specify machine ID
 - --pipeline-name : specify pipeline name that contains the trained models
 - --output : specify kepler-model-db/models in local path
 - --publisher : specify publisher (github) account
 - --benchmark : specify benchmark file that contains start-time and end-time data, in either of the following formats
   - CPE benchmark resource in json if you run workload with CPE-operator (https://github.com/IBM/cpe-operator)
   - custom benchmark in json with `startTimeUTC` and `endTimeUTC` data
+- --collect-date : specify collection time manually in UTC
+- --input : specify kepler query response file (output of `query` function) - optional
 """
 
 def export(args):
-    if not args.id:
-        print("need to specify --id")
-        exit()
 
     if not args.pipeline_name:
         print("need to specify pipeline name via -p or --pipeline-name")
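
For the custom benchmark option listed above, the docstring requires only the `startTimeUTC` and `endTimeUTC` keys; a hedged example of writing such a file (timestamp format assumed):

import json

benchmark = {
    "startTimeUTC": "2024-01-01T00:00:00Z",  # illustrative timestamps
    "endTimeUTC": "2024-01-01T01:00:00Z",
}
with open("my_benchmark.json", "w") as f:
    json.dump(benchmark, f)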
@@ -677,43 +717,22 @@ def export(args):
         print("need to specify --publisher")
         exit()
 
-    if not args.benchmark:
-        print("need to specify --benchmark to extract collection time")
+    if args.benchmark:
+        collect_date, _ = extract_time(data_path, args.benchmark)
+    elif args.collect_date:
+        collect_date = args.collect_date
+    else:
+        print("need to specify --benchmark or --collect-date")
         exit()
 
+    inputs = []
+    if args.input:
+        inputs = args.input.split(",")
+
     pipeline_name = args.pipeline_name
-    machine_id = args.id
     pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name)
-    machine_path = get_machine_path(output_path, args.version, machine_id)
-
-    collect_date, _ = extract_time(data_path, args.benchmark)
-    exporter.export(data_path, pipeline_path, machine_path, machine_id=machine_id, version=args.version, publisher=args.publisher, collect_date=collect_date, include_raw=args.include_raw)
-
-    args.energy_source = ",".join(PowerSourceMap.keys())
-    out_pipeline_path = os.path.join(machine_path, pipeline_name)
-
-    for ot in ModelOutputType:
-        args.output_type = ot.name
 
-        # plot preprocess data
-        args.target_data = "preprocess"
-        args.output = get_preprocess_folder(out_pipeline_path)
-        plot(args)
-
-        # plot error
-        args.target_data = "error"
-        args.output = os.path.join(out_pipeline_path, "error_summary")
-        plot(args)
-
-
-    args.target_data = "estimate"
-    args.output = os.path.join(out_pipeline_path, "best_estimation")
-    for ot in ModelOutputType:
-        args.output_type = ot.name
-        # plot estimate
-        for feature_group in SingleSourceFeatures:
-            args.feature_group = feature_group
-            plot(args)
+    exporter.export(data_path, pipeline_path, output_path, publisher=args.publisher, collect_date=collect_date, inputs=inputs)
 
 """
 plot_scenario
@@ -847,10 +866,11 @@ def plot_scenario(args):
     parser.add_argument("--scenario", type=str, help="Specify scenario")
 
     # Export arguments
-    parser.add_argument("--id", type=str, help="specify machine id")
-    parser.add_argument("--version", type=str, help="Specify model server version.", default=default_version)
     parser.add_argument("--publisher", type=str, help="Specify github account of model publisher")
     parser.add_argument("--include-raw", type=bool, help="Include raw query data")
+    parser.add_argument("--collect-date", type=str, help="Specify collection date directly")
+
+    parser.add_argument("--id", type=str, help="specify machine id")
 
     # Parse the command-line arguments
     args = parser.parse_args()
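
A quick standalone check of how the relocated export flags parse; argparse maps `--collect-date` to `args.collect_date` (only the added arguments are constructed here):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--publisher", type=str)
parser.add_argument("--collect-date", type=str)
parser.add_argument("--id", type=str)
args = parser.parse_args(["--collect-date", "2024-01-01", "--id", "machine-01"])
assert args.collect_date == "2024-01-01" and args.id == "machine-01"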
@@ -867,7 +887,7 @@ def plot_scenario(args):
         os.makedirs(data_path)
         print("create new folder for data: {}".format(data_path))
     else:
-        print("{} must be mount, add -v \"$(pwd)\":{}.".format(data_path, data_path))
+        print("{0} does not exist. For docker run, {0} must be mounted: add -v \"$(pwd)\":{0}. For a native run, set DATAPATH.".format(data_path))
         exit()
     getattr(sys.modules[__name__], args.command)(args)
 
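The final dispatch line resolves the subcommand name to a same-named module-level function via sys.modules; a self-contained sketch of the idiom (hypothetical args value):

import sys

def query(args):
    print("querying", args)

command = "query"  # e.g. taken from args.command
getattr(sys.modules[__name__], command)({"server": "http://localhost:9090"})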