Skip to content

Commit a42d4a2

Browse files
committed
Add ballista to benchmarks
1 parent 1fb4daa commit a42d4a2

File tree

12 files changed

+2641
-624
lines changed

12 files changed

+2641
-624
lines changed

Cargo.lock

Lines changed: 1725 additions & 377 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/Cargo.toml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ default-run = "dfbench"
88
datafusion = { workspace = true }
99
datafusion-proto = { workspace = true }
1010
datafusion-distributed = { path = "..", features = ["integration"] }
11+
ballista = { version = "50" }
12+
ballista-executor = { version = "50" }
13+
ballista-scheduler = { version = "50" }
14+
ballista-core = "50"
1115
tokio = { version = "1.46.1", features = ["full"] }
1216
parquet = { version = "57.1.0" }
1317
structopt = { version = "0.3.26" }
@@ -28,6 +32,7 @@ object_store = { version = "0.12.4", features = ["aws"] }
2832
aws-config = "1"
2933
aws-sdk-ec2 = "1"
3034
openssl = { version = "0.10", features = ["vendored"] }
35+
clap = "4.5"
3136

3237
[[bin]]
3338
name = "dfbench"
@@ -36,3 +41,15 @@ path = "src/main.rs"
3641
[[bin]]
3742
name = "worker"
3843
path = "cdk/bin/worker.rs"
44+
45+
[[bin]]
46+
name = "ballista-http"
47+
path = "cdk/bin/ballista_http.rs"
48+
49+
[[bin]]
50+
name = "ballista-executor"
51+
path = "cdk/bin/ballista_executor.rs"
52+
53+
[[bin]]
54+
name = "ballista-scheduler"
55+
path = "cdk/bin/ballista_scheduler.rs"
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import path from "path";
2+
import {Command} from "commander";
3+
import {z} from 'zod';
4+
import {BenchmarkRunner, ROOT, runBenchmark, TableSpec} from "./@bench-common";
5+
6+
// Before running this script, port-forward the Ballista HTTP server (port 9002) to localhost with:
7+
// aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9002,localPortNumber=9002"
8+
9+
async function main() {
10+
const program = new Command();
11+
12+
program
13+
.option('--dataset <string>', 'Dataset to run queries on')
14+
.option('-i, --iterations <number>', 'Number of iterations', '3')
15+
.option('--query <number>', 'A specific query to run', undefined)
16+
.parse(process.argv);
17+
18+
const options = program.opts();
19+
20+
const dataset: string = options.dataset
21+
const iterations = parseInt(options.iterations);
22+
const queries = options.query ? [parseInt(options.query)] : [];
23+
24+
const runner = new BallistaRunner({});
25+
26+
const datasetPath = path.join(ROOT, "benchmarks", "data", dataset);
27+
const outputPath = path.join(datasetPath, "remote-results.json")
28+
29+
await runBenchmark(runner, {
30+
dataset,
31+
iterations,
32+
queries,
33+
outputPath,
34+
});
35+
}
36+
37+
/**
 * Schema of the JSON body expected back from the HTTP server for a query:
 * a numeric `count` and a `plan` string.
 */
const QueryResponse = z.object({
    count: z.number(),
    plan: z.string()
})
type QueryResponse = z.infer<typeof QueryResponse>

/**
 * {@link BenchmarkRunner} that sends SQL over HTTP to a server listening on
 * `http://localhost:9002`.
 */
class BallistaRunner implements BenchmarkRunner {
    private url = 'http://localhost:9002';

    constructor(private readonly options: {}) {
    }

