|
8 | 8 | import multiprocessing as mp |
9 | 9 | import multiprocessing.connection as mpc |
10 | 10 | from dataclasses import dataclass |
| 11 | +from typing import Any |
11 | 12 |
|
12 | 13 | import sqlalchemy |
13 | 14 |
|
14 | 15 | import dpbench.config as cfg |
15 | | -from dpbench.infrastructure.benchmark import Benchmark, _exec, _exec_simple |
| 16 | +from dpbench.infrastructure.benchmark import Benchmark |
16 | 17 | from dpbench.infrastructure.benchmark_results import BenchmarkResults |
17 | 18 | from dpbench.infrastructure.benchmark_validation import validate_results |
18 | 19 | from dpbench.infrastructure.datamodel import store_results |
19 | 20 | from dpbench.infrastructure.enums import ErrorCodes, ValidationStatusCodes |
20 | 21 | from dpbench.infrastructure.frameworks import Framework |
21 | 22 | from dpbench.infrastructure.frameworks.fabric import build_framework |
22 | 23 | from dpbench.infrastructure.runner import _print_results |
| 24 | +from dpbench.infrastructure.timer import timer |
23 | 25 |
|
24 | 26 | """ |
25 | 27 | Send on process creation: |
@@ -387,3 +389,175 @@ def run_benchmark_and_save( |
387 | 389 | else "n/a", |
388 | 390 | ), |
389 | 391 | ) |
| 392 | + |
| 393 | + |
| 394 | +def _set_input_args( |
| 395 | + bench: Benchmark, framework: Framework, np_input_data: dict |
| 396 | +): |
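 |  | +    # Array arguments are copied with the framework's copy_to_func (typically |
 |  | +    # into device memory); scalar arguments are passed through unchanged. |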
| 397 | + inputs = dict() |
| 398 | + |
| 399 | + for arg in bench.info.input_args: |
| 400 | + if arg in bench.info.array_args: |
| 401 | + inputs[arg] = framework.copy_to_func()(np_input_data[arg]) |
| 402 | + else: |
| 403 | + inputs[arg] = np_input_data[arg] |
| 404 | + |
| 405 | + return inputs |
| 406 | + |
| 407 | + |
| 408 | +def _reset_output_args( |
| 409 | + bench: Benchmark, framework: Framework, inputs: dict, np_input_data: dict |
| 410 | +): |
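 |  | +    # Output array arguments may have been overwritten by a previous run, so |
 |  | +    # re-copy them from the original NumPy input data. |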
| 411 | + for arg in bench.info.output_args: |
| 412 | + overwritten_data = inputs.get(arg, None) |
| 413 | + if overwritten_data is None or arg not in bench.info.array_args: |
| 414 | + continue |
| 415 | + inputs[arg] = framework.copy_to_func()(np_input_data[arg]) |
| 416 | + |
| 417 | + |
| 418 | +def _array_size(array: Any) -> int: |
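 |  | +    # Prefer nbytes when available; otherwise fall back to size * itemsize. |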
| 419 | + try: |
| 420 | + return array.nbytes |
| 421 | + except AttributeError: |
| 422 | + return array.size * array.itemsize |
| 423 | + |
| 424 | + |
| 425 | +def _exec_simple( |
| 426 | + bench: Benchmark, |
| 427 | + framework: Framework, |
| 428 | + impl_postfix: str, |
| 429 | + preset: str, |
| 430 | +): |
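 |  | +    # Single untimed execution that returns the copied output arrays, or |
 |  | +    # None if the execution fails. |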
| 431 | + np_input_data = bench.get_input_data(preset=preset) |
| 432 | + inputs = _set_input_args(bench, framework, np_input_data) |
| 433 | + impl_fn = bench.get_implementation(impl_postfix) |
| 434 | + |
| 435 | + try: |
| 436 | + retval = framework.execute(impl_fn, inputs) |
| 437 | + results_dict = {} |
| 438 | + |
| 439 | + _exec_copy_output(bench, framework, retval, inputs, results_dict) |
| 440 | + |
| 441 | + return results_dict["outputs"] |
| 442 | + except Exception: |
 | 443 | +        logging.exception("Benchmark execution failed.") |
| 444 | + return None |
| 445 | + |
| 446 | + |
| 447 | +def _exec( |
| 448 | + bench: Benchmark, |
| 449 | + framework: Framework, |
| 450 | + impl_postfix: str, |
| 451 | + preset: str, |
| 452 | + repeat: int, |
| 453 | + results_dict: dict, |
| 454 | + copy_output: bool, |
| 455 | +): |
| 456 | + """Executes a benchmark for a given implementation. |
| 457 | +
|
| 458 | + A helper function to execute a benchmark. The function is called in a |
| 459 | + separate sub-process by a BenchmarkRunner instance. The ``_exec`` function |
| 460 | + first runs the benchmark implementation function once as a warmup and then |
 | 461 | + performs the specified number of repetitions. The output arguments are |
 | 462 | + reset between repetitions and, if requested, the final outputs are copied |
 | 463 | + back to NumPy arrays. |
| 464 | +
|
 | 465 | + All timing results and, if requested, the copied output arrays are written |
 | 466 | + to the results_dict input argument that is managed by the calling process. |
| 467 | +
|
| 468 | + Args: |
| 469 | + bench : A Benchmark object representing the benchmark to be executed. |
| 470 | + framework : A Framework for which the benchmark is to be executed. |
| 471 | + impl_postfix : The identifier for the benchmark implementation. |
| 472 | + preset : A problem size entry defined in the bench_info JSON. |
 | 474 | +        repeat : Number of repetitions of the benchmark execution. |
 | 477 | +        results_dict : A dictionary where timing and other results are stored. |
 | 478 | +        copy_output : If True, output arrays are copied back to NumPy and stored in results_dict. |
| 479 | + """ |
| 480 | + np_input_data = bench.get_input_data(preset=preset) |
| 481 | + |
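 |  | +    # Time spent preparing framework-specific input arguments is recorded |
 |  | +    # as setup_time. |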
| 482 | + with timer() as t: |
| 483 | + inputs = _set_input_args(bench, framework, np_input_data) |
| 484 | + results_dict["setup_time"] = t.get_elapsed_time() |
| 485 | + |
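 |  | +    # Total size in bytes of the NumPy input arrays for this preset. |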
| 486 | + input_size = 0 |
| 487 | + for arg in bench.info.array_args: |
| 488 | + input_size += _array_size(bench.bdata[preset][arg]) |
| 489 | + |
| 490 | + results_dict["input_size"] = input_size |
| 491 | + |
| 492 | + impl_fn = bench.get_implementation(impl_postfix) |
| 493 | + |
| 494 | + # Warmup |
| 495 | + with timer() as t: |
| 496 | + try: |
| 497 | + framework.execute(impl_fn, inputs) |
| 498 | + except Exception: |
| 499 | + logging.exception("Benchmark execution failed at the warmup step.") |
| 500 | + results_dict["error_state"] = ErrorCodes.FAILED_EXECUTION |
| 501 | + results_dict["error_msg"] = "Execution failed" |
| 502 | + return |
| 503 | + |
| 504 | + results_dict["warmup_time"] = t.get_elapsed_time() |
| 505 | + |
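 |  | +    # Restore any output arrays that the warmup run overwrote. |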
| 506 | + _reset_output_args(bench, framework, inputs, np_input_data) |
| 507 | + |
| 508 | + exec_times = [0] * repeat |
| 509 | + |
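 |  | +    # Timed repetitions: only the implementation call itself is measured, and |
 |  | +    # outputs are reset between repetitions but preserved after the last one. |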
| 510 | + retval = None |
| 511 | + for i in range(repeat): |
| 512 | + with timer() as t: |
| 513 | + retval = framework.execute(impl_fn, inputs) |
| 514 | + exec_times[i] = t.get_elapsed_time() |
| 515 | + |
| 516 | + # Do not reset the output from the last repeat |
| 517 | + if i < repeat - 1: |
| 518 | + _reset_output_args(bench, framework, inputs, np_input_data) |
| 519 | + |
| 520 | + results_dict["exec_times"] = exec_times |
| 521 | + |
| 522 | + # Get the output data |
| 523 | + results_dict["teardown_time"] = 0.0 |
| 524 | + if copy_output: |
| 525 | + _exec_copy_output(bench, framework, retval, inputs, results_dict) |
| 526 | + |
| 527 | + results_dict["error_state"] = ErrorCodes.SUCCESS |
| 528 | + results_dict["error_msg"] = "" |
| 529 | + |
| 530 | + |
| 531 | +def _exec_copy_output( |
| 532 | + bench: Benchmark, |
| 533 | + fmwrk: Framework, |
| 534 | + retval, |
| 535 | + inputs: dict, |
| 536 | + results_dict: dict, |
| 537 | +): |
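 |  | +    # Copy output array arguments (and any return value) back to NumPy; the |
 |  | +    # time spent copying is reported as teardown_time. |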
| 538 | + output_arrays = dict() |
| 539 | + with timer() as t: |
| 540 | + for out_arg in bench.info.output_args: |
| 541 | + if out_arg in bench.info.array_args: |
| 542 | + output_arrays[out_arg] = fmwrk.copy_from_func()(inputs[out_arg]) |
| 543 | + |
| 544 | + # Special case: if the benchmark implementation returns anything, then |
| 545 | + # add that to the results dict |
| 546 | + if retval is not None: |
| 547 | + output_arrays["return-value"] = convert_to_numpy(retval, fmwrk) |
| 548 | + |
| 549 | + results_dict["outputs"] = output_arrays |
| 550 | + results_dict["teardown_time"] = t.get_elapsed_time() |
| 551 | + |
| 552 | + |
 | 553 | +def convert_to_numpy(value: Any, fmwrk: Framework) -> Any: |
| 554 | + """Calls copy_from_func on all array values.""" |
| 555 | + if isinstance(value, tuple): |
| 556 | + retval_list = list(value) |
| 557 | + for i, _ in enumerate(retval_list): |
| 558 | + retval_list[i] = fmwrk.copy_from_func()(retval_list[i]) |
| 559 | + value = tuple(retval_list) |
| 560 | + else: |
| 561 | + value = fmwrk.copy_from_func()(value) |
| 562 | + |
| 563 | + return value |