Skip to content

Commit 8d54790

Browse files
committed
winmssql scale support
1 parent 5765516 commit 8d54790

File tree

5 files changed

+109
-55
lines changed

5 files changed

+109
-55
lines changed

benchmark_runner/common/template_operations/templates/winmssql/windows_benchmark_runner/analyze_windows_hammerdb.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,15 @@ def get_json_files(self, hammerdb_results_path: str) -> list[str]:
2323
path = Path(hammerdb_results_path)
2424
return [str(file) for file in path.iterdir() if file.is_file() and file.suffix == '.json']
2525

26-
import re
27-
2826
def get_tpm_per_worker(self, json_files):
2927
"""
30-
Extract the maximum TPM per worker from a list of JSON files.
28+
Extract the average TPM per worker from a list of JSON files.
3129
Args:
3230
json_files (list[str]): List of JSON file paths containing HammerDB results.
3331
Returns:
34-
dict[int, int]: Dictionary mapping worker ID to its maximum TPM.
32+
dict[int, int]: Dictionary mapping worker ID to its average TPM (rounded to int).
3533
"""
36-
results = {}
34+
results = {} # worker_id -> list of TPM values
3735
for json_file in json_files:
3836
logger.info(f"Analyzing: {json_file}")
3937
try:
@@ -43,27 +41,29 @@ def get_tpm_per_worker(self, json_files):
4341
raise ValueError(f"Cannot extract worker ID from filename: {json_file}")
4442
current_worker = int(match.group(1))
4543

46-
current_max_tpm = self.extract_max_tpm(json_file)
44+
current_avg_tpm = self.extract_avg_tpm(json_file)
4745

48-
if results.get(current_worker):
49-
if current_max_tpm > results.get(current_worker):
50-
results[current_worker] = current_max_tpm
51-
else:
52-
results[current_worker] = current_max_tpm
46+
if current_worker not in results:
47+
results[current_worker] = []
48+
results[current_worker].append(current_avg_tpm)
5349

5450
except Exception as e:
5551
logger.error(f"Skipping file due to error: {json_file} -> {e}")
5652

57-
return results
53+
# Convert lists to average TPM per worker (same format as before: int values)
54+
return {
55+
worker: int(round(sum(tpms) / len(tpms))) if tpms else 0
56+
for worker, tpms in results.items()
57+
}
5858

59-
def extract_max_tpm(self, json_file):
59+
def extract_avg_tpm(self, json_file):
6060
"""
61-
Extract the maximum TPM value from a single JSON file.
61+
Extract the average TPM value from a single JSON file.
6262
Handles Windows and Linux line endings and different encodings.
6363
Args:
6464
json_file (str): Path to the JSON file containing HammerDB results.
6565
Returns:
66-
int: Maximum TPM value found in the file.
66+
float: Average TPM value found in the file.
6767
"""
6868
# Detect possible encoding
6969
try:
@@ -93,16 +93,18 @@ def extract_max_tpm(self, json_file):
9393
tpm_dict = data["MSSQLServer tpm"]
9494

9595
# Convert values to integers
96-
tpm_values = [int(v) for v in tpm_dict.values()]
97-
return max(tpm_values)
96+
tpm_values = [int(v) for v in tpm_dict.values() if int(v) > 0]
97+
if not tpm_values:
98+
return 0
99+
return sum(tpm_values) / len(tpm_values)
98100