    /**
     * Executes one benchmark query and reports the number of rows it produced.
     *
     * SQL containing "create view" is split on ";" into three statements
     * (create view, query, drop view) that are sent one at a time.
     *
     * @param sql The SQL text of the query.
     * @returns The row count reported by the server for the measured statement.
     * @throws Error when any of the HTTP requests fails.
     */
    async executeQuery(sql: string): Promise<{ rowCount: number }> {
        if (!sql.includes("create view")) {
            const response = await this.query(sql);
            return { rowCount: response.count };
        }

        // This is query 15
        const [createView, query, dropView] = sql.split(";");
        await this.query(createView);
        try {
            const response = await this.query(query);
            return { rowCount: response.count };
        } finally {
            // Drop the view even when the query fails; otherwise the next
            // "create view" for the same query would hit a leftover view.
            await this.query(dropView);
        }
    }

    /**
     * Sends `sql` as the `sql` search parameter of a GET request and validates
     * the JSON response against {@link QueryResponse}.
     *
     * @throws Error with the HTTP status and response body on a non-OK response.
     */
    private async query(sql: string): Promise<QueryResponse> {
        const url = new URL(this.url);
        url.searchParams.set('sql', sql);

        const response = await fetch(url.toString());

        if (!response.ok) {
            const msg = await response.text();
            throw new Error(`Query failed: ${response.status} ${msg}`);
        }

        const unparsed = await response.json();
        return QueryResponse.parse(unparsed);
    }

