2,102 changes: 1,725 additions & 377 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions benchmarks/Cargo.toml
@@ -8,6 +8,10 @@ default-run = "dfbench"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
ballista = { version = "50" }
ballista-executor = { version = "50" }
ballista-scheduler = { version = "50" }
ballista-core = "50"
tokio = { version = "1.46.1", features = ["full"] }
parquet = { version = "57.1.0" }
structopt = { version = "0.3.26" }
@@ -28,6 +32,7 @@ object_store = { version = "0.12.4", features = ["aws"] }
aws-config = "1"
aws-sdk-ec2 = "1"
openssl = { version = "0.10", features = ["vendored"] }
clap = "4.5"

[[bin]]
name = "dfbench"
@@ -36,3 +41,15 @@ path = "src/main.rs"
[[bin]]
name = "worker"
path = "cdk/bin/worker.rs"

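# Ballista counterparts of the worker above, used by the CDK benchmark deployment.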
[[bin]]
name = "ballista-http"
path = "cdk/bin/ballista_http.rs"

[[bin]]
name = "ballista-executor"
path = "cdk/bin/ballista_executor.rs"

[[bin]]
name = "ballista-scheduler"
path = "cdk/bin/ballista_scheduler.rs"
97 changes: 97 additions & 0 deletions benchmarks/cdk/bin/ballista-bench.ts
@@ -0,0 +1,97 @@
import path from "path";
import {Command} from "commander";
import {z} from 'zod';
import {BenchmarkRunner, ROOT, runBenchmark, TableSpec} from "./@bench-common";

// Remember to port-forward the Ballista HTTP server with:
// aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9002,localPortNumber=9002"

async function main() {
const program = new Command();

program
.option('--dataset <string>', 'Dataset to run queries on')
.option('-i, --iterations <number>', 'Number of iterations', '3')
.option('--query <number>', 'A specific query to run', undefined)
.parse(process.argv);

const options = program.opts();

const dataset: string = options.dataset;
const iterations = parseInt(options.iterations);
const queries = options.query ? [parseInt(options.query)] : [];

const runner = new BallistaRunner({});

const datasetPath = path.join(ROOT, "benchmarks", "data", dataset);
const outputPath = path.join(datasetPath, "remote-results.json");

await runBenchmark(runner, {
dataset,
iterations,
queries,
outputPath,
});
}

const QueryResponse = z.object({
count: z.number(),
plan: z.string()
});
type QueryResponse = z.infer<typeof QueryResponse>;

class BallistaRunner implements BenchmarkRunner {
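// Assumes the SSM port-forward from the note at the top of this file is active, so localhost:9002 reaches the Ballista HTTP server.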
private url = 'http://localhost:9002';

constructor(private readonly options: {}) {
}

async executeQuery(sql: string): Promise<{ rowCount: number }> {
let response;
if (sql.includes("create view")) {
// TPC-H query 15 ships as three statements: CREATE VIEW, the actual query, and DROP VIEW.
const [createView, query, dropView] = sql.split(";");
await this.query(createView);
response = await this.query(query);
await this.query(dropView);
} else {
response = await this.query(sql);
}

return { rowCount: response.count };
}

private async query(sql: string): Promise<QueryResponse> {
const url = new URL(this.url);
url.searchParams.set('sql', sql);

const response = await fetch(url.toString());

if (!response.ok) {
const msg = await response.text();
throw new Error(`Query failed: ${response.status} ${msg}`);
}

const unparsed = await response.json();
return QueryResponse.parse(unparsed);
}

async createTables(tables: TableSpec[]): Promise<void> {
let stmt = '';
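// Recreate each table from scratch so repeated benchmark runs start from a clean catalog.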
for (const table of tables) {
// language=SQL format=false
stmt += `
DROP TABLE IF EXISTS ${table.name};
CREATE EXTERNAL TABLE IF NOT EXISTS ${table.name} STORED AS PARQUET LOCATION '${table.s3Path}';
`;
}
await this.query(stmt);
}

}

main()
.catch(err => {
console.error(err);
process.exit(1);
});
36 changes: 36 additions & 0 deletions benchmarks/cdk/bin/ballista_executor.rs
@@ -0,0 +1,36 @@
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
use ballista::datafusion::prelude::SessionConfig;
use ballista_executor::config::Config;
use ballista_executor::executor_process::{ExecutorProcessConfig, start_executor_process};
use clap::Parser;
use object_store::aws::AmazonS3Builder;
use std::env;
use std::sync::Arc;
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let opt = Config::parse();

let mut config: ExecutorProcessConfig = opt.try_into()?;

let bucket = env::var("BUCKET").unwrap_or("datafusion-distributed-benchmarks".to_string());
let s3_url = Url::parse(&format!("s3://{bucket}"))?;

let s3 = Arc::new(
AmazonS3Builder::from_env()
.with_bucket_name(s3_url.host().unwrap().to_string())
.build()?,
);
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

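// Every executor session reuses the RuntimeEnv built above, so the S3 object store registration applies to all queries.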
config.override_runtime_producer = Some(Arc::new(
move |_: &SessionConfig| -> ballista::datafusion::common::Result<Arc<RuntimeEnv>> {
Ok(runtime_env.clone())
},
));

start_executor_process(Arc::new(config)).await?;
Ok(())
}
124 changes: 124 additions & 0 deletions benchmarks/cdk/bin/ballista_http.rs
@@ -0,0 +1,124 @@
use axum::{Json, Router, extract::Query, http::StatusCode, routing::get};
use ballista::datafusion::common::instant::Instant;
use ballista::datafusion::execution::SessionStateBuilder;
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
use ballista::datafusion::physical_plan::displayable;
use ballista::datafusion::physical_plan::execute_stream;
use ballista::datafusion::prelude::SessionConfig;
use ballista::datafusion::prelude::SessionContext;
use ballista::prelude::*;
use futures::{StreamExt, TryFutureExt};
use log::{error, info};
use object_store::aws::AmazonS3Builder;
use serde::Serialize;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

#[derive(Serialize)]
struct QueryResult {
plan: String,
count: usize,
}

#[derive(Debug, StructOpt, Clone)]
#[structopt(about = "Ballista HTTP query server")]
struct Cmd {
/// The bucket name.
#[structopt(long, default_value = "datafusion-distributed-benchmarks")]
bucket: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.init();

let cmd = Cmd::from_args();

const LISTENER_ADDR: &str = "0.0.0.0:9002";

info!("Starting HTTP listener on {LISTENER_ADDR}...");
let listener = tokio::net::TcpListener::bind(LISTENER_ADDR).await?;

// Register S3 object store
let s3_url = Url::parse(&format!("s3://{}", cmd.bucket))?;

info!("Building shared SessionContext for the whole lifetime of the HTTP listener...");
let s3 = Arc::new(
AmazonS3Builder::from_env()
.with_bucket_name(s3_url.host().unwrap().to_string())
.build()?,
);
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

let config = SessionConfig::new_with_ballista().with_ballista_job_name("Benchmarks");

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.build();
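// Connect to the Ballista scheduler, assumed to be running locally on its default gRPC port (50050).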
let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let http_server = axum::serve(
listener,
Router::new().route(
"/",
get(move |Query(params): Query<HashMap<String, String>>| {
let ctx = ctx.clone();

async move {
let sql = params.get("sql").ok_or(err("Missing 'sql' parameter"))?;

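// Plan each ';'-separated statement in order; only the last one's DataFrame is executed and timed below.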
let mut df_opt = None;
for sql in sql.split(";") {
if sql.trim().is_empty() {
continue;
}
let df = ctx.sql(sql).await.map_err(err)?;
df_opt = Some(df);
}
let Some(df) = df_opt else {
return Err(err("Empty 'sql' parameter"));
};

let start = Instant::now();

info!("Executing query...");
let physical = df.create_physical_plan().await.map_err(err)?;
let mut stream =
execute_stream(physical.clone(), ctx.task_ctx()).map_err(err)?;
let mut count = 0;
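// Drain the stream and count rows instead of buffering batches, so the timing covers full query execution.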
while let Some(batch) = stream.next().await {
count += batch.map_err(err)?.num_rows();
info!("Gathered {count} rows, query still in progress..")
}
let plan = displayable(physical.as_ref()).indent(true).to_string();
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
info!("Returned {count} rows in {ms} ms");

Ok::<_, (StatusCode, String)>(Json(QueryResult { count, plan }))
}
.inspect_err(|(_, msg)| {
error!("Error executing query: {msg}");
})
}),
),
);

info!("Started listener HTTP server in {LISTENER_ADDR}");
http_server.await?;
Ok(())
}

fn err(s: impl Display) -> (StatusCode, String) {
(StatusCode::INTERNAL_SERVER_ERROR, s.to_string())
}
62 changes: 62 additions & 0 deletions benchmarks/cdk/bin/ballista_scheduler.rs
@@ -0,0 +1,62 @@
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
use ballista::datafusion::execution::{SessionState, SessionStateBuilder};
use ballista::datafusion::prelude::SessionConfig;
use ballista_core::error::BallistaError;
use ballista_core::extension::SessionConfigExt;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::{Config, SchedulerConfig};
use ballista_scheduler::scheduler_process::start_server;
use clap::Parser;
use object_store::aws::AmazonS3Builder;
use std::env;
use std::sync::Arc;
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_stack_size(32 * 1024 * 1024) // 32MB
.build()?;

runtime.block_on(inner())
}

async fn inner() -> Result<(), Box<dyn std::error::Error>> {
let opt = Config::parse();

let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
let addr = addr
.parse()
.map_err(|e: std::net::AddrParseError| BallistaError::Configuration(e.to_string()))?;

let bucket = env::var("BUCKET").unwrap_or("datafusion-distributed-benchmarks".to_string());
let s3_url = Url::parse(&format!("s3://{bucket}"))?;

let s3 = Arc::new(
AmazonS3Builder::from_env()
.with_bucket_name(s3_url.host().unwrap().to_string())
.build()?,
);
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

let config: SchedulerConfig = opt.try_into()?;
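// Sessions created by the scheduler enable information_schema and reuse the RuntimeEnv with the S3 store registered.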
let config = config.with_override_config_producer(Arc::new(|| {
SessionConfig::new_with_ballista().with_information_schema(true)
}));
let config = config.with_override_session_builder(Arc::new(
move |cfg: SessionConfig| -> ballista::datafusion::common::Result<SessionState> {
Ok(SessionStateBuilder::new()
.with_config(cfg)
.with_runtime_env(runtime_env.clone())
.with_default_features()
.build())
},
));

let cluster = BallistaCluster::new_from_config(&config).await?;
start_server(cluster, addr, Arc::new(config)).await?;

Ok(())
}
14 changes: 11 additions & 3 deletions benchmarks/cdk/bin/cdk.ts
@@ -1,12 +1,20 @@
#!/usr/bin/env node
import * as cdk from 'aws-cdk-lib/core';
import { CdkStack } from '../lib/cdk-stack';
import {CdkStack} from '../lib/cdk-stack';
import {DATAFUSION_DISTRIBUTED_ENGINE} from "../lib/datafusion-distributed";
import {TRINO_ENGINE} from "../lib/trino";
import {BALLISTA_ENGINE} from "../lib/ballista";

const app = new cdk.App();

const config = {
instanceType: 't3.xlarge',
instanceCount: 4,
instanceType: 't3.xlarge',
instanceCount: 4,
engines: [
DATAFUSION_DISTRIBUTED_ENGINE,
TRINO_ENGINE,
BALLISTA_ENGINE
]
};

new CdkStack(app, 'DataFusionDistributedBenchmarks', { config });