1,207 changes: 1,159 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions benchmarks/Cargo.toml
@@ -20,7 +20,18 @@ chrono = "0.4.41"
futures = "0.3.31"
dashmap = "6.1.0"
prost = "0.13.5"
url = "2.5.4"
arrow-flight = "56.1.0"
tonic = { version = "0.13.1", features = ["transport"] }
axum = "0.7"
object_store = { version = "0.12.4", features = ["aws"] }
aws-config = "<1.1.1"
aws-sdk-ec2 = "<1.50.1"

[[bin]]
name = "dfbench"
path = "src/bin/dfbench.rs"

[[bin]]
name = "worker"
path = "cdk/bin/worker.rs"
8 changes: 8 additions & 0 deletions benchmarks/cdk/.gitignore
@@ -0,0 +1,8 @@
*.js
!jest.config.js
*.d.ts
node_modules

# CDK asset staging directory
.cdk.staging
cdk.out
6 changes: 6 additions & 0 deletions benchmarks/cdk/.npmignore
@@ -0,0 +1,6 @@
*.ts
!*.d.ts

# CDK asset staging directory
.cdk.staging
cdk.out
123 changes: 123 additions & 0 deletions benchmarks/cdk/README.md
@@ -0,0 +1,123 @@
# AWS CDK code for DataFusion distributed benchmarks

Automatically creates the appropriate AWS infrastructure for running the benchmarks.
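
The stack definition itself lives in `lib/cdk-stack.ts` (not shown in this section). As a rough orientation, a minimal sketch of the shape such a stack plausibly takes, assuming it provisions SSM-reachable EC2 workers and the `datafusion-distributed-benchmarks` S3 bucket that the benchmark queries point at (all construct names below are hypothetical):

```typescript
import * as cdk from 'aws-cdk-lib/core';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

export class CdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: cdk.StackProps & {
    config: { instanceType: string; instanceCount: number };
  }) {
    super(scope, id, props);

    const vpc = new ec2.Vpc(this, 'Vpc', { maxAzs: 2 });

    // Bucket holding the TPCH parquet data referenced by the benchmark queries.
    new s3.Bucket(this, 'DataBucket', { bucketName: 'datafusion-distributed-benchmarks' });

    for (let i = 0; i < props.config.instanceCount; i++) {
      const worker = new ec2.Instance(this, `Worker${i}`, {
        vpc,
        instanceType: new ec2.InstanceType(props.config.instanceType),
        machineImage: ec2.MachineImage.latestAmazonLinux2023(),
      });
      // SSM access instead of SSH, matching the connection instructions below.
      worker.role.addManagedPolicy(
        iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonSSMManagedInstanceCore'),
      );
      // The CFN outputs with per-instance connection commands mentioned below.
      new cdk.CfnOutput(this, `ConnectWorker${i}`, {
        value: `aws ssm start-session --target ${worker.instanceId}`,
      });
    }
  }
}
```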

---

# Deploy

## Prerequisites

`cargo-zigbuild` needs to be installed on the system for cross-compiling to Linux x86_64, which
is what the benchmarking machines in AWS run on.

```shell
cargo install --locked cargo-zigbuild
```

Make sure to also have the `x86_64-unknown-linux-gnu` target installed in
your Rust toolchain:

```shell
rustup target add x86_64-unknown-linux-gnu
```

Ensure that you can cross-compile to Linux x86_64 before performing any deployments:

```shell
cargo zigbuild -p datafusion-distributed-benchmarks --release --bin worker --target x86_64-unknown-linux-gnu
```

## CDK deploy

```shell
npm run cdk deploy
```

## Populating the bucket with TPCH data

```shell
npm run sync-bucket
```
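
Presumably this uploads locally generated TPCH parquet data to `s3://datafusion-distributed-benchmarks/tpch_sf<sf>/<table>/`, which is where the `CREATE EXTERNAL TABLE` statements issued by `bin/datafusion-bench.ts` expect to find each table.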

---

# Connect to instances

## Prerequisites

The Session Manager plugin for the AWS CLI needs to be installed, as it is used instead of SSH
for connecting to the EC2 machines.

These are the docs with installation instructions:

https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html

On a Mac with an Apple Silicon processor, it can be installed with:

```shell
curl "https://s3.amazonaws.com/session-manager-downloads/plugin/latest/mac_arm64/session-manager-plugin.pkg" -o "session-manager-plugin.pkg"
sudo installer -pkg session-manager-plugin.pkg -target /
sudo ln -s /usr/local/sessionmanagerplugin/bin/session-manager-plugin /usr/local/bin/session-manager-plugin
```

## Port Forward

After performing a CDK deploy, a CFN output will be printed to stdout with instructions for port-forwarding to the instances.

```shell
export INSTANCE_ID=i-0000000000000000

aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
```

Port-forwarding just the first instance is enough for issuing queries.

## Connect

After performing a CDK deploy, a CFN output will be printed to stdout with instructions for connecting
to all the machines, something like this:

```shell
export INSTANCE_ID=i-0000000000000000

aws ssm start-session --target $INSTANCE_ID
```

The logs can be streamed with:

```shell
sudo journalctl -u worker.service -f -o cat
```
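
`worker.service` is presumably a systemd unit, set up by the deployment, that runs the `worker` binary declared in `benchmarks/Cargo.toml` and cross-compiled earlier with `cargo zigbuild`.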

---

# Running benchmarks

There's a script that runs the TPCH benchmarks against the remote cluster.

In one terminal, start a port-forward to one machine in the cluster, something like this:

```shell
export INSTANCE_ID=i-0000000000000000
aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
```

In another terminal, navigate to the benchmarks/cdk folder:

```shell
cd benchmarks/cdk
```

Then run the benchmarking script:

```shell
npm run datafusion-bench
```

Several arguments can be passed for running the benchmarks against different scale factors and with different configs,
for example:

```shell
npm run datafusion-bench -- --sf 10 --files-per-task 4 --query 7
```
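
The script talks to the cluster through a plain HTTP endpoint on the port-forwarded worker. A minimal standalone sketch based on the `query()` helper in `bin/datafusion-bench.ts`, which sends `GET http://localhost:9000/?sql=...` and expects a `{ count, plan }` JSON response (the `runSql` name is hypothetical):

```typescript
// Issue a single SQL statement against the port-forwarded worker.
// Assumes an active port-forward on localhost:9000, as set up above.
async function runSql(sql: string): Promise<{ count: number; plan: string }> {
  const url = new URL('http://localhost:9000');
  url.searchParams.set('sql', sql);
  const response = await fetch(url.toString());
  if (!response.ok) {
    throw new Error(`Query failed: ${response.status} ${await response.text()}`);
  }
  return response.json();
}

runSql('SELECT 1').then(r => console.log(r.count, r.plan)).catch(console.error);
```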
12 changes: 12 additions & 0 deletions benchmarks/cdk/bin/cdk.ts
@@ -0,0 +1,12 @@
#!/usr/bin/env node
import * as cdk from 'aws-cdk-lib/core';
import { CdkStack } from '../lib/cdk-stack';

const app = new cdk.App();

const config = {
  instanceType: 't3.xlarge',
  instanceCount: 4,
};

new CdkStack(app, 'DataFusionDistributedBenchmarks', { config });
190 changes: 190 additions & 0 deletions benchmarks/cdk/bin/datafusion-bench.ts
@@ -0,0 +1,190 @@
import path from "path";
import fs from "fs/promises";
import { Command } from "commander";
import { z } from 'zod';

const ROOT = path.join(__dirname, '../../..')

// Remember to port-forward a worker with
// aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
async function main () {
  const program = new Command();

  program
    .option('--sf <number>', 'Scale factor', '1')
    .option('-i, --iterations <number>', 'Number of iterations', '3')
    .option('--files-per-task <number>', 'Files per task', '4')
    .option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '2')
    .option('--query <number>', 'A specific query to run', undefined)
    .parse(process.argv);

  const options = program.opts();

  const sf = parseInt(options.sf);
  const iterations = parseInt(options.iterations);
  const filesPerTask = parseInt(options.filesPerTask);
  const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);

  // Results are collected here and compared against the previous run at the end.
  const results: BenchmarkResults = { queries: [] };
  const queriesPath = path.join(ROOT, "testdata", "tpch", "queries")

  console.log("Creating tables...")
  await query(createTablesSql(sf))
  await query(`
    SET distributed.files_per_task=${filesPerTask};
    SET distributed.cardinality_task_count_factor=${cardinalityTaskSf}
  `)

  for (let id of IDS) {
    if (options.query && parseInt(options.query) !== id) {
      continue
    }

    const queryId = `q${id}`;
    const filePath = path.join(queriesPath, `${queryId}.sql`)
    const content = await fs.readFile(filePath, 'utf-8')

    const queryResult: QueryResult = {
      query: queryId,
      iterations: []
    };

    for (let i = 0; i < iterations; i++) {
      const start = new Date()
      const response = await query(content)
      const elapsed = Math.round(new Date().getTime() - start.getTime())

      queryResult.iterations.push({
        elapsed,
        row_count: response.count
      });

      console.log(
        `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.count} rows`
      );
    }

    const avg = Math.round(
      queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
    );
    console.log(`Query ${id} avg time: ${avg} ms`);

    results.queries.push(queryResult);
  }

  // Write results and compare
  const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json");
  await compareWithPrevious(results, outputPath);
  await writeJson(results, outputPath);
}

// Simple data structures
type QueryResult = {
  query: string;
  iterations: { elapsed: number; row_count: number }[];
}

type BenchmarkResults = {
  queries: QueryResult[];
}

const BenchmarkResults = z.object({
  queries: z.array(z.object({
    query: z.string(),
    iterations: z.array(z.object({
      elapsed: z.number(),
      row_count: z.number()
    }))
  }))
})

async function writeJson (results: BenchmarkResults, outputPath?: string) {
  if (!outputPath) return;
  await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
}

async function compareWithPrevious (results: BenchmarkResults, outputPath: string) {
  let prevResults: BenchmarkResults;
  try {
    const prevContent = await fs.readFile(outputPath, 'utf-8');
    prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
  } catch {
    return; // No previous results to compare
  }

  console.log('\n==== Comparison with previous run ====');

  for (const query of results.queries) {
    const prevQuery = prevResults.queries.find(q => q.query === query.query);
    if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
      continue;
    }

    const avgPrev = Math.round(
      prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
    );
    const avg = Math.round(
      query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
    );

    const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
    const tag = avg < avgPrev ? "faster" : "slower";
    const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");

    console.log(
      `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
    );
  }
}


function createTablesSql (sf: number): string {
  let stmt = ''
  for (const tbl of [
    "lineitem",
    "orders",
    "part",
    "partsupp",
    "customer",
    "nation",
    "region",
    "supplier",
  ]) {
    // language=SQL format=false
    stmt += `
      DROP TABLE IF EXISTS ${tbl};
      CREATE EXTERNAL TABLE IF NOT EXISTS ${tbl} STORED AS PARQUET LOCATION 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/${tbl}/';
    `
  }
  return stmt
}

const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]

const QueryResponse = z.object({
  count: z.number(),
  plan: z.string()
})
type QueryResponse = z.infer<typeof QueryResponse>

async function query (sql: string): Promise<QueryResponse> {
  const url = new URL('http://localhost:9000')
  url.searchParams.set('sql', sql)

  const response = await fetch(url.toString())

  if (!response.ok) {
    const msg = await response.text()
    throw new Error(`Query failed: ${response.status} ${msg}`)
  }

  const unparsed = await response.json()
  return QueryResponse.parse(unparsed)
}

main()
  .catch(err => {
    console.error(err)
    process.exit(1)
  })