Skip to content
This repository was archived by the owner on Feb 20, 2023. It is now read-only.

Commit 7182657

Browse files
authored
Workload forecasting script (#1403)
1 parent 520e0da commit 7182657

File tree

12 files changed

+1045
-38
lines changed

12 files changed

+1045
-38
lines changed

Jenkinsfile

Lines changed: 79 additions & 36 deletions
Original file line numberDiff line numberDiff line change
// Self-driving CI: the former single 'Self-Driving End-to-End Test' stage is split into two
// independent parallel stages so workload forecasting and OU modeling can run concurrently.
// Both run in the same 'noisepage:focal' container with ptrace enabled and the shared ccache mount.
stage('Self-Driving') {
    parallel {
        stage('Workload Forecasting'){
            agent {
                docker {
                    image 'noisepage:focal'
                    args '--cap-add sys_ptrace -v /jenkins/ccache:/home/jenkins/.ccache'
                }
            }
            steps {
                sh 'echo $NODE_NAME'

                script{
                    utils = utils ?: load(utilsFileName)
                    // Forecasting only needs the release binary; self-driving test targets are not built here.
                    utils.noisePageBuild(buildType:utils.RELEASE_BUILD, isBuildTests:false)
                }

                // This scripts runs TPCC benchmark with query trace enabled. It also uses SET command to turn
                // on query trace.
                // --pattern_iter determines how many times a sequence of TPCC phases is run. Set to 3 so that
                // enough trace could be generated for training and testing.
                sh script :'''
                cd script/forecasting
                ./forecaster.py --gen_data --pattern_iter=3 --model_save_path=model.pickle --models=LSTM
                ''', label: 'Generate trace and perform training'

                sh script: 'sudo lsof -i -P -n | grep LISTEN || true', label: 'Check ports.'

                // Reload the pickled model and run inference against the trace produced above.
                sh script: '''
                cd script/forecasting
                ./forecaster.py --test_file=query_trace.csv --model_load_path=model.pickle --test_model=LSTM
                ''', label: 'Perform inference on the trained model'

                sh script: 'sudo lsof -i -P -n | grep LISTEN || true', label: 'Check ports.'
            }
            post {
                cleanup {
                    deleteDir()
                }
            }
        }
        stage('Modeling'){
            agent {
                docker {
                    image 'noisepage:focal'
                    args '--cap-add sys_ptrace -v /jenkins/ccache:/home/jenkins/.ccache'
                }
            }
            steps {
                sh 'echo $NODE_NAME'

                script{
                    utils = utils ?: load(utilsFileName)
                    // Modeling needs the self-driving test targets (ninja self_driving_test below).
                    utils.noisePageBuild(buildType:utils.RELEASE_BUILD, isBuildTests:false, isBuildSelfDrivingTests: true)
                }

                // The parameters to the mini_runners target are (arbitrarily picked to complete tests within a reasonable time / picked to exercise all OUs).
                // Specifically, the parameters chosen are:
                // - mini_runner_rows_limit=100, which sets the maximal number of rows/tuples processed to be 100 (small table)
                // - rerun=0, which skips rerun since we are not testing benchmark performance here
                // - warm_num=1, which also tests the warm up phase for the mini_runners.
                // With the current set of parameters, the input generation process will finish under 10min
                sh script :'''
                cd build/bin
                ../benchmark/mini_runners --mini_runner_rows_limit=100 --rerun=0 --warm_num=1
                ''', label: 'Mini-trainer input generation'

                sh script: 'sudo lsof -i -P -n | grep LISTEN || true', label: 'Check ports.'

                sh script: '''
                cd build
                export BUILD_ABS_PATH=`pwd`
                timeout 10m ninja self_driving_test
                ''', label: 'Running self-driving test'

                sh script: 'sudo lsof -i -P -n | grep LISTEN || true', label: 'Check ports.'
            }
            post {
                cleanup {
                    deleteDir()
                }
            }
        }
    }
}

script/forecasting/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import sys
from pathlib import Path

# Make the shared testing utilities (e.g. util.constants used by data_loader.py) importable.
# NOTE(review): this resolves '../testing' relative to the CURRENT WORKING DIRECTORY, not this
# file's location — it only works when scripts are run with CWD == script/forecasting, as the
# Jenkins 'Workload Forecasting' stage does (it cd's there first). Confirm whether
# Path(__file__).parent was intended for location-independent imports.
sys.path.insert(0, str((Path.cwd() / '..' / 'testing').absolute()))

script/forecasting/cluster.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#!/usr/bin/env python3
2+
"""
3+
This file contains cluster related codes for the forecasting query traces. QueryCluster represents query traces of
4+
multiple queries in the same cluster.
5+
6+
TODO: clustering implementation
7+
"""
8+
9+
from typing import Dict, List
10+
import numpy as np
11+
12+
13+
class QueryCluster:
    """
    Represents query traces from a single cluster. For queries in the same cluster, they will be aggregated
    into a single time-series to be used as training input for training. The time-series predicted by the model will
    then be converted to different query traces for a query cluster
    """

    def __init__(self, traces: Dict):
        """
        NOTE(ricky): I believe this per-cluster query representation will change once we have clustering component
        added. For now, it simply takes a map of time-series data for each query id.

        :param traces: Map of (id -> timeseries) for each query id
        """
        self._traces = traces
        self._aggregate()

    def _aggregate(self) -> None:
        """
        Aggregate time-series of multiple queries in the same cluster into one time-series.

        Stores the aggregated time-series at self._timeseries and the ratio map of queries in the
        same cluster at self._ratio_map.

        :raises ValueError: if the cluster is non-empty but its traces sum to zero — the per-query
            ratios would be undefined (previously this surfaced as an opaque ZeroDivisionError)
        """
        cnt_map = {}
        all_series = []
        for qid, timeseries in self._traces.items():
            cnt_map[qid] = sum(timeseries)
            all_series.append(timeseries)

        # Sum all timeseries element-wise.
        # NOTE: zip truncates to the shortest series; assumes all series share one bucketing — TODO confirm.
        self._timeseries = np.array([sum(x) for x in zip(*all_series)])

        total_cnt = sum(cnt_map.values())
        if all_series and total_cnt == 0:
            # Fail loudly instead of dividing by zero below.
            raise ValueError("Cannot compute query ratios: all traces in the cluster are zero")

        # Compute distribution of each query id in the cluster
        self._ratio_map = {qid: cnt / total_cnt for qid, cnt in cnt_map.items()}

    def get_timeseries(self) -> np.ndarray:
        """
        Get the aggregate time-series for this cluster
        :return: Time-series for the cluster
        """
        return self._timeseries

    def segregate(self, timeseries: List[float]) -> Dict:
        """
        From an aggregated time-series, segregate it into multiple time-series, one for each query in the cluster.
        :param timeseries: Aggregated time-series
        :return: Time-series for each query id, dict{query id: time-series}
        """
        # Each query receives its historical share of the aggregate volume.
        return {qid: [x * ratio for x in timeseries]
                for qid, ratio in self._ratio_map.items()}

script/forecasting/data_loader.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#!/usr/bin/env python3
2+
"""
3+
This file contains data loading logic from the query trace file produced. Hardcoded CSV format needs to be synced with
4+
query trace producer.
5+
"""
6+
7+
from util.constants import LOG
8+
9+
from typing import Dict, Tuple
10+
import numpy as np
11+
from sklearn.preprocessing import MinMaxScaler
12+
import csv
13+
14+
15+
class DataLoader:
    """
    Represents a query trace file: loads the trace CSV and converts the raw per-query
    timestamps into fixed-interval count time-series, one per query id.
    """

    # Hardcoded query_id column index in the query_trace file
    QID_IDX = 0
    # Hardcoded timestamp column index in the query_trace file
    TS_IDX = 1

    def __init__(self,
                 interval_us: int,
                 query_trace_file: str,
                 ) -> None:
        """
        A Dataloader represents a query trace file. The format of the CSV is hardcoded as class attributes, e.g QID_IDX
        The loader transforms the timestamps in the original file into time-series for each query id.
        :param interval_us: Interval for the time-series
        :param query_trace_file: Query trace CSV file
        """
        self._query_trace_file = query_trace_file
        self._interval_us = interval_us

        data = self._load_data()
        self._to_timeseries(data)

    def _load_data(self) -> np.ndarray:
        """
        Load data from csv
        :return: Loaded 2D numpy array of [query_id, timestamp]
        :raises ValueError: if the trace file contains no rows
        """
        LOG.info(f"Loading data from {self._query_trace_file}")
        # Load data from the files.
        # NOTE: the ' timestamp' key (leading space) must match the producer's CSV header exactly.
        with open(self._query_trace_file, newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            data = np.array(
                [[int(r['query_id']), int(r[' timestamp'])] for r in reader])

        if len(data) == 0:
            raise ValueError("Empty trace file")

        return data

    def _to_timeseries(self, data: np.ndarray) -> None:
        """
        Convert the 2D array with query id and timestamps into a map of time-series for each query id
        :param data: Loaded 2D numpy array of [query_id, timestamp], sorted by timestamp
        :return: None
        :raises ValueError: if the trace spans <= 1 microsecond
        """
        # Query trace file is sorted by timestamps
        start_timestamp = data[0][self.TS_IDX]
        end_timestamp = data[-1][self.TS_IDX]

        if end_timestamp - start_timestamp <= 1:
            raise ValueError(
                "Empty data set with start timestamp >= end timestamp.")

        # Number of data points in the new time-series.
        # FIX: the last row lands in bucket (end - start) // interval, so we need that many
        # buckets PLUS one. The previous formula ((end - start - 1) // interval + 1) produced
        # one bucket too few whenever the span was an exact multiple of the interval, and the
        # final increment below then raised IndexError. For all other spans both formulas agree.
        num_buckets = (end_timestamp - start_timestamp) // self._interval_us + 1

        # Iterate through the timestamps, counting queries per bucket.
        self._ts_data = {}
        for row in data:
            t = row[self.TS_IDX]
            qid = row[self.QID_IDX]

            # Initialize a new query's time-series
            if qid not in self._ts_data:
                self._ts_data[qid] = np.zeros(num_buckets)

            # Bucket index
            bi = (t - start_timestamp) // self._interval_us
            self._ts_data[qid][bi] += 1

    def get_ts_data(self) -> Dict:
        """
        :return: Map of (query id -> per-interval count time-series) built by _to_timeseries
        """
        return self._ts_data

0 commit comments

Comments (0)