This repository was archived by the owner on Feb 20, 2023. It is now read-only.

Commit 711df3e

Save and load data_info along with the mini models (#1436)
1 parent 72bebca

11 files changed: +190 -177 lines
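
The change is mechanical but pervasive: metadata that the data_info module previously exposed as module-level globals (METRICS_OUTPUT_NUM, TARGET_CSV_INDEX, and so on) now hangs off a data_info.instance object, so the CSV-header layout the mini models were trained against can be serialized and restored together with them. Below is a minimal sketch of the pattern in Python; the class body and field semantics are assumptions for illustration, not the repository's actual DataInfo definition.

class DataInfo:
    """Header-derived metadata, formerly module-level globals (sketch)."""

    def __init__(self):
        self.METRICS_OUTPUT_NUM = 0     # assumed: count of trailing metric columns
        self.MINI_MODEL_TARGET_NUM = 0  # assumed: count of model target columns
        self.target_csv_index = {}      # assumed: Target enum -> CSV column index

    def parse_csv_header(self, headers, raw):
        # Populate the index maps from a CSV header row (details elided).
        ...

# Module-level singleton: call sites read data_info.instance.<attr>, and the
# whole object can be pickled alongside the trained models.
instance = DataInfo()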

script/model/data_class/grouped_op_unit_data.py

Lines changed: 23 additions & 26 deletions
@@ -46,8 +46,8 @@ def _default_get_global_data(filename, sample_interval=0):
     df = pd.read_csv(filename)
     file_name = os.path.splitext(os.path.basename(filename))[0]
 
-    x = df.iloc[:, :-data_info.METRICS_OUTPUT_NUM].values
-    y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values
+    x = df.iloc[:, :-data_info.instance.METRICS_OUTPUT_NUM].values
+    y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values
 
     # Construct the new data
     opunit = OpUnit[file_name.upper()]
@@ -66,14 +66,14 @@ def _txn_get_mini_runner_data(filename, txn_sample_interval):
     # prepending a column of ones as the base transaction data feature
     base_x = pd.DataFrame(data=np.ones((df.shape[0], 1), dtype=int))
     df = pd.concat([base_x, df], axis=1)
-    x = df.iloc[:, :-data_info.METRICS_OUTPUT_NUM].values
-    y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values
-    start_times = df.iloc[:, data_info.TARGET_CSV_INDEX[data_info.Target.START_TIME]].values
-    cpu_ids = df.iloc[:, data_info.TARGET_CSV_INDEX[data_info.Target.CPU_ID]].values
+    x = df.iloc[:, :-data_info.instance.METRICS_OUTPUT_NUM].values
+    y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values
+    start_times = df.iloc[:, data_info.instance.target_csv_index[data_info.instance.Target.START_TIME]].values
+    cpu_ids = df.iloc[:, data_info.instance.target_csv_index[data_info.instance.Target.CPU_ID]].values
 
     logging.info("Loaded file: {}".format(OpUnit[file_name.upper()]))
 
-    interval = data_info.CONTENDING_OPUNIT_INTERVAL
+    interval = data_info.instance.CONTENDING_OPUNIT_INTERVAL
 
     # Map from interval start time to the data in this interval
     interval_x_map = {}
@@ -123,15 +123,14 @@ def _pipeline_get_grouped_op_unit_data(filename, warmup_period, ee_sample_interv
     data_list = []
     with open(filename, "r") as f:
         reader = csv.reader(f, delimiter=",", skipinitialspace=True)
-        indexes = next(reader)
-        data_info.parse_csv_header(indexes, True)
-        features_vector_index = data_info.RAW_FEATURES_CSV_INDEX[ExecutionFeature.FEATURES]
-        input_output_boundary = data_info.RAW_FEATURES_CSV_INDEX[data_info.INPUT_OUTPUT_BOUNDARY]
-        input_end_boundary = len(data_info.INPUT_CSV_INDEX)
+        next(reader)
+        features_vector_index = data_info.instance.raw_features_csv_index[ExecutionFeature.FEATURES]
+        input_output_boundary = data_info.instance.raw_features_csv_index[data_info.instance.INPUT_OUTPUT_BOUNDARY]
+        input_end_boundary = len(data_info.instance.input_csv_index)
 
         for line in reader:
             # extract the time
-            cpu_time = line[data_info.RAW_TARGET_CSV_INDEX[Target.START_TIME]]
+            cpu_time = line[data_info.instance.raw_target_csv_index[Target.START_TIME]]
             if start_time is None:
                 start_time = cpu_time
 
@@ -144,7 +143,7 @@ def _pipeline_get_grouped_op_unit_data(filename, warmup_period, ee_sample_interv
             record = [d for i,d in enumerate(line) if i >= input_output_boundary]
             data = list(map(data_util.convert_string_to_numeric, record))
             x_multiple = data[:input_end_boundary]
-            metrics = np.array(data[-data_info.METRICS_OUTPUT_NUM:])
+            metrics = np.array(data[-data_info.instance.METRICS_OUTPUT_NUM:])
 
             # Get the opunits located within
             opunits = []
@@ -156,12 +155,12 @@ def _pipeline_get_grouped_op_unit_data(filename, warmup_period, ee_sample_interv
 
                 opunit = OpUnit[feature]
                 x_loc = [v[idx] if type(v) == list else v for v in x_multiple]
-                if x_loc[data_info.INPUT_CSV_INDEX[ExecutionFeature.NUM_ROWS]] == 0:
+                if x_loc[data_info.instance.input_csv_index[ExecutionFeature.NUM_ROWS]] == 0:
                     logging.info("Skipping {} OU with 0 tuple num".format(opunit.name))
                     continue
 
                 if opunit == OpUnit.CREATE_INDEX:
-                    concurrency = x_loc[data_info.CONCURRENCY_INDEX]
+                    concurrency = x_loc[data_info.instance.CONCURRENCY_INDEX]
                     # TODO(lin): we won't do sampling for CREATE_INDEX. We probably should encapsulate this when
                     # generating the data
                     sample_interval = 0
@@ -189,15 +188,13 @@ def _pipeline_get_grouped_op_unit_data(filename, warmup_period, ee_sample_interv
 def _interval_get_grouped_op_unit_data(filename):
     # In the default case, the data does not need any pre-processing and the file name indicates the opunit
     df = pd.read_csv(filename, skipinitialspace=True)
-    headers = list(df.columns.values)
-    data_info.parse_csv_header(headers, False)
     file_name = os.path.splitext(os.path.basename(filename))[0]
 
-    x = df.iloc[:, :-data_info.METRICS_OUTPUT_NUM].values
-    y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values
-    start_times = df.iloc[:, data_info.TARGET_CSV_INDEX[Target.START_TIME]].values
-    cpu_ids = df.iloc[:, data_info.TARGET_CSV_INDEX[Target.CPU_ID]].values
-    interval = data_info.PERIODIC_OPUNIT_INTERVAL
+    x = df.iloc[:, :-data_info.instance.METRICS_OUTPUT_NUM].values
+    y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values
+    start_times = df.iloc[:, data_info.instance.target_csv_index[Target.START_TIME]].values
+    cpu_ids = df.iloc[:, data_info.instance.target_csv_index[Target.CPU_ID]].values
+    interval = data_info.instance.PERIODIC_OPUNIT_INTERVAL
 
     # Map from interval start time to the data in this interval
     interval_x_map = {}
@@ -248,9 +245,9 @@ def __init__(self, name, opunit_features, metrics, sample_interval=0, concurrenc
         """
         self.name = name
         self.opunit_features = opunit_features
-        self.y = metrics[-data_info.MINI_MODEL_TARGET_NUM:]
+        self.y = metrics[-data_info.instance.MINI_MODEL_TARGET_NUM:]
         self.y_pred = None
-        index_map = data_info.TARGET_CSV_INDEX
+        index_map = data_info.instance.target_csv_index
         self.start_time = metrics[index_map[Target.START_TIME]]
         self.end_time = self.start_time + self.y[index_map[Target.ELAPSED_US]] - 1
         self.cpu_id = int(metrics[index_map[Target.CPU_ID]])
@@ -282,7 +279,7 @@ def get_end_time(self, concurrent_counting_mode):
         if concurrent_counting_mode is ConcurrentCountingMode.EXACT:
             end_time = self.end_time
         if concurrent_counting_mode is ConcurrentCountingMode.ESTIMATED:
-            end_time = self.start_time + self.y_pred[data_info.TARGET_CSV_INDEX[Target.ELAPSED_US]] - 1
+            end_time = self.start_time + self.y_pred[data_info.instance.target_csv_index[Target.ELAPSED_US]] - 1
         if concurrent_counting_mode is ConcurrentCountingMode.INTERVAL:
             end_time = self.start_time + global_model_config.INTERVAL_START + global_model_config.INTERVAL_SIZE
         return end_time

script/model/data_class/opunit_data.py

Lines changed: 19 additions & 19 deletions
@@ -53,11 +53,11 @@ def _default_get_mini_runner_data(filename):
     # In the default case, the data does not need any pre-processing and the file name indicates the opunit
     df = pd.read_csv(filename, skipinitialspace=True)
     headers = list(df.columns.values)
-    data_info.parse_csv_header(headers, False)
+    data_info.instance.parse_csv_header(headers, False)
     file_name = os.path.splitext(os.path.basename(filename))[0]
 
-    x = df.iloc[:, :-data_info.METRICS_OUTPUT_NUM].values
-    y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values
+    x = df.iloc[:, :-data_info.instance.METRICS_OUTPUT_NUM].values
+    y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values
 
     return [OpUnitData(OpUnit[file_name.upper()], x, y)]
 
@@ -70,18 +70,18 @@ def _txn_get_mini_runner_data(filename, model_results_path, txn_sample_interval)
     # prepending a column of ones as the base transaction data feature
     base_x = pd.DataFrame(data=np.ones((df.shape[0], 1), dtype=int))
     df = pd.concat([base_x, df], axis=1)
-    x = df.iloc[:, :-data_info.METRICS_OUTPUT_NUM].values
-    y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values
-    start_times = df.iloc[:, data_info.TARGET_CSV_INDEX[data_info.Target.START_TIME]].values
-    cpu_ids = df.iloc[:, data_info.TARGET_CSV_INDEX[data_info.Target.CPU_ID]].values
+    x = df.iloc[:, :-data_info.instance.METRICS_OUTPUT_NUM].values
+    y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values
+    start_times = df.iloc[:, data_info.instance.TARGET_CSV_INDEX[data_info.instance.Target.START_TIME]].values
+    cpu_ids = df.iloc[:, data_info.instance.TARGET_CSV_INDEX[data_info.instance.Target.CPU_ID]].values
 
     logging.info("Loaded file: {}".format(OpUnit[file_name.upper()]))
 
     # change the data based on the interval for the periodically invoked operating units
     prediction_path = "{}/{}_txn_converted_data.csv".format(model_results_path, file_name)
     io_util.create_csv_file(prediction_path, [""])
 
-    interval = data_info.CONTENDING_OPUNIT_INTERVAL
+    interval = data_info.instance.CONTENDING_OPUNIT_INTERVAL
 
     # Map from interval start time to the data in this interval
     interval_x_map = {}
@@ -119,19 +119,19 @@ def _interval_get_mini_runner_data(filename, model_results_path):
     # In the default case, the data does not need any pre-processing and the file name indicates the opunit
     df = pd.read_csv(filename, skipinitialspace=True)
     headers = list(df.columns.values)
-    data_info.parse_csv_header(headers, False)
+    data_info.instance.parse_csv_header(headers, False)
     file_name = os.path.splitext(os.path.basename(filename))[0]
 
-    x = df.iloc[:, :-data_info.METRICS_OUTPUT_NUM].values
-    y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values
-    start_times = df.iloc[:, data_info.RAW_TARGET_CSV_INDEX[Target.START_TIME]].values
+    x = df.iloc[:, :-data_info.instance.METRICS_OUTPUT_NUM].values
+    y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values
+    start_times = df.iloc[:, data_info.instance.RAW_TARGET_CSV_INDEX[Target.START_TIME]].values
     logging.info("Loaded file: {}".format(OpUnit[file_name.upper()]))
 
     # change the data based on the interval for the periodically invoked operating units
     prediction_path = "{}/{}_interval_converted_data.csv".format(model_results_path, file_name)
     io_util.create_csv_file(prediction_path, [""])
 
-    interval = data_info.PERIODIC_OPUNIT_INTERVAL
+    interval = data_info.instance.PERIODIC_OPUNIT_INTERVAL
 
     # Map from interval start time to the data in this interval
     interval_x_map = {}
@@ -179,17 +179,17 @@ def _execution_get_mini_runner_data(filename, model_map, predict_cache, trim):
     with open(filename, "r") as f:
         reader = csv.reader(f, delimiter=",", skipinitialspace=True)
         indexes = next(reader)
-        data_info.parse_csv_header(indexes, True)
-        features_vector_index = data_info.RAW_FEATURES_CSV_INDEX[ExecutionFeature.FEATURES]
-        raw_boundary = data_info.RAW_FEATURES_CSV_INDEX[data_info.INPUT_OUTPUT_BOUNDARY]
-        input_output_boundary = len(data_info.INPUT_CSV_INDEX)
+        data_info.instance.parse_csv_header(indexes, True)
+        features_vector_index = data_info.instance.raw_features_csv_index[ExecutionFeature.FEATURES]
+        raw_boundary = data_info.instance.raw_features_csv_index[data_info.instance.INPUT_OUTPUT_BOUNDARY]
+        input_output_boundary = len(data_info.instance.input_csv_index)
 
         for line in reader:
             # drop query_id, pipeline_id, num_features, features_vector
             record = [d for i, d in enumerate(line) if i >= raw_boundary]
             data = list(map(data_util.convert_string_to_numeric, record))
             x_multiple = data[:input_output_boundary]
-            y_merged = np.array(data[-data_info.MINI_MODEL_TARGET_NUM:])
+            y_merged = np.array(data[-data_info.instance.MINI_MODEL_TARGET_NUM:])
 
             # Get the opunits located within
             opunits = []
@@ -254,7 +254,7 @@ def _execution_get_mini_runner_data(filename, model_map, predict_cache, trim):
     for opunit, values in data_map.items():
         np_value = np.array(values)
         x = np_value[:, :input_output_boundary]
-        y = np_value[:, -data_info.MINI_MODEL_TARGET_NUM:]
+        y = np_value[:, -data_info.instance.MINI_MODEL_TARGET_NUM:]
         data_list.append(OpUnitData(opunit, x, y))
 
     return data_list
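
Note how the header-parsing entry point travels with its state in the hunks above: a free function that mutated module globals becomes a method that populates attributes on the shared instance. A short before/after sketch of the two call shapes (the method bodies are not part of this diff):

# Before: free function writing into data_info module globals.
data_info.parse_csv_header(headers, False)
y = df.iloc[:, -data_info.MINI_MODEL_TARGET_NUM:].values

# After: the same parse populates the picklable singleton, and reads go
# through it as well.
data_info.instance.parse_csv_header(headers, False)
y = df.iloc[:, -data_info.instance.MINI_MODEL_TARGET_NUM:].values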

script/model/endtoend_estimator.py

Lines changed: 10 additions & 9 deletions
@@ -45,6 +45,7 @@ def estimate(self):
                             self.model_results_path,
                             0,
                             False,
+                            False,
                             self.ee_sample_interval,
                             self.txn_sample_interval,
                             self.network_sample_interval)
@@ -88,7 +89,7 @@ def _model_prediction_with_derived_data(self, impact_data_list, model_name, mode
             data_list.append(d.target_grouped_op_unit_data)
             mini_model_y_pred.append(d.target_grouped_op_unit_data.y_pred)
             raw_y.append(d.target_grouped_op_unit_data.y)
-            predicted_elapsed_us = mini_model_y_pred[-1][data_info.TARGET_CSV_INDEX[Target.ELAPSED_US]]
+            predicted_elapsed_us = mini_model_y_pred[-1][data_info.instance.target_csv_index[Target.ELAPSED_US]]
             predicted_resource_util = None
             if model_name == "impact":
                 predicted_resource_util = d.get_y_pred()
@@ -100,9 +101,9 @@ def _model_prediction_with_derived_data(self, impact_data_list, model_name, mode
                 predicted_resource_util[:mini_model_y_pred[-1].shape[0]] -= self_resource
                 predicted_resource_util[predicted_resource_util < 0] = 0
             x.append(np.concatenate((mini_model_y_pred[-1] / predicted_elapsed_us,
-                predicted_resource_util,
-                d.resource_util_same_core_x)))
-            #x.append(np.concatenate((mini_model_y_pred[-1] / predicted_elapsed_us, predicted_resource_util)))
+                                     predicted_resource_util,
+                                     d.resource_util_same_core_x)))
+            # x.append(np.concatenate((mini_model_y_pred[-1] / predicted_elapsed_us, predicted_resource_util)))
             y.append(d.target_grouped_op_unit_data.y / (d.target_grouped_op_unit_data.y_pred +
                                                         global_model_config.RATIO_DIVISION_EPSILON))
 
@@ -154,7 +155,7 @@ def _record_results(self, x, y, y_pred, raw_y, mini_model_y_pred, label, data_li
         ratio_error = np.abs(raw_y - raw_y_pred) / (raw_y + epsilon)
         avg_ratio_error = np.average(ratio_error, axis=0)
         accumulated_percentage_error = np.abs(accumulated_raw_y - accumulated_raw_y_pred) / (
-            accumulated_raw_y + epsilon)
+                accumulated_raw_y + epsilon)
         original_accumulated_percentage_error = np.abs(accumulated_raw_y - np.sum(mini_model_y_pred, axis=0)) / (
                 accumulated_raw_y + epsilon)
 
@@ -175,8 +176,8 @@ def _record_results(self, x, y, y_pred, raw_y, mini_model_y_pred, label, data_li
         prediction_path = "{}/grouped_opunit_prediction.csv".format(self.model_results_path)
         io_util.create_csv_file(prediction_path, ["Pipeline", "", "Actual", "", "Predicted", "", "Ratio Error"])
         for i, data in enumerate(data_list):
-            io_util.write_csv_result(prediction_path, data.name, [""] + list(raw_y[i]) + [""] +
-                                         list(raw_y_pred[i]) + [""] + list(ratio_error[i]))
+            io_util.write_csv_result(prediction_path, data.name, [""] + list(raw_y[i]) + [""] +
+                                     list(raw_y_pred[i]) + [""] + list(ratio_error[i]))
 
         average_result_path = "{}/interval_average_prediction.csv".format(self.model_results_path)
         io_util.create_csv_file(average_result_path,
@@ -185,7 +186,7 @@ def _record_results(self, x, y, y_pred, raw_y, mini_model_y_pred, label, data_li
         interval_y_map = {}
         interval_y_pred_map = {}
         mark_list = None
-        #mark_list = _generate_mark_list(data_list)
+        # mark_list = _generate_mark_list(data_list)
         for i, data in enumerate(data_list):
             # Don't count the create index OU
             # TODO(lin): needs better way to evaluate... maybe add a id_query field to GroupedOpunitData
@@ -268,7 +269,7 @@ def _generate_mark_list(data_list):
     logging_util.init_logging(args.log)
 
     with open(args.mini_model_file, 'rb') as pickle_file:
-        model_map = pickle.load(pickle_file)
+        model_map, data_info.instance = pickle.load(pickle_file)
     with open(args.global_resource_model_file, 'rb') as pickle_file:
         resource_model = pickle.load(pickle_file)
     with open(args.global_impact_model_file, 'rb') as pickle_file:
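
The load site in the hunk above implies a matching save site on the training path, which this excerpt does not show (per the commit title, the two are saved and loaded together). A sketch of the presumed round trip, with a hypothetical file name; the tuple order matches the unpacking in the diff:

import pickle

# Training side (assumed): persist the models and the header metadata together.
with open("mini_model_map.pickle", "wb") as f:
    pickle.dump((model_map, data_info.instance), f)

# Estimator side (as in this commit): one load restores both, so the models
# are always paired with the exact column layout they were trained on.
with open("mini_model_map.pickle", "rb") as f:
    model_map, data_info.instance = pickle.load(f)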
