Skip to content

Commit 1fb4daa

Browse files
authored
Adapt remote benchmarks to support more datasets (#276)
* Vendor openssl in benchmarks * Support any dataset in benchmarks * Add TPC-DS schemas * Pass runtime env in Worker
1 parent 48910a6 commit 1fb4daa

File tree

8 files changed

+1002
-347
lines changed

8 files changed

+1002
-347
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ axum = "0.7"
2727
object_store = { version = "0.12.4", features = ["aws"] }
2828
aws-config = "1"
2929
aws-sdk-ec2 = "1"
30+
openssl = { version = "0.10", features = ["vendored"] }
3031

3132
[[bin]]
3233
name = "dfbench"

benchmarks/cdk/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,5 +119,5 @@ Several arguments can be passed for running the benchmarks against different sca
119119
for example:
120120

121121
```shell
122-
npm run datafusion-bench -- --sf 10 --files-per-task 4 --query 7
122+
npm run datafusion-bench -- --datset tpch_sf10 --files-per-task 4 --query 7
123123
```
Lines changed: 166 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,135 +1,204 @@
11
import path from "path";
22
import fs from "fs/promises";
3-
import { z } from 'zod';
3+
import {z} from 'zod';
44

55
export const ROOT = path.join(__dirname, '../../..')
6+
export const BUCKET = 's3://datafusion-distributed-benchmarks' // hardcoded in CDK code
67

78
// Simple data structures
89
export type QueryResult = {
9-
query: string;
10-
iterations: { elapsed: number; row_count: number }[];
10+
query: string;
11+
iterations: { elapsed: number; row_count: number }[];
12+
failure?: string
1113
}
1214

1315
export type BenchmarkResults = {
14-
queries: QueryResult[];
16+
queries: QueryResult[];
1517
}
1618

1719
export const BenchmarkResults = z.object({
18-
queries: z.array(z.object({
19-
query: z.string(),
20-
iterations: z.array(z.object({
21-
elapsed: z.number(),
22-
row_count: z.number()
20+
queries: z.array(z.object({
21+
query: z.string(),
22+
iterations: z.array(z.object({
23+
elapsed: z.number(),
24+
row_count: z.number()
25+
})),
26+
failed: z.string().optional()
2327
}))
24-
}))
2528
})
2629

27-
export const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
28-
2930
export async function writeJson(results: BenchmarkResults, outputPath?: string) {
30-
if (!outputPath) return;
31-
await fs.mkdir(path.dirname(outputPath), { recursive: true });
32-
await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
31+
if (!outputPath) return;
32+
await fs.mkdir(path.dirname(outputPath), { recursive: true });
33+
await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
3334
}
3435

3536
export async function compareWithPrevious(results: BenchmarkResults, outputPath: string) {
36-
let prevResults: BenchmarkResults;
37-
try {
38-
const prevContent = await fs.readFile(outputPath, 'utf-8');
39-
prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
40-
} catch {
41-
return; // No previous results to compare
42-
}
43-
44-
console.log('\n==== Comparison with previous run ====');
45-
46-
for (const query of results.queries) {
47-
const prevQuery = prevResults.queries.find(q => q.query === query.query);
48-
if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
49-
continue;
37+
let prevResults: BenchmarkResults;
38+
try {
39+
const prevContent = await fs.readFile(outputPath, 'utf-8');
40+
prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
41+
} catch {
42+
return; // No previous results to compare
5043
}
5144

52-
const avgPrev = Math.round(
53-
prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
54-
);
55-
const avg = Math.round(
56-
query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
57-
);
58-
59-
const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
60-
const tag = avg < avgPrev ? "faster" : "slower";
61-
const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");
62-
63-
console.log(
64-
`${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
65-
);
66-
}
67-
}
68-
69-
export interface BenchmarkRunner {
70-
createTables(sf: number): Promise<void>;
71-
72-
executeQuery(query: string): Promise<{ rowCount: number }>;
73-
}
45+
console.log('\n==== Comparison with previous run ====');
7446

75-
export async function runBenchmark(
76-
runner: BenchmarkRunner,
77-
options: {
78-
sf: number;
79-
iterations: number;
80-
specificQuery?: number;
81-
outputPath: string;
82-
}
83-
) {
84-
const { sf, iterations, specificQuery, outputPath } = options;
47+
for (const query of results.queries) {
48+
const prevQuery = prevResults.queries.find(q => q.query === query.query);
49+
if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
50+
continue;
51+
}
8552

86-
const results: BenchmarkResults = { queries: [] };
87-
const queriesPath = path.join(ROOT, "testdata", "tpch", "queries")
53+
const avgPrev = Math.round(
54+
prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
55+
);
56+
const avg = Math.round(
57+
query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
58+
);
8859

89-
console.log("Creating tables...");
90-
await runner.createTables(sf);
60+
const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
61+
const tag = avg < avgPrev ? "faster" : "slower";
62+
const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");
9163

92-
for (let id of IDS) {
93-
if (specificQuery && specificQuery !== id) {
94-
continue;
64+
console.log(
65+
`${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
66+
);
9567
}
68+
}
9669

97-
const queryId = `q${id}`;
98-
const filePath = path.join(queriesPath, `${queryId}.sql`)
99-
const queryToExecute = await fs.readFile(filePath, 'utf-8')
70+
export interface TableSpec {
71+
schema: string
72+
name: string
73+
s3Path: string
74+
}
75+
76+
export interface BenchmarkRunner {
77+
createTables(s3Paths: TableSpec[]): Promise<void>;
10078

101-
const queryResult: QueryResult = {
102-
query: queryId,
103-
iterations: []
104-
};
79+
executeQuery(query: string): Promise<{ rowCount: number }>;
80+
}
10581

106-
console.log(`Warming up query ${id}...`)
107-
await runner.executeQuery(queryToExecute);
82+
async function tablePathsForDataset(dataset: string): Promise<TableSpec[]> {
83+
const datasetPath = path.join(ROOT, "benchmarks", "data", dataset)
84+
85+
const result: TableSpec[] = []
86+
for (const entryName of await fs.readdir(datasetPath)) {
87+
const dir = path.join(datasetPath, entryName)
88+
if (await isDirWithAllParquetFiles(dir)) {
89+
result.push({
90+
name: entryName,
91+
schema: dataset,
92+
s3Path: `${BUCKET}/${dataset}/${entryName}/`
93+
})
94+
}
95+
}
96+
return result
97+
}
10898

109-
for (let i = 0; i < iterations; i++) {
110-
const start = new Date()
111-
const response = await runner.executeQuery(queryToExecute);
112-
const elapsed = Math.round(new Date().getTime() - start.getTime())
99+
async function isDirWithAllParquetFiles(dir: string): Promise<boolean> {
100+
let readDir
101+
try {
102+
readDir = await fs.readdir(dir)
103+
} catch (e) {
104+
return false
105+
}
106+
for (const file of readDir) {
107+
if (!file.endsWith(".parquet")) {
108+
return false
109+
}
110+
}
111+
return true
112+
}
113113

114-
queryResult.iterations.push({
115-
elapsed,
116-
row_count: response.rowCount
117-
});
114+
async function queriesForDataset(dataset: string): Promise<[string, string][]> {
115+
const datasetSuffix = dataset.split("_")[0]
116+
const queriesPath = path.join(ROOT, "testdata", datasetSuffix, "queries")
118117

119-
console.log(
120-
`Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.rowCount} rows`
121-
);
118+
const queries: [string, string][] = []
119+
for (const queryName of await fs.readdir(queriesPath)) {
120+
const sql = await fs.readFile(path.join(queriesPath, queryName), 'utf-8');
121+
queries.push([queryName, sql])
122122
}
123+
queries.sort(([name1], [name2]) => numericId(name1) > numericId(name2) ? 1 : -1)
124+
return queries
125+
}
123126

124-
const avg = Math.round(
125-
queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
126-
);
127-
console.log(`Query ${id} avg time: ${avg} ms`);
127+
function numericId(queryName: string): number {
128+
return parseInt([...queryName.matchAll(/(\d+)/g)][0][0])
129+
}
128130

129-
results.queries.push(queryResult);
130-
}
131+
export async function runBenchmark(
132+
runner: BenchmarkRunner,
133+
options: {
134+
dataset: string
135+
iterations: number;
136+
queries: number[];
137+
outputPath: string;
138+
}
139+
) {
140+
const { dataset, iterations, queries, outputPath } = options;
141+
142+
const results: BenchmarkResults = { queries: [] };
143+
144+
console.log("Creating tables...");
145+
const s3Paths = await tablePathsForDataset(dataset)
146+
await runner.createTables(s3Paths);
147+
148+
for (const [queryName, sql] of await queriesForDataset(dataset)) {
149+
const id = numericId(queryName)
150+
151+
if (queries.length > 0 && !queries.includes(id)) {
152+
continue;
153+
}
154+
155+
const queryResult: QueryResult = {
156+
query: queryName,
157+
iterations: [],
158+
};
159+
160+
console.log(`Warming up query ${id}...`)
161+
try {
162+
await runner.executeQuery(sql);
163+
} catch (e: any) {
164+
queryResult.failure = e.toString();
165+
console.error(`Query ${queryResult.query} failed: ${queryResult.failure}`)
166+
continue
167+
}
168+
169+
for (let i = 0; i < iterations; i++) {
170+
const start = new Date()
171+
let response
172+
try {
173+
response = await runner.executeQuery(sql);
174+
} catch (e: any) {
175+
queryResult.failure = e.toString();
176+
break
177+
}
178+
const elapsed = Math.round(new Date().getTime() - start.getTime())
179+
180+
queryResult.iterations.push({
181+
elapsed,
182+
row_count: response.rowCount
183+
});
184+
185+
console.log(
186+
`Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.rowCount} rows`
187+
);
188+
}
189+
190+
const avg = Math.round(
191+
queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
192+
);
193+
console.log(`Query ${id} avg time: ${avg} ms`);
194+
195+
if (queryResult.failure) {
196+
console.error(`Query ${queryResult.query} failed: ${queryResult.failure}`)
197+
}
198+
results.queries.push(queryResult);
199+
}
131200

132-
// Write results and compare
133-
await compareWithPrevious(results, outputPath);
134-
await writeJson(results, outputPath);
201+
// Write results and compare
202+
await compareWithPrevious(results, outputPath);
203+
await writeJson(results, outputPath);
135204
}

0 commit comments

Comments
 (0)