99101
def hammerdb_results_for_elasticsearch(self, hammerdb_results: dict, output_file: str) -> list[dict]:
100102
"""
101103
Prepare HammerDB results in a format suitable for Elasticsearch
102104
and write them to a JSON file.
103105
104106
Args:
105-
hammerdb_results (dict): Dictionary of worker_id -> max TPM
107+
hammerdb_results (dict): Dictionary of worker_id -> average TPM
106108
output_file (str): Path to output JSON file (default: hammerdb_result.json)
107109
108110
Returns:

benchmark_runner/common/template_operations/templates/winmssql/windows_benchmark_runner/elasticsearch_uploader.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,17 @@ def upload_to_elasticsearch(self, index: str, data: dict) -> list[str]:
8686
for row in data:
8787
# Update row with uuid
8888
uuid = str(uuid4())
89-
data.update({'uuid': uuid, 'status': 'Succeeded'})
90-
response = self._elasticsearch.upload_to_elasticsearch(index=index, data=data, timestamp=datetime.now(timezone.utc)-timedelta(hours=8))
89+
row.update({'uuid': uuid, 'status': 'Succeeded'})
90+
response = self._elasticsearch.upload_to_elasticsearch(index=index, data=row, timestamp=datetime.now(timezone.utc)-timedelta(hours=8))
9191
uuids.append(uuid)
92+
# Log success
93+
logger.info(f"Uploaded to Elasticsearch index '{index}', response: {response}")
94+
# Update that vm data succeeded uploaded to Elasticsearch
95+
uuid = str(uuid4())
96+
response = self._elasticsearch.upload_to_elasticsearch(index=index, data={'uuid': uuid, 'vm': 'Succeeded', 'vm_os_version': 'winmssql2022'},
97+
timestamp=datetime.now(timezone.utc) - timedelta(
98+
hours=8))
99+
uuids.append(uuid)
92100
# Log success
93101
logger.info(f"Uploaded to Elasticsearch index '{index}', response: {response}")
94102
return uuids

benchmark_runner/workloads/bootstorm_vm.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,9 @@ def run_vm_workload(self):
426426
# Create VMs
427427
if self._create_vms_only:
428428
steps = (self._create_vm_scale, )
429+
elif self._only_delete_all:
430+
steps = (self._stop_vm_scale,
431+
self._wait_for_stop_vm_scale,self._delete_vm_scale, self._wait_for_delete_vm_scale)
429432
# Run VMs without deleting
430433
elif not self._delete_all:
431434
steps = (self._create_vm_scale, self._run_vm_scale)

benchmark_runner/workloads/winmssql_vm.py

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
import os
33
import time
4+
from datetime import datetime, timezone, timedelta
45

56
from benchmark_runner.common.logger.logger_time_stamp import logger_time_stamp, logger
67
from benchmark_runner.common.elasticsearch.elasticsearch_exceptions import ElasticSearchDataNotUploaded
@@ -18,36 +19,57 @@ def __init__(self):
1819
if not self._windows_url:
1920
raise ValueError('Missing Windows DV URL')
2021

21-
def wait_for_windows_hammerdb_finished(self):
22+
def wait_for_windows_hammerdb_finished(self, vm_nums: int = 1):
2223
"""
23-
Wait until the Windows HammerDB workload finishes by checking the 'status' key in Elasticsearch
24+
Wait until the Windows HammerDB workload finishes by checking the 'vm' key in Elasticsearch
2425
and verifying that data is not already updated by checking key data_updated.
26+
27+
Args:
28+
vm_nums: Expected number of VMs to complete (for scale support)
29+
2530
Returns:
2631
True if the workload succeeded.
32+
2733
Raises:
2834
Windows_HammerDB_NOT_Succeeded: If the workload did not succeed within the timeout.
2935
"""
3036
current_wait_time = 0
3137

3238
while True:
33-
response = self._get_latest_resource_with_key(index=self._es_index, key='status')
34-
# Verify that winmssl elasticsearch data is uploaded by checking 'status', 'vm_os_version'
35-
# Checking that this is the latest data by verify that 'data_updated' is not True
36-
if response.get('status') == 'Succeeded' and response.get('vm_os_version') == 'winmssql2022' and str(response.get('data_updated', '')).lower() != 'true':
39+
# Get ALL documents in time window
40+
current_datetime = datetime.now(timezone.utc)
41+
start_datetime = current_datetime - timedelta(hours=1)
42+
end_datetime = current_datetime + timedelta(hours=1)
43+
44+
es_data = self._es_operations.get_index_data_between_dates(
45+
index=self._es_index,
46+
start_datetime=start_datetime,
47+
end_datetime=end_datetime
48+
)
49+
50+
# Count documents with 'vm': 'Succeeded' and data_updated != True
51+
succeeded_count = sum(
52+
1 for doc in es_data
53+
if doc.get('_source', {}).get('vm') == 'Succeeded'
54+
and doc.get('_source', {}).get('vm_os_version') == 'winmssql2022'
55+
and str(doc.get('_source', {}).get('data_updated', '')).lower() != 'true'
56+
)
57+
58+
logger.info(f'Found {succeeded_count}/{vm_nums} successful VMs')
59+
60+
if succeeded_count >= vm_nums:
3761
return True
38-
else:
39-
logger.info('Waiting for the Windows HammerDB run to finish successfully...')
4062

41-
# check timeout
63+
# Check timeout
4264
if self._timeout > 0 and current_wait_time >= self._timeout:
4365
break
4466

45-
# sleep before next check
67+
# Sleep before next check
4668
time.sleep(OC.DELAY)
4769
current_wait_time += OC.DELAY
4870

4971
raise Windows_HammerDB_NOT_Succeeded(
50-
f"HammerDB did not succeed within {self._timeout} seconds"
72+
f"Only {succeeded_count}/{vm_nums} VMs completed HammerDB successfully within {self._timeout} seconds"
5173
)
5274

5375
@logger_time_stamp
@@ -62,32 +84,47 @@ def run(self):
6284
else:
6385
self._es_index = 'hammerdb-results'
6486
self._initialize_run()
65-
# create windows dv
66-
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
67-
self._oc.wait_for_dv_status(status='Succeeded')
68-
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'))
69-
self._oc.wait_for_vm_status(vm_name=f'{self._workload_name}-{self._trunc_uuid}', status=VMStatus.Stopped)
70-
self._set_bootstorm_vm_first_run_time()
71-
self._set_bootstorm_vm_start_time(vm_name=self._vm_name)
72-
self._virtctl.start_vm_sync(vm_name=self._vm_name)
73-
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=self._vm_name, vm_node='')
74-
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url,
75-
f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz')
76-
self.wait_for_windows_hammerdb_finished()
77-
ids = self._get_index_ids_between_dates(index=self._es_index, key='status')
78-
# Adding data_updated=True to stamp that this data is already updated and enrich with new product versions fields
79-
for id in ids:
80-
self._update_elasticsearch_index(index=self._es_index, id=id, kind='vm', status='Succeeded', run_artifacts_url=self._data_dict['run_artifacts_url'], database='mssql', vm_name=f'{self._workload_name}-{self._trunc_uuid}', data_updated=True)
81-
self._finalize_vm()
82-
if self._delete_all:
87+
if not self._verification_only:
88+
# create windows dv
89+
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
90+
self._oc.wait_for_dv_status(status='Succeeded')
91+
if self._scale:
92+
# Just create the vms
93+
self._create_vms_only = True
94+
self.run_vm_workload()
95+
self._create_vms_only = False
96+
else:
97+
self._oc.create_async(yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'))
98+
self._oc.wait_for_vm_status(vm_name=f'{self._workload_name}-{self._trunc_uuid}',
99+
status=VMStatus.Stopped)
100+
self._set_bootstorm_vm_first_run_time()
101+
self._set_bootstorm_vm_start_time(vm_name=self._vm_name)
102+
self._virtctl.start_vm_sync(vm_name=self._vm_name)
103+
self._data_dict = self._get_bootstorm_vm_elapsed_time(vm_name=self._vm_name, vm_node='')
104+
vm_count = int(self._scale)*len(self._scale_node_list) if self._scale else 1
105+
self.wait_for_windows_hammerdb_finished(vm_nums=vm_count)
106+
if self._scale:
107+
ids = self._get_index_ids_between_dates(index=self._es_index, key='status')
108+
# Adding data_updated=True to stamp that this data is already updated and enrich with new product versions fields
109+
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url, f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz')
110+
for id in ids:
111+
self._update_elasticsearch_index(index=self._es_index, id=id, kind='vm', status='Succeeded', run_artifacts_url=self._data_dict['run_artifacts_url'], database='mssql', vm_name=f'{self._workload_name}-{self._trunc_uuid}', data_updated=True, scale=int(self._scale)*len(self._scale_node_list))
112+
self._only_delete_all=True
113+
self.run_vm_workload()
114+
else:
115+
ids = self._get_index_ids_between_dates(index=self._es_index, key='status')
116+
# Adding data_updated=True to stamp that this data is already updated and enrich with new product versions fields
117+
self._data_dict['run_artifacts_url'] = os.path.join(self._run_artifacts_url, f'{self._get_run_artifacts_hierarchy(workload_name=self._workload_name, is_file=True)}-{self._time_stamp_format}.tar.gz')
118+
for id in ids:
119+
self._update_elasticsearch_index(index=self._es_index, id=id, kind='vm', status='Succeeded', run_artifacts_url=self._data_dict['run_artifacts_url'], database='mssql', vm_name=f'{self._workload_name}-{self._trunc_uuid}', data_updated=True)
120+
self._finalize_vm()
83121
self._oc.delete_vm_sync(
84122
yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'),
85123
vm_name=self._vm_name)
86-
if self._delete_all:
87-
# delete windows dv
88-
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
89-
# delete namespace
90-
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
124+
# delete windows dv
125+
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'windows_dv.yaml'))
126+
# delete namespace
127+
self._oc.delete_async(yaml=os.path.join(f'{self._run_artifacts_path}', 'namespace.yaml'))
91128
except ElasticSearchDataNotUploaded as err:
92129
self._oc.delete_vm_sync(
93130
yaml=os.path.join(f'{self._run_artifacts_path}', f'{self._name}.yaml'),

benchmark_runner/workloads/workloads_operations.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def __init__(self):
104104
self._windows_url = self._environment_variables_dict.get('windows_url', '')
105105
self._create_vms_only = self._environment_variables_dict.get('create_vms_only', '')
106106
self._delete_all = self._environment_variables_dict.get('delete_all', '')
107+
self._only_delete_all = False
107108
self._verification_only = self._environment_variables_dict.get('verification_only', '')
108109
self._must_gather_log = self._environment_variables_dict.get('must_gather_log', '')
109110
self._test_name = self._environment_variables_dict.get('test_name', '')
@@ -485,7 +486,7 @@ def _upload_to_elasticsearch(self, index: str, kind: str, status: str, result: d
485486
self._es_operations.upload_to_elasticsearch(index=index, data=self.__get_metadata(kind=kind, status=status, result=result))
486487

487488
@logger_time_stamp
488-
def _update_elasticsearch_index(self, index: str, id: str, kind: str, status: str, run_artifacts_url: str, database: str = '', vm_name: str = '', data_updated: bool = False):
489+
def _update_elasticsearch_index(self, index: str, id: str, kind: str, status: str, run_artifacts_url: str, database: str = '', vm_name: str = '', data_updated: bool = False, scale: int = None):
489490
"""
490491
This method updates elasticsearch id
491492
:param index:
@@ -495,12 +496,15 @@ def _update_elasticsearch_index(self, index: str, id: str, kind: str, status: st
495496
:param status:
496497
:param run_artifacts_url:
497498
:param data_updated: check if data was updated
499+
:param scale: scale number
498500
:return:
499501
"""
500502
metadata = self.__get_metadata(kind=kind, database=database, status=status, run_artifacts_url=run_artifacts_url)
501503
if vm_name:
502504
metadata.update({'vm_name': vm_name})
503505
metadata.update({'data_updated': data_updated})
506+
if scale is not None:
507+
metadata.update({'scale': scale})
504508
self._es_operations.update_elasticsearch_index(index=index, id=id, metadata=metadata)
505509

506510
def _verify_elasticsearch_data_uploaded(self, index: str, uuid: str):

0 commit comments

Comments
 (0)