 import pathlib
 import subprocess
 import sys
+import tempfile
 from typing import Dict, List, Union

 import numpy as np
@@ -50,7 +51,7 @@ def run_benchmark_subprocess(args, log_env_name_var, filename=None, region=None)
     subprocess.run(args, env=env, check=True)


-def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame:
+def collect_benchmark_result(benchmark_path: str, iterations: int) -> pd.DataFrame:
     """Generate a DataFrame report on HTTP queries, bytes processed, slot time and execution time from log files."""
     path = pathlib.Path(benchmark_path)
     try:
@@ -100,28 +101,23 @@ def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame:

         with open(bytes_file, "r") as file:
             lines = file.read().splitlines()
-            query_count = len(lines)
-            total_bytes = sum(int(line) for line in lines)
+            query_count = len(lines) / iterations
+            total_bytes = sum(int(line) for line in lines) / iterations

         with open(millis_file, "r") as file:
             lines = file.read().splitlines()
-            total_slot_millis = sum(int(line) for line in lines)
+            total_slot_millis = sum(int(line) for line in lines) / iterations

         if has_local_seconds:
-            # 'local_seconds' captures the total execution time for a benchmark as it
-            # starts timing immediately before the benchmark code begins and stops
-            # immediately after it ends. Unlike other metrics that might accumulate
-            # values proportional to the number of queries executed, 'local_seconds' is
-            # a singular measure of the time taken for the complete execution of the
-            # benchmark, from start to finish.
             with open(local_seconds_file, "r") as file:
-                local_seconds = float(file.readline().strip())
+                lines = file.read().splitlines()
+                local_seconds = sum(float(line) for line in lines) / iterations
         else:
             local_seconds = None

         with open(bq_seconds_file, "r") as file:
             lines = file.read().splitlines()
-            bq_seconds = sum(float(line) for line in lines)
+            bq_seconds = sum(float(line) for line in lines) / iterations

         results_dict[str(filename)] = [
             query_count,
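Each iteration of a benchmark appends one line per query to the same metrics log files, so dividing the accumulated totals by `iterations` turns them into per-iteration averages. A minimal standalone sketch of that arithmetic, using made-up log contents rather than real benchmark output:

```python
# Sketch only: how the averaging above behaves for a hypothetical bytes-processed
# log written by 3 iterations of a benchmark that runs 2 queries per iteration.
iterations = 3
lines = ["100", "200", "100", "200", "100", "200"]  # one line per query per iteration

query_count = len(lines) / iterations                        # 2.0 queries per iteration
total_bytes = sum(int(line) for line in lines) / iterations  # 300.0 bytes per iteration
print(query_count, total_bytes)
```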
@@ -154,7 +150,12 @@ def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame:
         columns=columns,
     )

-    print("---BIGQUERY USAGE REPORT---")
+    report_title = (
+        "---BIGQUERY USAGE REPORT---"
+        if iterations == 1
+        else f"---BIGQUERY USAGE REPORT (Averages over {iterations} Iterations)---"
+    )
+    print(report_title)
     for index, row in benchmark_metrics.iterrows():
         formatted_local_exec_time = (
             f"{round(row['Local_Execution_Time_Sec'], 1)} seconds"
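The report header now states explicitly when the printed numbers are averages rather than single-run totals; for example, with three iterations the title logic above produces:

```python
iterations = 3
report_title = (
    "---BIGQUERY USAGE REPORT---"
    if iterations == 1
    else f"---BIGQUERY USAGE REPORT (Averages over {iterations} Iterations)---"
)
print(report_title)  # ---BIGQUERY USAGE REPORT (Averages over 3 Iterations)---
```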
@@ -259,32 +260,53 @@ def find_config(start_path):
     return None


-def run_benchmark_from_config(benchmark: str):
+def publish_to_bigquery(dataframe, notebook, project_name="bigframes-metrics"):
+    bigquery_table = (
+        f"{project_name}.benchmark_report.notebook_benchmark"
+        if notebook
+        else f"{project_name}.benchmark_report.benchmark"
+    )
+
+    repo_status = get_repository_status()
+    for idx, col in enumerate(repo_status.keys()):
+        dataframe.insert(idx, col, repo_status[col])
+
+    pandas_gbq.to_gbq(
+        dataframe=dataframe,
+        destination_table=bigquery_table,
+        if_exists="append",
+    )
+    print(f"Results have been successfully uploaded to {bigquery_table}.")
+
+
+def run_benchmark_from_config(benchmark: str, iterations: int):
     print(benchmark)
     config_path = find_config(benchmark)

     if config_path:
         benchmark_configs = []
         with open(config_path, "r") as f:
             for line in f:
-                config = json.loads(line)
-                python_args = [f"--{key}={value}" for key, value in config.items()]
-                suffix = (
-                    config["benchmark_suffix"]
-                    if "benchmark_suffix" in config
-                    else "_".join(f"{key}_{value}" for key, value in config.items())
-                )
-                benchmark_configs.append((suffix, python_args))
+                if line.strip():
+                    config = json.loads(line)
+                    python_args = [f"--{key}={value}" for key, value in config.items()]
+                    suffix = (
+                        config["benchmark_suffix"]
+                        if "benchmark_suffix" in config
+                        else "_".join(f"{key}_{value}" for key, value in config.items())
+                    )
+                    benchmark_configs.append((suffix, python_args))
     else:
         benchmark_configs = [(None, [])]

-    for benchmark_config in benchmark_configs:
-        args = ["python", str(benchmark)]
-        args.extend(benchmark_config[1])
-        log_env_name_var = str(benchmark)
-        if benchmark_config[0] is not None:
-            log_env_name_var += f"_{benchmark_config[0]}"
-        run_benchmark_subprocess(args=args, log_env_name_var=log_env_name_var)
+    for _ in range(iterations):
+        for benchmark_config in benchmark_configs:
+            args = ["python", str(benchmark)]
+            args.extend(benchmark_config[1])
+            log_env_name_var = str(benchmark)
+            if benchmark_config[0] is not None:
+                log_env_name_var += f"_{benchmark_config[0]}"
+            run_benchmark_subprocess(args=args, log_env_name_var=log_env_name_var)


 def run_notebook_benchmark(benchmark_file: str, region: str):
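For context, `run_benchmark_from_config` treats each non-empty line of the config file as a JSON object whose keys become CLI flags for one parameterized run; the new `if line.strip():` guard simply skips blank lines. A sketch of how one hypothetical config line (the keys and values are illustrative, not taken from this change) expands under that logic:

```python
import json

# Hypothetical config line in JSON-lines format; real config files live next to
# the benchmark scripts and may use entirely different keys.
line = '{"ordered": true, "benchmark_suffix": "ordered"}'

if line.strip():
    config = json.loads(line)
    python_args = [f"--{key}={value}" for key, value in config.items()]
    suffix = (
        config["benchmark_suffix"]
        if "benchmark_suffix" in config
        else "_".join(f"{key}_{value}" for key, value in config.items())
    )
    print(suffix)       # ordered
    print(python_args)  # ['--ordered=True', '--benchmark_suffix=ordered']
```

With `--iterations`, the outer `for _ in range(iterations)` loop replays every configured run, and because the log environment variable is unchanged between iterations, each run appends to the same log files that `collect_benchmark_result` later averages.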
@@ -341,35 +363,59 @@ def parse_arguments():
         help="Set the benchmarks to be published to BigQuery.",
     )

+    parser.add_argument(
+        "--iterations",
+        type=int,
+        default=1,
+        help="Number of iterations to run each benchmark.",
+    )
+    parser.add_argument(
+        "--output-csv",
+        type=str,
+        default=None,
+        help="Determines whether to output results to a CSV file. If no location is provided, a temporary location is automatically generated.",
+    )
+
     return parser.parse_args()


 def main():
     args = parse_arguments()

     if args.publish_benchmarks:
-        bigquery_table = (
-            "bigframes-metrics.benchmark_report.notebook_benchmark"
-            if args.notebook
-            else "bigframes-metrics.benchmark_report.benchmark"
+        benchmark_metrics = collect_benchmark_result(
+            args.publish_benchmarks, args.iterations
         )
-        benchmark_metrics = collect_benchmark_result(args.publish_benchmarks)
-
-        if os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true":
-            repo_status = get_repository_status()
-            for idx, col in enumerate(repo_status.keys()):
-                benchmark_metrics.insert(idx, col, repo_status[col])
-
-            pandas_gbq.to_gbq(
-                dataframe=benchmark_metrics,
-                destination_table=bigquery_table,
-                if_exists="append",
+        # Output results to CSV without specifying a location
+        if args.output_csv == "True":
+            current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+            temp_file = tempfile.NamedTemporaryFile(
+                prefix=f"benchmark_{current_time}_", delete=False, suffix=".csv"
             )
-            print("Results have been successfully uploaded to BigQuery.")
+            benchmark_metrics.to_csv(temp_file.name, index=False)
+            print(
+                f"Benchmark result is saved to a temporary location: {temp_file.name}"
+            )
+            temp_file.close()
+        # Output results to CSV at a specified custom location
+        elif args.output_csv is not None and args.output_csv != "False":
+            benchmark_metrics.to_csv(args.output_csv, index=False)
+            print(f"Benchmark result is saved to: {args.output_csv}")
+
+        # Publish the benchmark metrics to BigQuery under the 'bigframes-metrics' project.
+        # The 'BENCHMARK_AND_PUBLISH' environment variable should be set to 'true' only
+        # in specific Kokoro sessions.
+        if os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true":
+            publish_to_bigquery(benchmark_metrics, args.notebook)
+        # If the 'GCLOUD_BENCH_PUBLISH_PROJECT' environment variable is set, publish the
+        # benchmark metrics to a specified BigQuery table in the provided project. This is
+        # intended for local testing where the default behavior is not to publish results.
+        elif project := os.getenv("GCLOUD_BENCH_PUBLISH_PROJECT", ""):
+            publish_to_bigquery(benchmark_metrics, args.notebook, project)
     elif args.notebook:
         run_notebook_benchmark(args.benchmark_path, args.region)
     else:
-        run_benchmark_from_config(args.benchmark_path)
+        run_benchmark_from_config(args.benchmark_path, args.iterations)


 if __name__ == "__main__":
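The `--output-csv=True` path relies on `tempfile.NamedTemporaryFile(delete=False)`, which keeps the file on disk after the handle is closed so the report survives the script's exit. A minimal standalone sketch of that pattern, with a made-up one-row report standing in for the collected metrics:

```python
import datetime
import tempfile

import pandas as pd

# Hypothetical one-row report standing in for the collected benchmark metrics.
report = pd.DataFrame({"Query_Count": [2.0], "Bytes_Processed": [300.0]})

current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
temp_file = tempfile.NamedTemporaryFile(
    prefix=f"benchmark_{current_time}_", delete=False, suffix=".csv"
)
report.to_csv(temp_file.name, index=False)
temp_file.close()  # delete=False keeps the file on disk after closing
print(f"Benchmark result is saved to a temporary location: {temp_file.name}")
```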