
Commit 0cf4523

[perfstress] Add support for per-operation latency tracking and results file output (#42554)
1 parent 20a21f4 · commit 0cf4523

File tree

3 files changed: +53 / -7 lines changed


tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_batch_perf_test.py

Lines changed: 9 additions & 0 deletions
```diff
@@ -23,6 +23,7 @@ def __init__(self, arguments):
         self._test_proxy_policy: Optional[PerfTestProxyPolicy] = None
         self._client_kwargs: Dict[str, Any] = {}
         self._recording_id: Optional[str] = None
+        self._latencies: List[float] = []

         if self.args.insecure:
             # Disable SSL verification for SDK Client
@@ -147,8 +148,12 @@ def run_all_sync(self, duration: int, *, run_profiler: bool = False, **kwargs) -
             self._save_profile("sync", output_path=self.args.profile_path)
             self._print_profile_stats()
         else:
+            self._latencies = []
             while self._last_completion_time < duration:
+                start = time.perf_counter_ns()
                 self._completed_operations += self.run_batch_sync()
+                if self.args.latency:
+                    self._latencies.append((time.perf_counter_ns() - start) / 1_000_000)
                 self._last_completion_time = time.time() - starttime

     async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kwargs) -> None:
@@ -168,6 +173,10 @@ async def run_all_async(self, duration: int, *, run_profiler: bool = False, **kw
             self._save_profile("async", output_path=self.args.profile_path)
             self._print_profile_stats()
         else:
+            self._latencies = []
             while self._last_completion_time < duration:
+                start = time.perf_counter_ns()
                 self._completed_operations += await self.run_batch_async()
+                if self.args.latency:
+                    self._latencies.append((time.perf_counter_ns() - start) / 1_000_000)
                 self._last_completion_time = time.time() - starttime
```
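For context, the timing pattern introduced above can be sketched on its own. This is a minimal, self-contained illustration, not code from the commit; `run_timed_loop` and `run_batch` are hypothetical stand-ins for the test's run loop and `run_batch_sync()` / `run_batch_async()`:

```python
import time
from typing import Callable, List


def run_timed_loop(run_batch: Callable[[], int], duration: float, track_latency: bool) -> List[float]:
    """Run batches until `duration` seconds elapse, recording per-batch latency in milliseconds."""
    latencies: List[float] = []
    start_time = time.time()
    elapsed = 0.0
    while elapsed < duration:
        batch_start = time.perf_counter_ns()
        run_batch()  # hypothetical stand-in for run_batch_sync()
        if track_latency:
            # perf_counter_ns() is a monotonic nanosecond counter; divide by 1_000_000 for ms.
            latencies.append((time.perf_counter_ns() - batch_start) / 1_000_000)
        elapsed = time.time() - start_time
    return latencies
```

Note that each sample times one call to the batch method, so when a test runs more than one operation per batch, the sample covers the whole batch rather than a single operation.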

tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_perf_stress_proc.py

Lines changed: 5 additions & 3 deletions
```diff
@@ -64,7 +64,7 @@ async def _start_tests(index, test_class, num_tests, args, test_stages, results,
     if args.warmup:
         # Waiting till all processes are ready to start "Warmup"
         _synchronize(test_stages["Warmup"])
-        await _run_tests(args.warmup, args, tests, results, status, with_profiler=False)
+        await _run_tests(args.warmup, args, tests, results, status, with_profiler=False, warmup=True)

     # Waiting till all processes are ready to start "Tests"
     _synchronize(test_stages["Tests"])
@@ -108,7 +108,7 @@ async def _start_tests(index, test_class, num_tests, args, test_stages, results,
        print(f"Failed to close tests: {e}")


-async def _run_tests(duration: int, args, tests, results, status, *, with_profiler: bool = False) -> None:
+async def _run_tests(duration: int, args, tests, results, status, *, with_profiler: bool = False, warmup: bool = False) -> None:
    """Run the listed tests either in parallel asynchronously or in a thread pool."""
    # Kick of a status monitoring thread.
    stop_status = threading.Event()
@@ -133,7 +133,9 @@ async def _run_tests(duration: int, args, tests, results, status, *, with_profil

        # Add final test results to the results queue to be accumulated by the parent process.
        for test in tests:
-            results.put((test._parallel_index, test.completed_operations, test.last_completion_time))
+            # Don't report latencies for warmup
+            latencies = test._latencies if not warmup else []
+            results.put((test._parallel_index, test.completed_operations, test.last_completion_time, latencies))
    finally:
        # Clean up status reporting thread.
        stop_status.set()
```
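To make the new queue payload concrete, here is a small sketch of the tuple shape that `_run_tests` now puts on the results queue, with the warmup guard applied. The helper name `build_result` and the sample values are illustrative only:

```python
from typing import List, Tuple

# (parallel_index, completed_operations, last_completion_time, latencies)
ResultTuple = Tuple[int, int, float, List[float]]


def build_result(index: int, completed: int, elapsed: float,
                 latencies: List[float], warmup: bool) -> ResultTuple:
    # Warmup iterations still run, but their latency samples are dropped so they
    # don't skew the reported distribution.
    return (index, completed, elapsed, latencies if not warmup else [])


print(build_result(0, 500, 10.0, [8.1, 7.9], warmup=False))  # (0, 500, 10.0, [8.1, 7.9])
print(build_result(0, 500, 10.0, [8.1, 7.9], warmup=True))   # (0, 500, 10.0, [])
```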

tools/azure-sdk-tools/devtools_testutils/perfstress_tests/_perf_stress_runner.py

Lines changed: 39 additions & 4 deletions
```diff
@@ -5,6 +5,7 @@

 import argparse
 import inspect
+import json
 import logging
 import math
 import os
```
```diff
@@ -114,6 +115,14 @@ def _parse_args(self) -> str:
         per_test_arg_parser.add_argument(
             "--insecure", action="store_true", help="Disable SSL validation. Default is False.", default=False
         )
+        per_test_arg_parser.add_argument(
+            "-l", "--latency", action="store_true", help="Track per-operation latency statistics.", default=False
+        )
+        per_test_arg_parser.add_argument(
+            "--results-file",
+            type=str,
+            help="File path location to store the results for the test run.",
+        )

         # Per-test args
         self._test_class_to_run.add_arguments(per_test_arg_parser)
```
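The two new options behave like standard argparse flags. A minimal reproduction (with a throwaway parser outside the runner) shows how a run would enable latency tracking and point the results file at `results.json`; the parser name and the example arguments are illustrative:

```python
import argparse

# Throwaway parser that mirrors only the two options added above.
parser = argparse.ArgumentParser("perfstress-latency-example")
parser.add_argument(
    "-l", "--latency", action="store_true", help="Track per-operation latency statistics.", default=False
)
parser.add_argument(
    "--results-file",
    type=str,
    help="File path location to store the results for the test run.",
)

args = parser.parse_args(["--latency", "--results-file", "results.json"])
assert args.latency is True
assert args.results_file == "results.json"  # dashes become underscores in the namespace
```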
```diff
@@ -264,13 +273,16 @@ async def start(self):

     def _report_results(self):
         """Calculate and log the test run results across all child processes"""
-        operations = []
+        total_operations = 0
+        operations_per_second = 0.0
+        latencies = []
         while not self.results.empty():
-            operations.append(self.results.get())
+            result: Tuple[int, int, float, List[float]] = self.results.get()
+            total_operations += result[1]
+            operations_per_second += result[1] / result[2] if result[2] else 0
+            latencies.extend(result[3])

-        total_operations = self._get_completed_operations(operations)
         self.logger.info("")
-        operations_per_second = self._get_operations_per_second(operations)
         if operations_per_second:
             seconds_per_operation = 1 / operations_per_second
             weighted_average_seconds = total_operations / operations_per_second
```
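The aggregation above folds each child-process result into running totals instead of collecting the tuples into a list first. A standalone sketch with made-up sample numbers shows the arithmetic:

```python
from typing import List, Tuple

# Each tuple mirrors the queue payload: (parallel_index, completed_operations,
# last_completion_time, latencies). The numbers below are made-up samples.
results: List[Tuple[int, int, float, List[float]]] = [
    (0, 1200, 10.0, [7.9, 8.3, 8.1]),
    (1, 1150, 10.0, [8.6, 8.4, 9.0]),
]

total_operations = sum(r[1] for r in results)
# Throughput is the sum of each process's own rate, so a process that stopped
# early contributes its true rate rather than diluting a shared wall-clock average.
operations_per_second = sum(r[1] / r[2] for r in results if r[2])
latencies = [sample for r in results for sample in r[3]]

print(total_operations)       # 2350
print(operations_per_second)  # 235.0
print(len(latencies))         # 6
```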
```diff
@@ -282,6 +294,14 @@ def _report_results(self):
                     self._format_number(seconds_per_operation, 4),
                 )
             )
+
+            if self.per_test_args.latency and len(latencies) > 0:
+                self.logger.info("")
+                self._print_latencies(latencies)
+            if self.per_test_args.results_file:
+                # Not all tests will have a size argument
+                size = getattr(self.per_test_args, "size", None)
+                self._write_results_file(self.per_test_args.results_file, latencies, size)
         else:
             self.logger.info("Completed without generating operation statistics.")
             self.logger.info("")
```
```diff
@@ -335,3 +355,18 @@ def _format_number(self, value, min_significant_digits):
         decimals = max(0, significant_digits - math.floor(log) - 1)

         return ("{:,." + str(decimals) + "f}").format(rounded)
+
+    def _print_latencies(self, latencies: List[float]):
+        self.logger.info("=== Latency Distribution ===")
+        latencies.sort()
+
+        percentiles = [50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0]
+        for p in percentiles:
+            index = math.ceil(p / 100 * len(latencies)) - 1
+            self.logger.info(f"{p:5.1f}% {latencies[index]:10.2f}ms")
+
+    def _write_results_file(self, path: str, latencies: List[float], size):
+        data = [{"Time": l, "Size": size} for l in latencies]
+        output = json.dumps(data, indent=2)
+        with open(path, 'w', encoding='utf-8') as f:
+            f.write(output)
```
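Taken together, `_print_latencies` uses a nearest-rank percentile lookup over the sorted samples, and `_write_results_file` dumps one JSON record per sample. The standalone functions below mimic that behavior for illustration; they are not the methods from the commit:

```python
import json
import math
from typing import Dict, List, Optional


def latency_percentiles(latencies: List[float]) -> Dict[float, float]:
    """Nearest-rank percentile lookup over the sorted samples (assumes a non-empty list)."""
    ordered = sorted(latencies)
    result: Dict[float, float] = {}
    for p in [50.0, 75.0, 90.0, 95.0, 99.0, 99.9, 100.0]:
        index = math.ceil(p / 100 * len(ordered)) - 1
        result[p] = ordered[index]
    return result


def write_results_file(path: str, latencies: List[float], size: Optional[int]) -> None:
    """Write one record per sample: {"Time": <latency in ms>, "Size": <size or null>}."""
    data = [{"Time": latency, "Size": size} for latency in latencies]
    with open(path, "w", encoding="utf-8") as f:
        f.write(json.dumps(data, indent=2))


samples = [8.0, 9.5, 7.2, 10.1, 8.8]
print(latency_percentiles(samples))        # {50.0: 8.8, 75.0: 9.5, 90.0: 10.1, ...}
write_results_file("results.json", samples, None)
```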