    /**
     * (Re)creates one external Parquet table per entry in `tables`, sending all
     * statements in a single request.
     *
     * @param tables Table names and their S3 locations.
     */
    async createTables(tables: TableSpec[]): Promise<void> {
        // Nothing to register: avoid sending an empty statement to the server.
        if (tables.length === 0) {
            return;
        }

        let stmt = '';
        for (const table of tables) {
            // language=SQL format=false
            stmt += `
    DROP TABLE IF EXISTS ${table.name};
    CREATE EXTERNAL TABLE IF NOT EXISTS ${table.name} STORED AS PARQUET LOCATION '${table.s3Path}';
 `;
        }
        await this.query(stmt);
    }

}
92+
93+
// Run the CLI; any rejection is printed and turned into a non-zero exit code.
main().catch((error) => {
    console.error(error);
    process.exit(1);
});
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
2+
use ballista::datafusion::prelude::SessionConfig;
3+
use ballista_executor::config::Config;
4+
use ballista_executor::executor_process::{ExecutorProcessConfig, start_executor_process};
5+
use clap::Parser;
6+
use object_store::aws::AmazonS3Builder;
7+
use std::env;
8+
use std::sync::Arc;
9+
use url::Url;
10+
11+
#[tokio::main]
12+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
13+
let opt = Config::parse();
14+
15+
let mut config: ExecutorProcessConfig = opt.try_into()?;
16+
17+
let bucket = env::var("BUCKET").unwrap_or("datafusion-distributed-benchmarks".to_string());
18+
let s3_url = Url::parse(&format!("s3://{bucket}"))?;
19+
20+
let s3 = Arc::new(
21+
AmazonS3Builder::from_env()
22+
.with_bucket_name(s3_url.host().unwrap().to_string())
23+
.build()?,
24+
);
25+
let runtime_env = Arc::new(RuntimeEnv::default());
26+
runtime_env.register_object_store(&s3_url, s3);
27+
28+
config.override_runtime_producer = Some(Arc::new(
29+
move |_: &SessionConfig| -> ballista::datafusion::common::Result<Arc<RuntimeEnv>> {
30+
Ok(runtime_env.clone())
31+
},
32+
));
33+
34+
start_executor_process(Arc::new(config)).await?;
35+
Ok(())
36+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use axum::{Json, Router, extract::Query, http::StatusCode, routing::get};
2+
use ballista::datafusion::common::instant::Instant;
3+
use ballista::datafusion::execution::SessionStateBuilder;
4+
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
5+
use ballista::datafusion::physical_plan::displayable;
6+
use ballista::datafusion::physical_plan::execute_stream;
7+
use ballista::datafusion::prelude::SessionConfig;
8+
use ballista::datafusion::prelude::SessionContext;
9+
use ballista::prelude::*;
10+
use futures::{StreamExt, TryFutureExt};
11+
use log::{error, info};
12+
use object_store::aws::AmazonS3Builder;
13+
use serde::Serialize;
14+
use std::collections::HashMap;
15+
use std::error::Error;
16+
use std::fmt::Display;
17+
use std::sync::Arc;
18+
use structopt::StructOpt;
19+
use url::Url;
20+
21+
#[derive(Serialize)]
22+
struct QueryResult {
23+
plan: String,
24+
count: usize,
25+
}
26+
27+
#[derive(Debug, StructOpt, Clone)]
28+
#[structopt(about = "worker spawn command")]
29+
struct Cmd {
30+
/// The bucket name.
31+
#[structopt(long, default_value = "datafusion-distributed-benchmarks")]
32+
bucket: String,
33+
}
34+
35+
#[tokio::main]
36+
async fn main() -> Result<(), Box<dyn Error>> {
37+
env_logger::builder()
38+
.filter_level(log::LevelFilter::Info)
39+
.parse_default_env()
40+
.init();
41+
42+
let cmd = Cmd::from_args();
43+
44+
const LISTENER_ADDR: &str = "0.0.0.0:9002";
45+
46+
info!("Starting HTTP listener on {LISTENER_ADDR}...");
47+
let listener = tokio::net::TcpListener::bind(LISTENER_ADDR).await?;
48+
49+
// Register S3 object store
50+
let s3_url = Url::parse(&format!("s3://{}", cmd.bucket))?;
51+
52+
info!("Building shared SessionContext for the whole lifetime of the HTTP listener...");
53+
let s3 = Arc::new(
54+
AmazonS3Builder::from_env()
55+
.with_bucket_name(s3_url.host().unwrap().to_string())
56+
.build()?,
57+
);
58+
let runtime_env = Arc::new(RuntimeEnv::default());
59+
runtime_env.register_object_store(&s3_url, s3);
60+
61+
let config = SessionConfig::new_with_ballista().with_ballista_job_name("Benchmarks");
62+
63+
let state = SessionStateBuilder::new()
64+
.with_config(config)
65+
.with_default_features()
66+
.with_runtime_env(Arc::clone(&runtime_env))
67+
.build();
68+
let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;
69+
70+
let http_server = axum::serve(
71+
listener,
72+
Router::new().route(
73+
"/",
74+
get(move |Query(params): Query<HashMap<String, String>>| {
75+
let ctx = ctx.clone();
76+
77+
async move {
78+
let sql = params.get("sql").ok_or(err("Missing 'sql' parameter"))?;
79+
80+
let mut df_opt = None;
81+
for sql in sql.split(";") {
82+
if sql.trim().is_empty() {
83+
continue;
84+
}
85+
let df = ctx.sql(sql).await.map_err(err)?;
86+
df_opt = Some(df);
87+
}
88+
let Some(df) = df_opt else {
89+
return Err(err("Empty 'sql' parameter"));
90+
};
91+
92+
let start = Instant::now();
93+
94+
info!("Executing query...");
95+
let physical = df.create_physical_plan().await.map_err(err)?;
96+
let mut stream =
97+
execute_stream(physical.clone(), ctx.task_ctx()).map_err(err)?;
98+
let mut count = 0;
99+
while let Some(batch) = stream.next().await {
100+
count += batch.map_err(err)?.num_rows();
101+
info!("Gathered {count} rows, query still in progress..")
102+
}
103+
let plan = displayable(physical.as_ref()).indent(true).to_string();
104+
let elapsed = start.elapsed();
105+
let ms = elapsed.as_secs_f64() * 1000.0;
106+
info!("Returned {count} rows in {ms} ms");
107+
108+
Ok::<_, (StatusCode, String)>(Json(QueryResult { count, plan }))
109+
}
110+
.inspect_err(|(_, msg)| {
111+
error!("Error executing query: {msg}");
112+
})
113+
}),
114+
),
115+
);
116+
117+
info!("Started listener HTTP server in {LISTENER_ADDR}");
118+
http_server.await?;
119+
Ok(())
120+
}
121+
122+
fn err(s: impl Display) -> (StatusCode, String) {
123+
(StatusCode::INTERNAL_SERVER_ERROR, s.to_string())
124+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
2+
use ballista::datafusion::execution::{SessionState, SessionStateBuilder};
3+
use ballista::datafusion::prelude::SessionConfig;
4+
use ballista_core::error::BallistaError;
5+
use ballista_core::extension::SessionConfigExt;
6+
use ballista_scheduler::cluster::BallistaCluster;
7+
use ballista_scheduler::config::{Config, SchedulerConfig};
8+
use ballista_scheduler::scheduler_process::start_server;
9+
use clap::Parser;
10+
use object_store::aws::AmazonS3Builder;
11+
use std::env;
12+
use std::sync::Arc;
13+
use url::Url;
14+
15+
fn main() -> Result<(), Box<dyn std::error::Error>> {
16+
let runtime = tokio::runtime::Builder::new_multi_thread()
17+
.enable_io()
18+
.enable_time()
19+
.thread_stack_size(32 * 1024 * 1024) // 32MB
20+
.build()?;
21+
22+
runtime.block_on(inner())
23+
}
24+
25+
async fn inner() -> Result<(), Box<dyn std::error::Error>> {
26+
let opt = Config::parse();
27+
28+
let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
29+
let addr = addr
30+
.parse()
31+
.map_err(|e: std::net::AddrParseError| BallistaError::Configuration(e.to_string()))?;
32+
33+
let bucket = env::var("BUCKET").unwrap_or("datafusion-distributed-benchmarks".to_string());
34+
let s3_url = Url::parse(&format!("s3://{bucket}"))?;
35+
36+
let s3 = Arc::new(
37+
AmazonS3Builder::from_env()
38+
.with_bucket_name(s3_url.host().unwrap().to_string())
39+
.build()?,
40+
);
41+
let runtime_env = Arc::new(RuntimeEnv::default());
42+
runtime_env.register_object_store(&s3_url, s3);
43+
44+
let config: SchedulerConfig = opt.try_into()?;
45+
let config = config.with_override_config_producer(Arc::new(|| {
46+
SessionConfig::new_with_ballista().with_information_schema(true)
47+
}));
48+
let config = config.with_override_session_builder(Arc::new(
49+
move |cfg: SessionConfig| -> ballista::datafusion::common::Result<SessionState> {
50+
Ok(SessionStateBuilder::new()
51+
.with_config(cfg)
52+
.with_runtime_env(runtime_env.clone())
53+
.with_default_features()
54+
.build())
55+
},
56+
));
57+
58+
let cluster = BallistaCluster::new_from_config(&config).await?;
59+
start_server(cluster, addr, Arc::new(config)).await?;
60+
61+
Ok(())
62+
}

benchmarks/cdk/bin/cdk.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
#!/usr/bin/env node
22
import * as cdk from 'aws-cdk-lib/core';
3-
import { CdkStack } from '../lib/cdk-stack';
3+
import {CdkStack} from '../lib/cdk-stack';
4+
import {DATAFUSION_DISTRIBUTED_ENGINE} from "../lib/datafusion-distributed";
5+
import {TRINO_ENGINE} from "../lib/trino";
6+
import {BALLISTA_ENGINE} from "../lib/ballista";
47

58
const app = new cdk.App();
69

710
const config = {
8-
instanceType: 't3.xlarge',
9-
instanceCount: 4,
11+
instanceType: 't3.xlarge',
12+
instanceCount: 4,
13+
engines: [
14+
DATAFUSION_DISTRIBUTED_ENGINE,
15+
TRINO_ENGINE,
16+
BALLISTA_ENGINE
17+
]
1018
};
1119

1220
new CdkStack(app, 'DataFusionDistributedBenchmarks', { config });

0 commit comments

Comments
 (0)