diff --git a/Makefile b/Makefile index 6d7978771c..031ad94c3b 100644 --- a/Makefile +++ b/Makefile @@ -184,3 +184,34 @@ test-exp-ssh: test-pipelines: make test TEST_PACKAGES="./cmd/pipelines/..." ACCEPTANCE_TEST_FILTER="TestAccept/pipelines" + + +# Benchmarks: + +bench1k: + BENCHMARK_PARAMS="--jobs 1000" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m + +bench100: + BENCHMARK_PARAMS="--jobs 100" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m + +# small benchmark to quickly test benchmark-related code +bench10: + BENCHMARK_PARAMS="--jobs 10" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m + +bench1k.log: + make bench1k | tee $@ + +bench100.log: + make bench100 | tee $@ + +bench10.log: + make bench10 | tee $@ + +bench1k_summary: bench1k.log + ./tools/bench_parse.py $< + +bench100_summary: bench100.log + ./tools/bench_parse.py $< + +bench10_summary: bench10.log + ./tools/bench_parse.py $< diff --git a/acceptance/README.md b/acceptance/README.md index c8c17e310c..8e33d273a7 100644 --- a/acceptance/README.md +++ b/acceptance/README.md @@ -22,6 +22,19 @@ Any file starting with "LOG" will be logged to test log (visible with go test -v See [selftest](./selftest) for more examples. +## Benchmarks + +Benchmarks are regular acceptance tests that log measurements in a specific format. The output can be fed to `tools/bench_parse.py` to print a summary table. + +The test runner recognizes a benchmark by the word "benchmark" appearing anywhere in its path. For these tests, parallel execution is disabled if and only if the BENCHMARK\_PARAMS variable is set. + +The benchmarks make use of two scripts: + +- `gen_config.py --jobs N` to generate a config with N jobs +- `benchmark.py <command>` to run a command a few times and log the time measurements. + +The default number of runs in benchmark.py depends on the BENCHMARK\_PARAMS variable: if it is set, the default is 5; otherwise it is 1. +
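+For example, `make bench10_summary` runs the small 10-job benchmark, tees the test output to `bench10.log` and feeds it to `tools/bench_parse.py` (see the Makefile targets above; the figures below are illustrative). Each measured run is logged as a `TESTLOG:` line of `key=value` pairs:
+
+```
+TESTLOG: Run #1 (count): wall=10.316 ru_utime=19.207 ru_stime=0.505 ru_maxrss=573079552
+```
+
+`bench_parse.py` averages the `(count)` runs per metric and prints one summary row per benchmark.
+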
## Running acceptance tests on Windows To run the acceptance tests from a terminal on Windows (eg. Git Bash from VS Code), diff --git a/acceptance/acceptance_test.go b/acceptance/acceptance_test.go index de02e455b8..0885d0c6e4 100644 --- a/acceptance/acceptance_test.go +++ b/acceptance/acceptance_test.go @@ -61,6 +61,9 @@ var InprocessMode bool // lines with this prefix are not recorded in output.txt but logged instead const TestLogPrefix = "TESTLOG: " +// In benchmark mode we disable parallel execution of all tests that contain the word "benchmark" in their path +var benchmarkMode = os.Getenv("BENCHMARK_PARAMS") != "" + func init() { flag.BoolVar(&InprocessMode, "inprocess", false, "Run CLI in the same process as test (for debugging)") flag.BoolVar(&KeepTmp, "keeptmp", false, "Do not delete TMP directory after run") @@ -328,7 +331,13 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int { t.Skip(skipReason) } - if !inprocessMode { + runParallel := !inprocessMode + + if benchmarkMode && strings.Contains(dir, "benchmark") { + runParallel = false + } + + if runParallel { t.Parallel() } @@ -344,7 +353,7 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int { for ind, envset := range expanded { envname := strings.Join(envset, "/") t.Run(envname, func(t *testing.T) { - if !inprocessMode { + if runParallel { t.Parallel() } runTest(t, dir, ind, coverDir, repls.Clone(), config, envset, envFilters) diff --git a/acceptance/bin/benchmark.py b/acceptance/bin/benchmark.py new file mode 100755 index 0000000000..bad87fd8b2 --- /dev/null +++ b/acceptance/bin/benchmark.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +import argparse +import subprocess +import time +import statistics +import sys +import os +import json + +try: + import resource +except ImportError: + # n/a on windows + resource = None + + +def run_benchmark(command, warmup, runs): + times = [] + + for i in range(runs): + # re-run this script with --once in a child process so per-run resource stats like ru_maxrss start fresh + cp = subprocess.run([sys.executable, sys.argv[0], "--once"] + command, stdout=subprocess.PIPE) + if cp.returncode != 0: + sys.exit(cp.returncode) + + try: + result = json.loads(cp.stdout) + except Exception: + print(f"Failed to parse: {cp.stdout!r}") + raise + + run = f"Run #{i} (warm): " if i < warmup else f"Run #{i} (count):" + + result_formatted = " ".join(f"{key}={value}" for (key, value) in result.items()) + + print(f"TESTLOG: {run} {result_formatted}") + + if i >= warmup: + times.append(result["wall"]) + + if not times: + print("No times recorded") + return + + if len(times) > 1: + mean = statistics.mean(times) + stdev = statistics.stdev(times) + min_time = min(times) + max_time = max(times) + + print(f"TESTLOG: Benchmark: {command}") + print(f"TESTLOG: Time (mean ± σ): {mean:.3f} s ± {stdev:.3f} s") + print(f"TESTLOG: Range (min … max): {min_time:.3f} s … {max_time:.3f} s {len(times)} runs", flush=True) + + +def run_once(command): + if len(command) == 1 and (" " in command[0] or ">" in command[0]): + shell = True + command = command[0] + else: + shell = False + + if resource: + rusage_before = resource.getrusage(resource.RUSAGE_CHILDREN) + + with open("LOG.process", "a") as log: + start = time.perf_counter() + result = subprocess.run(command, shell=shell, stdout=log, stderr=log) + end = time.perf_counter() + + if result.returncode != 0: + print(f"Error: command failed with exit code {result.returncode}", file=sys.stderr) + sys.exit(result.returncode) + + result = {"wall": end - start} + + if resource: + rusage_after = resource.getrusage(resource.RUSAGE_CHILDREN) + + result.update( + { + "ru_utime": rusage_after.ru_utime -
rusage_before.ru_utime, + "ru_stime": rusage_after.ru_stime - rusage_before.ru_stime, + # maxrss returns largest process, so subtracting is not correct since rusage_before will be reporting different process + "ru_maxrss": rusage_after.ru_maxrss, + } + ) + + return result + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--warmup", type=int, default=1) + parser.add_argument("--runs", type=int) + parser.add_argument("--once", action="store_true") + parser.add_argument("command", nargs="+") + args = parser.parse_args() + + if args.once: + assert not args.runs + result = run_once(args.command) + print(json.dumps(result)) + return + + if args.runs is None: + if os.environ.get("BENCHMARK_PARAMS"): + args.runs = 5 + else: + args.runs = 1 + + if args.warmup >= args.runs: + args.warmup = min(1, args.runs - 1) + + run_benchmark(args.command, args.warmup, args.runs) + + +if __name__ == "__main__": + main() diff --git a/acceptance/bin/gen_config.py b/acceptance/bin/gen_config.py new file mode 100755 index 0000000000..84a7262271 --- /dev/null +++ b/acceptance/bin/gen_config.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +import argparse +import json +import copy + +JOB_TEMPLATE_BASE = { + "description": "This job contain multiple tasks that are required to produce the weekly shark sightings report.", + "email_notifications": { + "no_alert_for_skipped_runs": False, + "on_failure": ["user.name@databricks.com"], + "on_success": ["user.name@databricks.com"], + }, + "job_clusters": [ + { + "job_cluster_key": "auto_scaling_cluster", + "new_cluster": { + "autoscale": {"max_workers": 16, "min_workers": 2}, + "node_type_id": "i3.xlarge", + "spark_conf": {"spark.speculation": "true"}, + "spark_version": "13.3.x-scala2.12", + }, + } + ], + "max_concurrent_runs": 10, + "name": "A multitask job", + "notification_settings": {"no_alert_for_canceled_runs": False, "no_alert_for_skipped_runs": False}, + "parameters": [{"default": "users", "name": "table"}], + "tags": {"cost-center": "engineering", "team": "jobs"}, + "tasks": [ + { + "depends_on": [], + "description": "Extracts session data from events", + "job_cluster_key": "auto_scaling_cluster", + "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}], + "max_retries": 3, + "min_retry_interval_millis": 2000, + "retry_on_timeout": False, + "spark_jar_task": { + "main_class_name": "com.databricks.Sessionize", + "parameters": ["--data", "dbfs:/path/to/data.json"], + }, + "task_key": "Sessionize", + "timeout_seconds": 86400, + }, + { + "depends_on": [], + "description": "Ingests order data", + "job_cluster_key": "auto_scaling_cluster", + "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}], + "max_retries": 3, + "min_retry_interval_millis": 2000, + "retry_on_timeout": False, + "spark_jar_task": { + "main_class_name": "com.databricks.OrdersIngest", + "parameters": ["--data", "dbfs:/path/to/order-data.json"], + }, + "task_key": "Orders_Ingest", + "timeout_seconds": 86400, + }, + { + "depends_on": [{"task_key": "Orders_Ingest"}, {"task_key": "Sessionize"}], + "description": "Matches orders with user sessions", + "max_retries": 3, + "min_retry_interval_millis": 2000, + "new_cluster": { + "autoscale": {"max_workers": 16, "min_workers": 2}, + "node_type_id": "i3.xlarge", + "spark_conf": {"spark.speculation": "true"}, + "spark_version": "13.3.x-scala2.12", + }, + "notebook_task": { + "base_parameters": {"age": "35", "name": "John Doe"}, + "notebook_path": "/Users/user.name@databricks.com/Match", + }, + "retry_on_timeout": False, + 
"run_if": "ALL_SUCCESS", + "task_key": "Match", + "timeout_seconds": 86400, + }, + ], + "timeout_seconds": 86400, +} + + +def gen_config(n): + jobs = {} + for i in range(n): + job = copy.deepcopy(JOB_TEMPLATE_BASE) + job["name"] = f"job_{i}" + + # Odd jobs use continuous, even jobs use schedule + if i % 2 == 1: + job["continuous"] = {"pause_status": "UNPAUSED"} + else: + job["schedule"] = { + "pause_status": "UNPAUSED", + "quartz_cron_expression": "20 30 * * * ?", + "timezone_id": "Europe/London", + } + + jobs[f"job_{i}"] = job + + config = {"bundle": {"name": "test-bundle"}, "resources": {"jobs": jobs}} + + return config + + +def print_yaml(obj, indent=0, list_item=False): + indent_str = " " * indent + + if isinstance(obj, dict): + first = True + for key, value in obj.items(): + if list_item and first: + prefix = indent_str + "- " + first = False + elif list_item: + prefix = indent_str + " " + else: + prefix = indent_str + nested_indent = indent + 2 if list_item else indent + 1 + if isinstance(value, (dict, list)) and value: + print(f"{prefix}{key}:") + print_yaml(value, nested_indent) + else: + print(f"{prefix}{key}: {json.dumps(value)}") + elif isinstance(obj, list): + for item in obj: + if isinstance(item, (dict, list)): + print_yaml(item, indent, list_item=True) + else: + print(f"{indent_str}- {json.dumps(item)}") + else: + prefix = f"{indent_str}- " if list_item else indent_str + print(f"{prefix}{json.dumps(obj)}") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--jobs", type=int, default=10, help="Number of jobs to generate") + args = parser.parse_args() + + print_yaml(gen_config(args.jobs)) + + +if __name__ == "__main__": + main() diff --git a/acceptance/bundle/benchmarks/deploy/out.test.toml b/acceptance/bundle/benchmarks/deploy/out.test.toml new file mode 100644 index 0000000000..40bb0d1047 --- /dev/null +++ b/acceptance/bundle/benchmarks/deploy/out.test.toml @@ -0,0 +1,8 @@ +Local = true +Cloud = false + +[GOOS] + windows = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/benchmarks/deploy/output.txt b/acceptance/bundle/benchmarks/deploy/output.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/acceptance/bundle/benchmarks/deploy/script b/acceptance/bundle/benchmarks/deploy/script new file mode 100755 index 0000000000..5de5a7d955 --- /dev/null +++ b/acceptance/bundle/benchmarks/deploy/script @@ -0,0 +1,6 @@ +gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml +wc -l databricks.yml >> LOG.wc +# Note, since testserver persists state for the duration of the test, .databricks is kept and benchmark.py skips first run as a warmup, this measures time +# it takes for no-changes deploy. +# Note, terraform is set up by the test runner, so this time does not include TF download time. 
+benchmark.py $CLI bundle deploy diff --git a/acceptance/bundle/benchmarks/plan/out.test.toml b/acceptance/bundle/benchmarks/plan/out.test.toml new file mode 100644 index 0000000000..40bb0d1047 --- /dev/null +++ b/acceptance/bundle/benchmarks/plan/out.test.toml @@ -0,0 +1,8 @@ +Local = true +Cloud = false + +[GOOS] + windows = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/benchmarks/plan/output.txt b/acceptance/bundle/benchmarks/plan/output.txt new file mode 100644 index 0000000000..89b1365657 --- /dev/null +++ b/acceptance/bundle/benchmarks/plan/output.txt @@ -0,0 +1,2 @@ + +>>> benchmark.py $CLI bundle plan > /dev/null diff --git a/acceptance/bundle/benchmarks/plan/script b/acceptance/bundle/benchmarks/plan/script new file mode 100755 index 0000000000..e3d6828f19 --- /dev/null +++ b/acceptance/bundle/benchmarks/plan/script @@ -0,0 +1,3 @@ +gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml +wc -l databricks.yml > LOG.wc +trace benchmark.py '$CLI bundle plan > /dev/null' diff --git a/acceptance/bundle/benchmarks/test.toml b/acceptance/bundle/benchmarks/test.toml new file mode 100644 index 0000000000..9f94c55600 --- /dev/null +++ b/acceptance/bundle/benchmarks/test.toml @@ -0,0 +1,5 @@ +Timeout = '4h' +Ignore = ["databricks.yml"] + +# Disabled because it fails on CI. We don't need this to work on Windows. +GOOS.windows = false diff --git a/acceptance/bundle/benchmarks/validate/out.test.toml b/acceptance/bundle/benchmarks/validate/out.test.toml new file mode 100644 index 0000000000..40bb0d1047 --- /dev/null +++ b/acceptance/bundle/benchmarks/validate/out.test.toml @@ -0,0 +1,8 @@ +Local = true +Cloud = false + +[GOOS] + windows = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/benchmarks/validate/output.txt b/acceptance/bundle/benchmarks/validate/output.txt new file mode 100644 index 0000000000..1ef997149b --- /dev/null +++ b/acceptance/bundle/benchmarks/validate/output.txt @@ -0,0 +1,14 @@ + +>>> head -n 10 databricks.yml +bundle: + name: "test-bundle" +resources: + jobs: + job_0: + description: "This job contain multiple tasks that are required to produce the weekly shark sightings report." 
+ email_notifications: + no_alert_for_skipped_runs: false + on_failure: + - "user.name@databricks.com" + +>>> benchmark.py [CLI] bundle validate diff --git a/acceptance/bundle/benchmarks/validate/script b/acceptance/bundle/benchmarks/validate/script new file mode 100755 index 0000000000..07a8f30603 --- /dev/null +++ b/acceptance/bundle/benchmarks/validate/script @@ -0,0 +1,4 @@ +gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml +wc -l databricks.yml > LOG.wc +trace head -n 10 databricks.yml +trace benchmark.py $CLI bundle validate diff --git a/acceptance/selftest/benchmark/out.test.toml b/acceptance/selftest/benchmark/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/selftest/benchmark/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/selftest/benchmark/output.txt b/acceptance/selftest/benchmark/output.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/acceptance/selftest/benchmark/script b/acceptance/selftest/benchmark/script new file mode 100644 index 0000000000..be2387c22b --- /dev/null +++ b/acceptance/selftest/benchmark/script @@ -0,0 +1 @@ +benchmark.py --runs 3 'true' diff --git a/acceptance/selftest/gen_config/out.test.toml b/acceptance/selftest/gen_config/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/selftest/gen_config/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/selftest/gen_config/output.txt b/acceptance/selftest/gen_config/output.txt new file mode 100644 index 0000000000..15041262c6 --- /dev/null +++ b/acceptance/selftest/gen_config/output.txt @@ -0,0 +1,266 @@ +bundle: + name: "test-bundle" +resources: + jobs: + job_0: + description: "This job contain multiple tasks that are required to produce the weekly shark sightings report." 
+ email_notifications: + no_alert_for_skipped_runs: false + on_failure: + - "user.name@databricks.com" + on_success: + - "user.name@databricks.com" + job_clusters: + - job_cluster_key: "auto_scaling_cluster" + new_cluster: + autoscale: + max_workers: 16 + min_workers: 2 + node_type_id: "[NODE_TYPE_ID]" + spark_conf: + spark.speculation: "true" + spark_version: "13.3.x-scala2.12" + max_concurrent_runs: 10 + name: "job_0" + notification_settings: + no_alert_for_canceled_runs: false + no_alert_for_skipped_runs: false + parameters: + - default: "users" + name: "table" + tags: + cost-center: "engineering" + team: "jobs" + tasks: + - depends_on: [] + description: "Extracts session data from events" + job_cluster_key: "auto_scaling_cluster" + libraries: + - jar: "dbfs:/mnt/databricks/Sessionize.jar" + max_retries: 3 + min_retry_interval_millis: 2000 + retry_on_timeout: false + spark_jar_task: + main_class_name: "com.databricks.Sessionize" + parameters: + - "--data" + - "dbfs:/path/to/data.json" + task_key: "Sessionize" + timeout_seconds: 86400 + - depends_on: [] + description: "Ingests order data" + job_cluster_key: "auto_scaling_cluster" + libraries: + - jar: "dbfs:/mnt/databricks/OrderIngest.jar" + max_retries: 3 + min_retry_interval_millis: 2000 + retry_on_timeout: false + spark_jar_task: + main_class_name: "com.databricks.OrdersIngest" + parameters: + - "--data" + - "dbfs:/path/to/order-data.json" + task_key: "Orders_Ingest" + timeout_seconds: 86400 + - depends_on: + - task_key: "Orders_Ingest" + - task_key: "Sessionize" + description: "Matches orders with user sessions" + max_retries: 3 + min_retry_interval_millis: 2000 + new_cluster: + autoscale: + max_workers: 16 + min_workers: 2 + node_type_id: "[NODE_TYPE_ID]" + spark_conf: + spark.speculation: "true" + spark_version: "13.3.x-scala2.12" + notebook_task: + base_parameters: + age: "35" + name: "John Doe" + notebook_path: "/Users/user.name@databricks.com/Match" + retry_on_timeout: false + run_if: "ALL_SUCCESS" + task_key: "Match" + timeout_seconds: 86400 + timeout_seconds: 86400 + schedule: + pause_status: "UNPAUSED" + quartz_cron_expression: "20 30 * * * ?" + timezone_id: "Europe/London" + job_1: + description: "This job contain multiple tasks that are required to produce the weekly shark sightings report." 
+ email_notifications: + no_alert_for_skipped_runs: false + on_failure: + - "user.name@databricks.com" + on_success: + - "user.name@databricks.com" + job_clusters: + - job_cluster_key: "auto_scaling_cluster" + new_cluster: + autoscale: + max_workers: 16 + min_workers: 2 + node_type_id: "[NODE_TYPE_ID]" + spark_conf: + spark.speculation: "true" + spark_version: "13.3.x-scala2.12" + max_concurrent_runs: 10 + name: "job_1" + notification_settings: + no_alert_for_canceled_runs: false + no_alert_for_skipped_runs: false + parameters: + - default: "users" + name: "table" + tags: + cost-center: "engineering" + team: "jobs" + tasks: + - depends_on: [] + description: "Extracts session data from events" + job_cluster_key: "auto_scaling_cluster" + libraries: + - jar: "dbfs:/mnt/databricks/Sessionize.jar" + max_retries: 3 + min_retry_interval_millis: 2000 + retry_on_timeout: false + spark_jar_task: + main_class_name: "com.databricks.Sessionize" + parameters: + - "--data" + - "dbfs:/path/to/data.json" + task_key: "Sessionize" + timeout_seconds: 86400 + - depends_on: [] + description: "Ingests order data" + job_cluster_key: "auto_scaling_cluster" + libraries: + - jar: "dbfs:/mnt/databricks/OrderIngest.jar" + max_retries: 3 + min_retry_interval_millis: 2000 + retry_on_timeout: false + spark_jar_task: + main_class_name: "com.databricks.OrdersIngest" + parameters: + - "--data" + - "dbfs:/path/to/order-data.json" + task_key: "Orders_Ingest" + timeout_seconds: 86400 + - depends_on: + - task_key: "Orders_Ingest" + - task_key: "Sessionize" + description: "Matches orders with user sessions" + max_retries: 3 + min_retry_interval_millis: 2000 + new_cluster: + autoscale: + max_workers: 16 + min_workers: 2 + node_type_id: "[NODE_TYPE_ID]" + spark_conf: + spark.speculation: "true" + spark_version: "13.3.x-scala2.12" + notebook_task: + base_parameters: + age: "35" + name: "John Doe" + notebook_path: "/Users/user.name@databricks.com/Match" + retry_on_timeout: false + run_if: "ALL_SUCCESS" + task_key: "Match" + timeout_seconds: 86400 + timeout_seconds: 86400 + continuous: + pause_status: "UNPAUSED" + job_2: + description: "This job contain multiple tasks that are required to produce the weekly shark sightings report." 
+ email_notifications: + no_alert_for_skipped_runs: false + on_failure: + - "user.name@databricks.com" + on_success: + - "user.name@databricks.com" + job_clusters: + - job_cluster_key: "auto_scaling_cluster" + new_cluster: + autoscale: + max_workers: 16 + min_workers: 2 + node_type_id: "[NODE_TYPE_ID]" + spark_conf: + spark.speculation: "true" + spark_version: "13.3.x-scala2.12" + max_concurrent_runs: 10 + name: "job_2" + notification_settings: + no_alert_for_canceled_runs: false + no_alert_for_skipped_runs: false + parameters: + - default: "users" + name: "table" + tags: + cost-center: "engineering" + team: "jobs" + tasks: + - depends_on: [] + description: "Extracts session data from events" + job_cluster_key: "auto_scaling_cluster" + libraries: + - jar: "dbfs:/mnt/databricks/Sessionize.jar" + max_retries: 3 + min_retry_interval_millis: 2000 + retry_on_timeout: false + spark_jar_task: + main_class_name: "com.databricks.Sessionize" + parameters: + - "--data" + - "dbfs:/path/to/data.json" + task_key: "Sessionize" + timeout_seconds: 86400 + - depends_on: [] + description: "Ingests order data" + job_cluster_key: "auto_scaling_cluster" + libraries: + - jar: "dbfs:/mnt/databricks/OrderIngest.jar" + max_retries: 3 + min_retry_interval_millis: 2000 + retry_on_timeout: false + spark_jar_task: + main_class_name: "com.databricks.OrdersIngest" + parameters: + - "--data" + - "dbfs:/path/to/order-data.json" + task_key: "Orders_Ingest" + timeout_seconds: 86400 + - depends_on: + - task_key: "Orders_Ingest" + - task_key: "Sessionize" + description: "Matches orders with user sessions" + max_retries: 3 + min_retry_interval_millis: 2000 + new_cluster: + autoscale: + max_workers: 16 + min_workers: 2 + node_type_id: "[NODE_TYPE_ID]" + spark_conf: + spark.speculation: "true" + spark_version: "13.3.x-scala2.12" + notebook_task: + base_parameters: + age: "35" + name: "John Doe" + notebook_path: "/Users/user.name@databricks.com/Match" + retry_on_timeout: false + run_if: "ALL_SUCCESS" + task_key: "Match" + timeout_seconds: 86400 + timeout_seconds: 86400 + schedule: + pause_status: "UNPAUSED" + quartz_cron_expression: "20 30 * * * ?" + timezone_id: "Europe/London" diff --git a/acceptance/selftest/gen_config/script b/acceptance/selftest/gen_config/script new file mode 100644 index 0000000000..c2f3177005 --- /dev/null +++ b/acceptance/selftest/gen_config/script @@ -0,0 +1 @@ +gen_config.py --jobs 3 diff --git a/tools/bench_parse.py b/tools/bench_parse.py new file mode 100755 index 0000000000..534fbe36af --- /dev/null +++ b/tools/bench_parse.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Parses output of benchmark runs (e.g. "make bench100") and prints a summary table. +""" + +import sys +import re +import statistics +from collections import defaultdict + + +def parse_key_values(text): + """Parse key=value pairs from a string. 
+ + >>> parse_key_values("wall=10.316 ru_utime=19.207 ru_stime=0.505 ru_maxrss=573079552") + {'wall': 10.316, 'ru_utime': 19.207, 'ru_stime': 0.505, 'ru_maxrss': 573079552.0} + """ + result = {} + for kv_pair in text.split(): + if "=" in kv_pair: + key, value = kv_pair.split("=", 1) + try: + result[key] = float(value) + except ValueError: + result[key] = value + return result + + +def parse_bench_output(file_path): + """Parse benchmark output and extract test results.""" + results = defaultdict(lambda: defaultdict(list)) + + current_test = None + + with open(file_path) as f: + for line in f: + test_match = re.match(r"=== RUN\s+(.+)", line) + if test_match: + current_test = test_match.group(1) + current_test = current_test.removeprefix("TestAccept/bundle/benchmarks/") + continue + + if "TESTLOG: Run #" in line and "(count)" in line: + if current_test: + # Extract everything after the run label + parts = line.split("(count):") + if len(parts) == 2: + kv_data = parse_key_values(parts[1].strip()) + for key, value in kv_data.items(): + results[current_test][key].append(value) + + return results + + +def calculate_means(results): + """Calculate mean values for each metric.""" + means = {} + for test_name, metrics in results.items(): + means[test_name] = {metric: statistics.mean(values) if values else 0 for metric, values in metrics.items()} + return means + + +def print_results(results): + """Output table for single file.""" + means = calculate_means(results) + + all_metrics = {} + for metrics in means.values(): + for key in metrics: + all_metrics.setdefault(key, None) + all_metrics = list(all_metrics.keys()) + + testname_width = max(len("testname"), max((len(name) for name in means.keys()), default=0)) + metric_width = 12 + + header = f"{'testname':<{testname_width}}" + for metric in all_metrics: + header += f" {metric:>{metric_width}}" + print(header) + print("-" * len(header)) + + for test_name in sorted(means.keys()): + m = means[test_name] + row = f"{test_name:<{testname_width}}" + for metric in all_metrics: + value = m.get(metric, 0) + if isinstance(value, float) and value > 1000000: + row += f" {value:>{metric_width}.0f}" + else: + row += f" {value:>{metric_width}.3f}" + print(row) + + +def main(): + for filename in sys.argv[1:]: + results = parse_bench_output(filename) + print_results(results) + + +if __name__ == "__main__": + main()
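The summary helpers in `tools/bench_parse.py` can also be used programmatically, for example to pull out a single metric from an existing log. A minimal sketch, assuming the module is importable (e.g. run from the `tools/` directory) and that a `bench10.log` produced by `make bench10.log` is present; names and paths here are illustrative:

```python
#!/usr/bin/env python3
# Programmatic use of the bench_parse helpers defined above (illustrative log path).
from bench_parse import parse_bench_output, calculate_means

# test name -> metric -> list of values taken from "(count)" runs
results = parse_bench_output("bench10.log")

# mean value per metric per test, the same aggregation print_results() performs
for test_name, metrics in sorted(calculate_means(results).items()):
    runs = len(results[test_name].get("wall", []))
    print(f"{test_name}: wall={metrics.get('wall', 0):.3f}s over {runs} runs")
```

This mirrors what `print_results()` does for the full table, but makes it easy to post-process a single metric such as wall time.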