Commit 6b66c22

Add AWS CDK-based benchmarking environment (#227)
* Add cdk sample app
* Add bucket and ec2 instances
* Use a static channel resolver in benchmarks
* Add worker.rs
* Use an ec2 api based channel resolver
* Revert "Use a static channel resolver in benchmarks" (this reverts commit 5bdec16)
* Refactor ec2 channel resolver
* Add datafusion-bench script
* Fix worker ec2 resolver
* Improve benchmarking script and cdk tests
* Show diff with previous run
* Better logs for the benchmarks
* Improve readme
* Always drop tables
* Improve Cfn output
1 parent 28a278c commit 6b66c22

File tree

16 files changed (+6655, -48 lines)

Cargo.lock

Lines changed: 1159 additions & 48 deletions
Some generated files are not rendered by default.

benchmarks/Cargo.toml

Lines changed: 11 additions & 0 deletions
```diff
@@ -20,7 +20,18 @@ chrono = "0.4.41"
 futures = "0.3.31"
 dashmap = "6.1.0"
 prost = "0.13.5"
+url = "2.5.4"
+arrow-flight = "56.1.0"
+tonic = { version = "0.13.1", features = ["transport"] }
+axum = "0.7"
+object_store = { version = "0.12.4", features = ["aws"] }
+aws-config = "<1.1.1"
+aws-sdk-ec2 = "<1.50.1"
 
 [[bin]]
 name = "dfbench"
 path = "src/bin/dfbench.rs"
+
+[[bin]]
+name = "worker"
+path = "cdk/bin/worker.rs"
```

benchmarks/cdk/.gitignore

Lines changed: 8 additions & 0 deletions
```
*.js
!jest.config.js
*.d.ts
node_modules

# CDK asset staging directory
.cdk.staging
cdk.out
```

benchmarks/cdk/.npmignore

Lines changed: 6 additions & 0 deletions
```
*.ts
!*.d.ts

# CDK asset staging directory
.cdk.staging
cdk.out
```

benchmarks/cdk/README.md

Lines changed: 123 additions & 0 deletions

# AWS CDK code for DataFusion distributed benchmarks

Automatically creates the appropriate AWS infrastructure for running benchmarks.

---

# Deploy

## Prerequisites

`cargo-zigbuild` needs to be installed on the system for cross-compiling to Linux x86_64, which
is what the benchmarking machines in AWS run on:

```shell
cargo install --locked cargo-zigbuild
```

Make sure to also have the `x86_64-unknown-linux-gnu` target installed in
your Rust toolchain:

```shell
rustup target add x86_64-unknown-linux-gnu
```

Ensure that you can cross-compile to Linux x86_64 before performing any deployments:

```shell
cargo zigbuild -p datafusion-distributed-benchmarks --release --bin worker --target x86_64-unknown-linux-gnu
```

## CDK deploy

```shell
npm run cdk deploy
```

## Populating the bucket with TPCH data

```shell
npm run sync-bucket
```

---

# Connect to instances

## Prerequisites

The Session Manager plugin for the AWS CLI needs to be installed, as that's what is used for
connecting to the EC2 machines instead of SSH.

These are the docs with installation instructions:

https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html

On a Mac with an Apple Silicon processor, it can be installed with:

```shell
curl "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/mac_arm64/session-manager-plugin.pkg" -o "session-manager-plugin.pkg"
sudo installer -pkg session-manager-plugin.pkg -target /
sudo ln -s /usr/local/sessionmanagerplugin/bin/session-manager-plugin /usr/local/bin/session-manager-plugin
```

## Port Forward

After performing a CDK deploy, a CloudFormation output will be printed to stdout with instructions for port-forwarding to the instances:

```shell
export INSTANCE_ID=i-0000000000000000

aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
```

Port-forwarding just the first instance is enough for issuing queries.
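
The workers answer plain HTTP GET requests with the SQL passed in a `sql` query parameter and respond with a JSON body of the form `{ count, plan }`; this is the interface the benchmarking script added in this commit relies on. A minimal TypeScript sketch of issuing a query through the forwarded port, assuming the port-forward above is active:

```ts
// Minimal sketch: issue a SQL query through the forwarded port.
// Assumes the `aws ssm start-session` command above is forwarding localhost:9000.
async function runQuery(sql: string): Promise<{ count: number; plan: string }> {
    const url = new URL('http://localhost:9000');
    url.searchParams.set('sql', sql);

    const response = await fetch(url.toString());
    if (!response.ok) {
        throw new Error(`Query failed: ${response.status} ${await response.text()}`);
    }
    // The worker responds with JSON: { count: number, plan: string }
    return await response.json();
}

runQuery('SELECT 1').then(r => console.log(r.count, r.plan));
```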

## Connect

After performing a CDK deploy, a CloudFormation output will be printed to stdout with instructions for connecting
to all the machines, something like this:

```shell
export INSTANCE_ID=i-0000000000000000

aws ssm start-session --target $INSTANCE_ID
```

The logs can be streamed with:

```shell
sudo journalctl -u worker.service -f -o cat
```

---

# Running benchmarks

There's a script that runs the TPCH benchmarks against the remote cluster.

In one terminal, perform a port-forward to one machine in the cluster, something like this:

```shell
export INSTANCE_ID=i-0000000000000000
aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
```

In another terminal, navigate to the benchmarks/cdk folder:

```shell
cd benchmarks/cdk
```

And run the benchmarking script:

```shell
npm run datafusion-bench
```

Several arguments can be passed for running the benchmarks against different scale factors and with different configs,
for example:

```shell
npm run datafusion-bench -- --sf 10 --files-per-task 4 --query 7
```

benchmarks/cdk/bin/cdk.ts

Lines changed: 12 additions & 0 deletions
```ts
#!/usr/bin/env node
import * as cdk from 'aws-cdk-lib/core';
import { CdkStack } from '../lib/cdk-stack';

const app = new cdk.App();

const config = {
    instanceType: 't3.xlarge',
    instanceCount: 4,
};

new CdkStack(app, 'DataFusionDistributedBenchmarks', { config });
```
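
The `CdkStack` referenced here lives in `lib/cdk-stack.ts`, which is not rendered on this page. Based on the commit message ("Add bucket and ec2 instances") and the S3 locations used by the benchmark script, a minimal sketch of what such a stack could look like follows; all construct IDs and props are illustrative assumptions, not the actual implementation:

```ts
import * as cdk from 'aws-cdk-lib/core';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

// Hypothetical props mirroring the config object in bin/cdk.ts.
interface CdkStackProps extends cdk.StackProps {
    config: { instanceType: string; instanceCount: number };
}

export class CdkStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props: CdkStackProps) {
        super(scope, id, props);

        // Bucket holding the TPCH parquet data; the name is assumed from the
        // s3://datafusion-distributed-benchmarks/... paths in the benchmark script.
        new s3.Bucket(this, 'BenchmarkData', {
            bucketName: 'datafusion-distributed-benchmarks',
        });

        const vpc = new ec2.Vpc(this, 'Vpc');

        // One worker instance per configured count.
        for (let i = 0; i < props.config.instanceCount; i++) {
            new ec2.Instance(this, `Worker${i}`, {
                vpc,
                instanceType: new ec2.InstanceType(props.config.instanceType),
                machineImage: ec2.MachineImage.latestAmazonLinux2023(),
            });
        }
    }
}
```

Passing the config through props keeps the instance type and count adjustable from `bin/cdk.ts` without touching the stack itself.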
Lines changed: 190 additions & 0 deletions

```ts
import path from "path";
import fs from "fs/promises";
import { Command } from "commander";
import { z } from 'zod';

const ROOT = path.join(__dirname, '../../..')

// Remember to port-forward a worker with
// aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
async function main () {
    const program = new Command();

    program
        .option('--sf <number>', 'Scale factor', '1')
        .option('-i, --iterations <number>', 'Number of iterations', '3')
        .option('--files-per-task <number>', 'Files per task', '4')
        .option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
        .option('--query <number>', 'A specific query to run', undefined)
        .parse(process.argv);

    const options = program.opts();

    const sf = parseInt(options.sf);
    const iterations = parseInt(options.iterations);
    const filesPerTask = parseInt(options.filesPerTask);
    const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);

    // Results collected here are compared against the previous run at the end.
    const results: BenchmarkResults = { queries: [] };
    const queriesPath = path.join(ROOT, "testdata", "tpch", "queries")

    console.log("Creating tables...")
    await query(createTablesSql(sf))
    await query(`
        SET distributed.files_per_task=${filesPerTask};
        SET distributed.cardinality_task_count_factor=${cardinalityTaskSf}
    `)

    for (const id of IDS) {
        if (options.query && parseInt(options.query) !== id) {
            continue
        }

        const queryId = `q${id}`;
        const filePath = path.join(queriesPath, `${queryId}.sql`)
        const content = await fs.readFile(filePath, 'utf-8')

        const queryResult: QueryResult = {
            query: queryId,
            iterations: []
        };

        for (let i = 0; i < iterations; i++) {
            const start = new Date()
            const response = await query(content)
            const elapsed = Math.round(new Date().getTime() - start.getTime())

            queryResult.iterations.push({
                elapsed,
                row_count: response.count
            });

            console.log(
                `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.count} rows`
            );
        }

        const avg = Math.round(
            queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
        );
        console.log(`Query ${id} avg time: ${avg} ms`);

        results.queries.push(queryResult);
    }

    // Write results and compare
    const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json");
    await compareWithPrevious(results, outputPath);
    await writeJson(results, outputPath);
}

// Simple data structures
type QueryResult = {
    query: string;
    iterations: { elapsed: number; row_count: number }[];
}

type BenchmarkResults = {
    queries: QueryResult[];
}

const BenchmarkResults = z.object({
    queries: z.array(z.object({
        query: z.string(),
        iterations: z.array(z.object({
            elapsed: z.number(),
            row_count: z.number()
        }))
    }))
})

async function writeJson (results: BenchmarkResults, outputPath?: string) {
    if (!outputPath) return;
    await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
}

async function compareWithPrevious (results: BenchmarkResults, outputPath: string) {
    let prevResults: BenchmarkResults;
    try {
        const prevContent = await fs.readFile(outputPath, 'utf-8');
        prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
    } catch {
        return; // No previous results to compare
    }

    console.log('\n==== Comparison with previous run ====');

    for (const query of results.queries) {
        const prevQuery = prevResults.queries.find(q => q.query === query.query);
        if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
            continue;
        }

        const avgPrev = Math.round(
            prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
        );
        const avg = Math.round(
            query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
        );

        const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
        const tag = avg < avgPrev ? "faster" : "slower";
        const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");

        console.log(
            `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
        );
    }
}

function createTablesSql (sf: number): string {
    let stmt = ''
    for (const tbl of [
        "lineitem",
        "orders",
        "part",
        "partsupp",
        "customer",
        "nation",
        "region",
        "supplier",
    ]) {
        // language=SQL format=false
        stmt += `
            DROP TABLE IF EXISTS ${tbl};
            CREATE EXTERNAL TABLE IF NOT EXISTS ${tbl} STORED AS PARQUET LOCATION 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/${tbl}/';
        `
    }
    return stmt
}

const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]

const QueryResponse = z.object({
    count: z.number(),
    plan: z.string()
})
type QueryResponse = z.infer<typeof QueryResponse>

async function query (sql: string): Promise<QueryResponse> {
    const url = new URL('http://localhost:9000')
    url.searchParams.set('sql', sql)

    const response = await fetch(url.toString())

    if (!response.ok) {
        const msg = await response.text()
        throw new Error(`Query failed: ${response.status} ${msg}`)
    }

    const unparsed = await response.json()
    return QueryResponse.parse(unparsed)
}

main()
    .catch(err => {
        console.error(err)
        process.exit(1)
    })
```
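
For reference, the `remote-results.json` file written by `writeJson` (and later parsed by `compareWithPrevious`) follows the `BenchmarkResults` shape above; a hypothetical instance with illustrative numbers:

```ts
// Hypothetical example of remote-results.json contents; the numbers are
// illustrative, not real benchmark output.
const example: BenchmarkResults = {
    queries: [
        {
            query: 'q1',
            iterations: [
                { elapsed: 1250, row_count: 4 },
                { elapsed: 1198, row_count: 4 },
            ],
        },
    ],
};
```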
