Commit 9747a93

Add benchmarks for DABs (#4194)
## Changes

- Extend the acceptance test runner to support benchmarks.
- Add benchmarks for bundle validate, plan, and deploy (against the testserver) with 100 and 1000 jobs.
- New make commands to quickly run the benchmarks:
  - make bench100_summary
  - make bench1k_summary

Benchmarks are regular acceptance tests that log measurements in a certain format. The output can be fed to "tools/bench_parse.py" to print a summary table.

The test runner recognizes a benchmark as a test with "benchmark" anywhere in its path. For these tests, parallel execution is disabled if and only if the BENCHMARK_PARAMS variable is set.

The benchmarks make use of two scripts:

- `gen_config.py --jobs N` to generate a config with N jobs
- `benchmark.py command` to run a command a few times and log the time measurements.

The default number of runs in benchmark.py depends on the BENCHMARK_PARAMS variable. If it is set, the default is 5; otherwise it is 1.

Note: bundle/benchmarks/deploy benchmarks the case where there are no changes to deploy. If we instead measure the first deployment, the time goes up from 40s to 140s for terraform, while the time for direct stays more or less the same.

## Why

Understand choke points and set a baseline for further optimizations.

## Tests

By default each benchmark runs as a regular acceptance test (with a single run).
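For reference, the quickest end-to-end loop uses the new make targets (a sketch relying only on the targets and parser added in this commit):

```sh
# Run the 100-job benchmarks against the testserver and print a summary table
# parsed from the TESTLOG lines; bench1k_summary does the same with 1000 jobs.
make bench100_summary
```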
1 parent 52cb1a7 commit 9747a93

File tree

22 files changed: +758 -2 lines changed


Makefile

Lines changed: 31 additions & 0 deletions

@@ -184,3 +184,34 @@ test-exp-ssh:
 
 test-pipelines:
 	make test TEST_PACKAGES="./cmd/pipelines/..." ACCEPTANCE_TEST_FILTER="TestAccept/pipelines"
+
+
+# Benchmarks:
+
+bench1k:
+	BENCHMARK_PARAMS="--jobs 1000" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m
+
+bench100:
+	BENCHMARK_PARAMS="--jobs 100" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m
+
+# small benchmark to quickly test benchmark-related code
+bench10:
+	BENCHMARK_PARAMS="--jobs 10" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks -timeout=120m
+
+bench1k.log:
+	make bench1k | tee $@
+
+bench100.log:
+	make bench100 | tee $@
+
+bench10.log:
+	make bench10 | tee $@
+
+bench1k_summary: bench1k.log
+	./tools/bench_parse.py $<
+
+bench100_summary: bench100.log
+	./tools/bench_parse.py $<
+
+bench10_summary: bench10.log
+	./tools/bench_parse.py $<
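The summary targets are thin wrappers, so the same flow can be driven by hand; a single benchmark directory can also be targeted by narrowing the -run filter (the narrower filter is an assumption, not a target defined above):

```sh
# Manual equivalent of bench100_summary:
make bench100 | tee bench100.log
./tools/bench_parse.py bench100.log

# Hypothetical: run only the deploy benchmark with 1000 jobs.
BENCHMARK_PARAMS="--jobs 1000" go test ./acceptance -v -tail -run TestAccept/bundle/benchmarks/deploy -timeout=120m
```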

acceptance/README.md

Lines changed: 13 additions & 0 deletions

@@ -22,6 +22,19 @@ Any file starting with "LOG" will be logged to test log (visible with go test -v
 
 See [selftest](./selftest) for more examples.
 
+## Benchmarks
+
+Benchmarks are regular acceptance tests that log measurements in a certain format. The output can be fed to `tools/bench_parse.py` to print a summary table.
+
+The test runner recognizes a benchmark as a test with "benchmark" anywhere in its path. For these tests, parallel execution is disabled if and only if the BENCHMARK_PARAMS variable is set.
+
+The benchmarks make use of two scripts:
+
+- `gen_config.py --jobs N` to generate a config with N jobs
+- `benchmark.py command` to run a command a few times and log the time measurements.
+
+The default number of runs in benchmark.py depends on the BENCHMARK_PARAMS variable. If it is set, the default is 5; otherwise it is 1.
+
 ## Running acceptance tests on Windows
 
 To run the acceptance tests from a terminal on Windows (eg. Git Bash from VS Code),
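For illustration, the measurement lines emitted by a benchmark look roughly like this (the format comes from `benchmark.py` below; the numbers and the `<cli>` placeholder are invented):

```
TESTLOG: Run #0 (warm):  wall=41.2 ru_utime=3.1 ru_stime=1.2 ru_maxrss=210000
TESTLOG: Run #1 (count): wall=40.8 ru_utime=3.0 ru_stime=1.1 ru_maxrss=208000
TESTLOG: Benchmark: ['<cli>', 'bundle', 'deploy']
TESTLOG: Time (mean ± σ): 40.900 s ± 0.150 s
TESTLOG: Range (min … max): 40.750 s … 41.050 s 4 runs
```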

acceptance/acceptance_test.go

Lines changed: 11 additions & 2 deletions

@@ -61,6 +61,9 @@ var InprocessMode bool
 // lines with this prefix are not recorded in output.txt but logged instead
 const TestLogPrefix = "TESTLOG: "
 
+// In benchmark mode we disable parallel runs of all tests that contain the word "benchmark" in their path.
+var benchmarkMode = os.Getenv("BENCHMARK_PARAMS") != ""
+
 func init() {
 	flag.BoolVar(&InprocessMode, "inprocess", false, "Run CLI in the same process as test (for debugging)")
 	flag.BoolVar(&KeepTmp, "keeptmp", false, "Do not delete TMP directory after run")
@@ -328,7 +331,13 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int {
 			t.Skip(skipReason)
 		}
 
-		if !inprocessMode {
+		runParallel := !inprocessMode
+
+		if benchmarkMode && strings.Contains(dir, "benchmark") {
+			runParallel = false
+		}
+
+		if runParallel {
 			t.Parallel()
 		}
 
@@ -344,7 +353,7 @@ func testAccept(t *testing.T, inprocessMode bool, singleTest string) int {
 		for ind, envset := range expanded {
 			envname := strings.Join(envset, "/")
 			t.Run(envname, func(t *testing.T) {
-				if !inprocessMode {
+				if runParallel {
 					t.Parallel()
 				}
 				runTest(t, dir, ind, coverDir, repls.Clone(), config, envset, envFilters)

acceptance/bin/benchmark.py

Lines changed: 119 additions & 0 deletions

#!/usr/bin/env python3
import argparse
import subprocess
import time
import statistics
import sys
import os
import json

try:
    import resource
except ImportError:
    # n/a on windows
    resource = None


def run_benchmark(command, warmup, runs):
    times = []

    for i in range(runs):
        # double fork to reset max statistics like ru_maxrss
        cp = subprocess.run([sys.executable, sys.argv[0], "--once"] + command, stdout=subprocess.PIPE)
        if cp.returncode != 0:
            sys.exit(cp.returncode)

        try:
            result = json.loads(cp.stdout)
        except Exception:
            print(f"Failed to parse: {cp.stdout!r}")
            raise

        run = f"Run #{i} (warm): " if i < warmup else f"Run #{i} (count):"

        result_formatted = " ".join(f"{key}={value}" for (key, value) in result.items())

        print(f"TESTLOG: {run} {result_formatted}")

        if i >= warmup:
            times.append(result["wall"])

    if not times:
        print("No times recorded")
        return

    if len(times) > 1:
        mean = statistics.mean(times)
        stdev = statistics.stdev(times)
        min_time = min(times)
        max_time = max(times)

        print(f"TESTLOG: Benchmark: {command}")
        print(f"TESTLOG: Time (mean ± σ): {mean:.3f} s ± {stdev:.3f} s")
        print(f"TESTLOG: Range (min … max): {min_time:.3f} s … {max_time:.3f} s {len(times)} runs", flush=True)


def run_once(command):
    # Run through the shell when the command is a single string containing spaces,
    # or when the first token contains a redirection.
    if len(command) == 1 and " " in command[0] or ">" in command[0]:
        shell = True
        command = command[0]
    else:
        shell = False

    if resource:
        rusage_before = resource.getrusage(resource.RUSAGE_CHILDREN)

    with open("LOG.process", "a") as log:
        start = time.perf_counter()
        result = subprocess.run(command, shell=shell, stdout=log, stderr=log)
        end = time.perf_counter()

    if result.returncode != 0:
        print(f"Error: command failed with exit code {result.returncode}", file=sys.stderr)
        sys.exit(result.returncode)

    result = {"wall": end - start}

    if resource:
        rusage_after = resource.getrusage(resource.RUSAGE_CHILDREN)

        result.update(
            {
                "ru_utime": rusage_after.ru_utime - rusage_before.ru_utime,
                "ru_stime": rusage_after.ru_stime - rusage_before.ru_stime,
                # maxrss returns largest process, so subtracting is not correct since rusage_before will be reporting different process
                "ru_maxrss": rusage_after.ru_maxrss,
            }
        )

    return result


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--warmup", type=int, default=1)
    parser.add_argument("--runs", type=int)
    parser.add_argument("--once", action="store_true")
    parser.add_argument("command", nargs="+")
    args = parser.parse_args()

    if args.once:
        assert not args.runs
        result = run_once(args.command)
        print(json.dumps(result))
        return

    if args.runs is None:
        if os.environ.get("BENCHMARK_PARAMS"):
            args.runs = 5
        else:
            args.runs = 1

    if args.warmup >= args.runs:
        args.warmup = min(1, args.runs - 1)

    run_benchmark(args.command, args.warmup, args.runs)


if __name__ == "__main__":
    main()
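In the benchmark scripts this tool is invoked directly on the CLI command under test (the deploy benchmark below does exactly this); the second line is a hypothetical invocation showing the flags defined above:

```sh
# One timed run when BENCHMARK_PARAMS is unset (regular acceptance run), five runs when it is set.
benchmark.py $CLI bundle deploy

# Hypothetical: explicit run and warmup counts.
benchmark.py --runs 5 --warmup 1 $CLI bundle validate
```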

acceptance/bin/gen_config.py

Lines changed: 147 additions & 0 deletions

#!/usr/bin/env python3
import argparse
import json
import copy

JOB_TEMPLATE_BASE = {
    "description": "This job contain multiple tasks that are required to produce the weekly shark sightings report.",
    "email_notifications": {
        "no_alert_for_skipped_runs": False,
        "on_failure": ["[email protected]"],
        "on_success": ["[email protected]"],
    },
    "job_clusters": [
        {
            "job_cluster_key": "auto_scaling_cluster",
            "new_cluster": {
                "autoscale": {"max_workers": 16, "min_workers": 2},
                "node_type_id": "i3.xlarge",
                "spark_conf": {"spark.speculation": "true"},
                "spark_version": "13.3.x-scala2.12",
            },
        }
    ],
    "max_concurrent_runs": 10,
    "name": "A multitask job",
    "notification_settings": {"no_alert_for_canceled_runs": False, "no_alert_for_skipped_runs": False},
    "parameters": [{"default": "users", "name": "table"}],
    "tags": {"cost-center": "engineering", "team": "jobs"},
    "tasks": [
        {
            "depends_on": [],
            "description": "Extracts session data from events",
            "job_cluster_key": "auto_scaling_cluster",
            "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "retry_on_timeout": False,
            "spark_jar_task": {
                "main_class_name": "com.databricks.Sessionize",
                "parameters": ["--data", "dbfs:/path/to/data.json"],
            },
            "task_key": "Sessionize",
            "timeout_seconds": 86400,
        },
        {
            "depends_on": [],
            "description": "Ingests order data",
            "job_cluster_key": "auto_scaling_cluster",
            "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "retry_on_timeout": False,
            "spark_jar_task": {
                "main_class_name": "com.databricks.OrdersIngest",
                "parameters": ["--data", "dbfs:/path/to/order-data.json"],
            },
            "task_key": "Orders_Ingest",
            "timeout_seconds": 86400,
        },
        {
            "depends_on": [{"task_key": "Orders_Ingest"}, {"task_key": "Sessionize"}],
            "description": "Matches orders with user sessions",
            "max_retries": 3,
            "min_retry_interval_millis": 2000,
            "new_cluster": {
                "autoscale": {"max_workers": 16, "min_workers": 2},
                "node_type_id": "i3.xlarge",
                "spark_conf": {"spark.speculation": "true"},
                "spark_version": "13.3.x-scala2.12",
            },
            "notebook_task": {
                "base_parameters": {"age": "35", "name": "John Doe"},
                "notebook_path": "/Users/[email protected]/Match",
            },
            "retry_on_timeout": False,
            "run_if": "ALL_SUCCESS",
            "task_key": "Match",
            "timeout_seconds": 86400,
        },
    ],
    "timeout_seconds": 86400,
}


def gen_config(n):
    jobs = {}
    for i in range(n):
        job = copy.deepcopy(JOB_TEMPLATE_BASE)
        job["name"] = f"job_{i}"

        # Odd jobs use continuous, even jobs use schedule
        if i % 2 == 1:
            job["continuous"] = {"pause_status": "UNPAUSED"}
        else:
            job["schedule"] = {
                "pause_status": "UNPAUSED",
                "quartz_cron_expression": "20 30 * * * ?",
                "timezone_id": "Europe/London",
            }

        jobs[f"job_{i}"] = job

    config = {"bundle": {"name": "test-bundle"}, "resources": {"jobs": jobs}}

    return config


def print_yaml(obj, indent=0, list_item=False):
    indent_str = " " * indent

    if isinstance(obj, dict):
        first = True
        for key, value in obj.items():
            if list_item and first:
                prefix = indent_str + "- "
                first = False
            elif list_item:
                prefix = indent_str + "  "
            else:
                prefix = indent_str
            nested_indent = indent + 2 if list_item else indent + 1
            if isinstance(value, (dict, list)) and value:
                print(f"{prefix}{key}:")
                print_yaml(value, nested_indent)
            else:
                print(f"{prefix}{key}: {json.dumps(value)}")
    elif isinstance(obj, list):
        for item in obj:
            if isinstance(item, (dict, list)):
                print_yaml(item, indent, list_item=True)
            else:
                print(f"{indent_str}- {json.dumps(item)}")
    else:
        prefix = f"{indent_str}- " if list_item else indent_str
        print(f"{prefix}{json.dumps(obj)}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--jobs", type=int, default=10, help="Number of jobs to generate")
    args = parser.parse_args()

    print_yaml(gen_config(args.jobs))


if __name__ == "__main__":
    main()
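The generator can also be run on its own to eyeball the config it produces (a sketch mirroring the first lines of the deploy benchmark script below):

```sh
gen_config.py --jobs 1000 > databricks.yml
wc -l databricks.yml
```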

acceptance/bundle/benchmarks/deploy/out.test.toml

Lines changed: 8 additions & 0 deletions

acceptance/bundle/benchmarks/deploy/output.txt

Whitespace-only changes.
Lines changed: 6 additions & 0 deletions
gen_config.py ${BENCHMARK_PARAMS:-} > databricks.yml
wc -l databricks.yml >> LOG.wc
# Note: since the testserver persists state for the duration of the test, .databricks is kept and benchmark.py skips the first run as a warmup,
# so this measures the time a no-changes deploy takes.
# Note: terraform is set up by the test runner, so this time does not include TF download time.
benchmark.py $CLI bundle deploy

acceptance/bundle/benchmarks/plan/out.test.toml

Lines changed: 8 additions & 0 deletions
Lines changed: 2 additions & 0 deletions
>>> benchmark.py $CLI bundle plan > /dev/null
