diff --git a/benchmarks/cdk/bin/spark-bench.ts b/benchmarks/cdk/bin/spark-bench.ts
new file mode 100644
index 0000000..2588e0d
--- /dev/null
+++ b/benchmarks/cdk/bin/spark-bench.ts
@@ -0,0 +1,109 @@
+import path from "path";
+import {Command} from "commander";
+import {z} from 'zod';
+import {BenchmarkRunner, ROOT, runBenchmark, TableSpec} from "./@bench-common";
+
+// Remember to port-forward the Spark HTTP server with
+// aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9003,localPortNumber=9003"
+
+async function main() {
+    const program = new Command();
+
+    program
+        .option('--dataset <dataset>', 'Dataset to run queries on')
+        .option('-i, --iterations <iterations>', 'Number of iterations', '3')
+        .option('--query <query>', 'A specific query to run', undefined)
+        .parse(process.argv);
+
+    const options = program.opts();
+
+    const dataset: string = options.dataset
+    const iterations = parseInt(options.iterations);
+    const queries = options.query ? [parseInt(options.query)] : [];
+
+    const runner = new SparkRunner({});
+
+    const datasetPath = path.join(ROOT, "benchmarks", "data", dataset);
+    const outputPath = path.join(datasetPath, "remote-results.json")
+
+    await runBenchmark(runner, {
+        dataset,
+        iterations,
+        queries,
+        outputPath,
+    });
+}
+
+const QueryResponse = z.object({
+    count: z.number()
+})
+type QueryResponse = z.infer<typeof QueryResponse>
+
+class SparkRunner implements BenchmarkRunner {
+    private url = 'http://localhost:9003';
+
+    constructor(private readonly options: {}) {
+    }
+
+    async executeQuery(sql: string): Promise<{ rowCount: number }> {
+        // Fix TPCH query 4: Add DATE prefix to date literals
+        sql = sql.replace(/(?<!DATE )'(\d{4}-\d{2}-\d{2})'/g, "DATE '$1'");
+        const result = await this.query(sql);
+        return {rowCount: result.count};
+    }
+
+    private async query(sql: string): Promise<QueryResponse> {
+        const response = await fetch(`${this.url}/query`, {
+            method: 'POST',
+            headers: {
+                'Content-Type': 'application/json',
+            },
+            body: JSON.stringify({
+                query: sql.trim().replace(/;+$/, '')
+            })
+        });
+
+        if (!response.ok) {
+            const msg = await response.text();
+            throw new Error(`Query failed: ${response.status} ${msg}`);
+        }
+
+        return QueryResponse.parse(await response.json());
+    }
+
+    async createTables(tables: TableSpec[]): Promise<void> {
+        for (const table of tables) {
+            // Spark requires s3a:// protocol, not s3://
+            const s3aPath = table.s3Path.replace('s3://', 's3a://');
+
+            // Create temporary view from Parquet files
+            const createViewStmt = `
+                CREATE OR REPLACE TEMPORARY VIEW ${table.name}
+                USING parquet
+                OPTIONS (path '${s3aPath}')
+            `;
+            await this.query(createViewStmt);
+        }
+    }
+
+}
+
+main()
+    .catch(err => {
+        console.error(err)
+        process.exit(1)
+    })
diff --git a/benchmarks/cdk/bin/spark_http.py b/benchmarks/cdk/bin/spark_http.py
new file mode 100644
index 0000000..1b39817
--- /dev/null
+++ b/benchmarks/cdk/bin/spark_http.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python3
+import os
+from flask import Flask, request, jsonify
+from pyspark.sql import SparkSession
+
+app = Flask(__name__)
+
+# Initialize Spark session
+spark = None
+
+def get_spark():
+    global spark
+    if spark is None:
+        master_host = os.environ.get('SPARK_MASTER_HOST', 'localhost')
+        spark_jars = os.environ.get('SPARK_JARS', '/opt/spark/jars/hadoop-aws-3.4.1.jar,/opt/spark/jars/bundle-2.29.52.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar')
+        spark = SparkSession.builder \
+            .appName("SparkHTTPServer") \
+            .master(f"spark://{master_host}:7077") \
+            .config("spark.jars", spark_jars) \
+            .config("spark.sql.catalogImplementation", "hive") \
+            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+            .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") \
+            .enableHiveSupport() \
+            .getOrCreate()
+
+        # Set log level to reduce noise
+        spark.sparkContext.setLogLevel("WARN")
+    return spark
+
+@app.route('/health', methods=['GET'])
+def health():
+    """Health check endpoint"""
+    return jsonify({"status": "healthy"}), 200
+
+@app.route('/query', methods=['POST'])
+def execute_query():
+    """Execute a SQL query on Spark"""
+    try:
+        data = request.get_json()
+        if not data or 'query' not in data:
+            return jsonify({"error": "Missing 'query' in request body"}), 400
+
+        query = data['query']
+
+        # Execute the query
+        spark_session = get_spark()
+        df = spark_session.sql(query)
+
+        # Get row count without collecting all data
+        count = df.count()
+
+        return jsonify({"count": count}), 200
+
+    except Exception as e:
+        return str(e), 500
+
+if __name__ == '__main__':
+    # Run Flask server on the configured HTTP_PORT (default 9003)
+    port = int(os.environ.get('HTTP_PORT', 9003))
+    app.run(host='0.0.0.0', port=port, debug=False)
diff --git a/benchmarks/cdk/lib/cdk-stack.ts b/benchmarks/cdk/lib/cdk-stack.ts
index febeb01..f20ad86 100644
--- a/benchmarks/cdk/lib/cdk-stack.ts
+++ b/benchmarks/cdk/lib/cdk-stack.ts
@@ -6,6 +6,7 @@ import {Construct} from 'constructs';
 import {DATAFUSION_DISTRIBUTED_ENGINE} from "./datafusion-distributed";
 import {BALLISTA_ENGINE} from "./ballista";
 import {TRINO_ENGINE} from "./trino";
+import {SPARK_ENGINE} from "./spark";
 import path from "path";
 import * as cr from "aws-cdk-lib/custom-resources";
@@ -17,7 +18,8 @@ if (USER_DATA_CAUSES_REPLACEMENT) {
 const ENGINES = [
     DATAFUSION_DISTRIBUTED_ENGINE,
     BALLISTA_ENGINE,
-    TRINO_ENGINE
+    TRINO_ENGINE,
+    SPARK_ENGINE
 ]
 
 export const ROOT = path.join(__dirname, '../../..')
diff --git a/benchmarks/cdk/lib/spark.ts b/benchmarks/cdk/lib/spark.ts
new file mode 100644
index 0000000..bf8a759
--- /dev/null
+++ b/benchmarks/cdk/lib/spark.ts
@@ -0,0 +1,342 @@
+import {
+    AfterEc2MachinesContext,
+    BeforeEc2MachinesContext,
+    QueryEngine,
+    ROOT,
+    sendCommandsUnconditionally,
+    OnEc2MachinesContext
+} from "./cdk-stack";
+import * as s3assets from "aws-cdk-lib/aws-s3-assets";
+import path from "path";
+
+const SPARK_VERSION = "4.0.0"
+const HADOOP_VERSION = "3"
+
+let sparkHttpScript: s3assets.Asset
+let sparkRequirements: s3assets.Asset
+
+export const SPARK_ENGINE: QueryEngine = {
+    beforeEc2Machines(ctx: BeforeEc2MachinesContext): void {
+        // Upload Python HTTP server script to S3
+        sparkHttpScript = new s3assets.Asset(ctx.scope, 'SparkHttpScript', {
+            path: path.join(ROOT, 'benchmarks/cdk/bin/spark_http.py'),
+        })
+
+        // Upload Python requirements file to S3
+        sparkRequirements = new s3assets.Asset(ctx.scope, 'SparkRequirements', {
+            path: path.join(ROOT, 'benchmarks/cdk/requirements.txt'),
+        })
+
+        sparkHttpScript.grantRead(ctx.role)
+        sparkRequirements.grantRead(ctx.role)
+    },
+    onEc2Machine(ctx: OnEc2MachinesContext): void {
+        const isMaster = ctx.instanceIdx === 0;
+        ctx.instanceUserData.addCommands(
+            // Install Java 17 for Spark
+            'yum install -y java-17-amazon-corretto-headless python3 python3-pip',
+
+            // Download and install Spark
+            'cd /opt',
+            `curl -L -o spark.tgz https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz`,
+            'tar -xzf spark.tgz',
+            `mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} spark`,
+            'rm spark.tgz',
+
+            // Download AWS JARs for S3 access (Hadoop 3.4.1 with AWS SDK V2 for Spark 4.0)
+            'cd /opt/spark/jars',
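+            // The hadoop-aws version should match the Hadoop libraries bundled with this Spark build; the
+            // v1 aws-java-sdk bundle is also fetched because the S3A credentials provider configured below
+            // (com.amazonaws.auth.InstanceProfileCredentialsProvider) is a v1 SDK class.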
+            'curl -L -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.4.1/hadoop-aws-3.4.1.jar',
+            'curl -L -O https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.29.52/bundle-2.29.52.jar',
+            'curl -L -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar',
+
+            // Create Spark directories
+            'mkdir -p /var/spark/logs',
+            'mkdir -p /var/spark/work',
+            'mkdir -p /var/spark/http',
+
+            // Download Python scripts from S3
+            `aws s3 cp s3://${sparkHttpScript.s3BucketName}/${sparkHttpScript.s3ObjectKey} /var/spark/http/spark_http.py`,
+            'chmod +x /var/spark/http/spark_http.py',
+            `aws s3 cp s3://${sparkRequirements.s3BucketName}/${sparkRequirements.s3ObjectKey} /var/spark/http/requirements.txt`,
+
+            // Install Python dependencies
+            'pip3 install -r /var/spark/http/requirements.txt',
+
+            // Configure Spark defaults
+            `cat > /opt/spark/conf/spark-defaults.conf << 'SPARK_EOF'
+spark.master spark://localhost:7077
+spark.executor.memory 4g
+spark.driver.memory 2g
+spark.sql.warehouse.dir /var/spark/warehouse
+spark.jars /opt/spark/jars/hadoop-aws-3.4.1.jar,/opt/spark/jars/bundle-2.29.52.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar
+spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.InstanceProfileCredentialsProvider
+spark.hadoop.fs.s3a.connection.timeout 60000
+spark.hadoop.fs.s3a.connection.establish.timeout 60000
+spark.hadoop.fs.s3a.attempts.maximum 10
+spark.sql.catalogImplementation hive
+spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
+spark.sql.hive.metastore.version 2.3.9
+spark.sql.hive.metastore.jars builtin
+SPARK_EOF`,
+
+            // Configure Hadoop core-site.xml for S3A with numeric timeouts
+            `cat > /opt/spark/conf/core-site.xml << 'CORE_SITE_EOF'
+<?xml version="1.0"?>
+<configuration>
+    <property>
+        <name>fs.s3a.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+    </property>
+    <property>
+        <name>fs.s3a.aws.credentials.provider</name>
+        <value>com.amazonaws.auth.InstanceProfileCredentialsProvider</value>
+    </property>
+    <property>
+        <name>fs.s3a.connection.timeout</name>
+        <value>60000</value>
+    </property>
+    <property>
+        <name>fs.s3a.connection.establish.timeout</name>
+        <value>60000</value>
+    </property>
+    <property>
+        <name>fs.s3a.connection.request.timeout</name>
+        <value>60000</value>
+    </property>
+    <property>
+        <name>fs.s3a.attempts.maximum</name>
+        <value>10</value>
+    </property>
+    <property>
+        <name>fs.s3a.retry.interval</name>
+        <value>500</value>
+    </property>
+    <property>
+        <name>fs.s3a.retry.limit</name>
+        <value>10</value>
+    </property>
+</configuration>
+CORE_SITE_EOF`,
+
+            // Configure Spark environment
+            `cat > /opt/spark/conf/spark-env.sh << 'SPARK_EOF'
+#!/usr/bin/env bash
+export SPARK_WORKER_DIR=/var/spark/work
+export SPARK_LOG_DIR=/var/spark/logs
+export SPARK_MASTER_HOST=localhost
+export SPARK_MASTER_PORT=7077
+export SPARK_MASTER_WEBUI_PORT=8082
+export SPARK_WORKER_WEBUI_PORT=8083
+export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64
+SPARK_EOF`,
+            'chmod +x /opt/spark/conf/spark-env.sh',
+
+            // Create Spark master systemd service (master only)
+            ...(isMaster ? [
+                `cat > /etc/systemd/system/spark-master.service << 'SPARK_EOF'
+[Unit]
+Description=Spark Master
+After=network.target
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-master.sh
+ExecStop=/opt/spark/sbin/stop-master.sh
+Restart=on-failure
+RestartSec=5
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_LOG_DIR=/var/spark/logs"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`
+            ] : []),
+
+            // Create Spark worker systemd service (will be reconfigured for workers)
+            `cat > /etc/systemd/system/spark-worker.service << 'SPARK_EOF'
+[Unit]
+Description=Spark Worker
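+# Initially registers with the master on localhost; afterEc2Machines rewrites this unit with the master's private IP.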
+After=network.target${isMaster ? ' spark-master.service' : ''}
+${isMaster ? 'Requires=spark-master.service' : ''}
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-worker.sh spark://localhost:7077
+ExecStop=/opt/spark/sbin/stop-worker.sh
+Restart=on-failure
+RestartSec=5
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_LOG_DIR=/var/spark/logs"
+Environment="SPARK_WORKER_DIR=/var/spark/work"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`,
+
+            // Create HTTP server systemd service (master only)
+            ...(isMaster ? [
+                `cat > /etc/systemd/system/spark-http.service << 'SPARK_EOF'
+[Unit]
+Description=Spark HTTP Server
+After=network.target spark-master.service
+Requires=spark-master.service
+
+[Service]
+Type=simple
+ExecStart=/usr/bin/python3 /var/spark/http/spark_http.py
+Restart=on-failure
+RestartSec=5
+User=root
+WorkingDirectory=/var/spark/http
+Environment="SPARK_MASTER_HOST=localhost"
+Environment="HTTP_PORT=9003"
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_JARS=/opt/spark/jars/hadoop-aws-3.4.1.jar,/opt/spark/jars/bundle-2.29.52.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
+StandardOutput=append:/var/spark/logs/http.log
+StandardError=append:/var/spark/logs/http.log
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`
+            ] : []),
+
+            // Reload systemd and enable services
+            'systemctl daemon-reload',
+
+            // Enable and start master (master only)
+            ...(isMaster ? [
+                'systemctl enable spark-master',
+                'systemctl start spark-master',
+                'sleep 5'
+            ] : []),
+
+            // Enable and start worker (all nodes)
+            'systemctl enable spark-worker',
+            'systemctl start spark-worker',
+
+            // Enable and start HTTP server (master only)
+            ...(isMaster ? [
+                'systemctl enable spark-http',
+                'systemctl start spark-http'
+            ] : [])
+        )
+    },
+    afterEc2Machines(ctx: AfterEc2MachinesContext): void {
+        const [master, ...workers] = ctx.instances
+
+        // Reconfigure all workers to point to master IP
+        sendCommandsUnconditionally(
+            ctx.scope,
+            'ConfigureSparkWorkers',
+            workers,
+            [
+                `cat > /opt/spark/conf/spark-env.sh << 'SPARK_EOF'
+#!/usr/bin/env bash
+export SPARK_WORKER_DIR=/var/spark/work
+export SPARK_LOG_DIR=/var/spark/logs
+export SPARK_MASTER_HOST=${master.instancePrivateIp}
+export SPARK_MASTER_PORT=7077
+export SPARK_MASTER_WEBUI_PORT=8082
+export SPARK_WORKER_WEBUI_PORT=8081
+export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64
+SPARK_EOF`,
+                'chmod +x /opt/spark/conf/spark-env.sh',
+                `cat > /etc/systemd/system/spark-worker.service << 'SPARK_EOF'
+[Unit]
+Description=Spark Worker
+After=network.target
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-worker.sh spark://${master.instancePrivateIp}:7077
+ExecStop=/opt/spark/sbin/stop-worker.sh
+Restart=on-failure
+RestartSec=5
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_LOG_DIR=/var/spark/logs"
+Environment="SPARK_WORKER_DIR=/var/spark/work"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`,
+                'systemctl daemon-reload',
+                'systemctl restart spark-worker'
+            ]
+        )
+
+        // Also update master's spark-env.sh to use its own IP for proper cluster formation
+        sendCommandsUnconditionally(
+            ctx.scope,
+            'ConfigureSparkMaster',
+            [master],
+            [
+                `cat > /opt/spark/conf/spark-env.sh << 'SPARK_EOF'
+#!/usr/bin/env bash
+export SPARK_WORKER_DIR=/var/spark/work
+export SPARK_LOG_DIR=/var/spark/logs
+export SPARK_MASTER_HOST=${master.instancePrivateIp}
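+# Advertise the private IP rather than localhost so workers on other nodes can register with this master.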
+export SPARK_MASTER_PORT=7077
+export SPARK_MASTER_WEBUI_PORT=8082
+export SPARK_WORKER_WEBUI_PORT=8081
+export JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64
+SPARK_EOF`,
+                'chmod +x /opt/spark/conf/spark-env.sh',
+                `cat > /etc/systemd/system/spark-worker.service << 'SPARK_EOF'
+[Unit]
+Description=Spark Worker
+After=network.target spark-master.service
+Requires=spark-master.service
+
+[Service]
+Type=forking
+ExecStart=/opt/spark/sbin/start-worker.sh spark://${master.instancePrivateIp}:7077
+ExecStop=/opt/spark/sbin/stop-worker.sh
+Restart=on-failure
+RestartSec=5
+User=root
+WorkingDirectory=/opt/spark
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_LOG_DIR=/var/spark/logs"
+Environment="SPARK_WORKER_DIR=/var/spark/work"
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`,
+                `cat > /etc/systemd/system/spark-http.service << 'SPARK_EOF'
+[Unit]
+Description=Spark HTTP Server
+After=network.target spark-master.service
+Requires=spark-master.service
+
+[Service]
+Type=simple
+ExecStart=/usr/bin/python3 /var/spark/http/spark_http.py
+Restart=on-failure
+RestartSec=5
+User=root
+WorkingDirectory=/var/spark/http
+Environment="SPARK_MASTER_HOST=${master.instancePrivateIp}"
+Environment="HTTP_PORT=9003"
+Environment="JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64"
+Environment="SPARK_JARS=/opt/spark/jars/hadoop-aws-3.4.1.jar,/opt/spark/jars/bundle-2.29.52.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar"
+StandardOutput=append:/var/spark/logs/http.log
+StandardError=append:/var/spark/logs/http.log
+
+[Install]
+WantedBy=multi-user.target
+SPARK_EOF`,
+                'systemctl daemon-reload',
+                'systemctl restart spark-master',
+                'systemctl restart spark-worker',
+                'systemctl restart spark-http'
+            ]
+        )
+    }
+}
diff --git a/benchmarks/cdk/package.json b/benchmarks/cdk/package.json
index 67b6ee8..719d58d 100644
--- a/benchmarks/cdk/package.json
+++ b/benchmarks/cdk/package.json
@@ -12,7 +12,8 @@
     "sync-bucket": "aws s3 sync ../data s3://datafusion-distributed-benchmarks/",
     "datafusion-bench": "npx ts-node bin/datafusion-bench.ts",
     "ballista-bench": "npx ts-node bin/ballista-bench.ts",
-    "trino-bench": "npx ts-node bin/trino-bench.ts"
+    "trino-bench": "npx ts-node bin/trino-bench.ts",
+    "spark-bench": "npx ts-node bin/spark-bench.ts"
   },
   "devDependencies": {
     "@types/jest": "^29.5.14",
diff --git a/benchmarks/cdk/requirements.txt b/benchmarks/cdk/requirements.txt
new file mode 100644
index 0000000..1ee6733
--- /dev/null
+++ b/benchmarks/cdk/requirements.txt
@@ -0,0 +1,2 @@
+flask==3.0.0
+pyspark==4.0.0