From c6cce42aa8e8d87ba0ff457065cd1c3f385740bb Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Mon, 24 Nov 2025 21:25:57 +0100 Subject: [PATCH 1/8] Add trino to CDK --- benchmarks/cdk/lib/cdk-stack.ts | 77 ++++++++++++++++---------- benchmarks/cdk/lib/trino.ts | 98 +++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 29 deletions(-) create mode 100644 benchmarks/cdk/lib/trino.ts diff --git a/benchmarks/cdk/lib/cdk-stack.ts b/benchmarks/cdk/lib/cdk-stack.ts index 28fe3db9..abd61537 100644 --- a/benchmarks/cdk/lib/cdk-stack.ts +++ b/benchmarks/cdk/lib/cdk-stack.ts @@ -7,6 +7,7 @@ import * as cr from 'aws-cdk-lib/custom-resources'; import { Construct } from 'constructs'; import * as path from 'path'; import { execSync } from 'child_process'; +import { trinoAfterDeployCommands, trinoUserDataCommands } from "./trino"; const ROOT = path.join(__dirname, '../../..') @@ -18,7 +19,7 @@ interface CdkStackProps extends StackProps { } export class CdkStack extends Stack { - constructor (scope: Construct, id: string, props: CdkStackProps) { + constructor(scope: Construct, id: string, props: CdkStackProps) { super(scope, id, props); const { config } = props; @@ -122,7 +123,8 @@ EOF`, // Enable and start the service 'systemctl daemon-reload', 'systemctl enable worker', - 'systemctl start worker' + 'systemctl start worker', + ...trinoUserDataCommands(i) ); const instance = new ec2.Instance(this, `BenchmarkInstance${i}`, { @@ -161,33 +163,50 @@ sudo journalctl -u worker.service -f -o cat }); // Custom resource to restart worker service on every deploy - const restartWorker = new cr.AwsCustomResource(this, 'RestartWorkerService', { - onUpdate: { - service: 'SSM', - action: 'sendCommand', - parameters: { - DocumentName: 'AWS-RunShellScript', - InstanceIds: instances.map(inst => inst.instanceId), - Parameters: { - commands: [ - `aws s3 cp s3://${workerBinary.s3BucketName}/${workerBinary.s3ObjectKey} /usr/local/bin/worker`, - 'chmod +x /usr/local/bin/worker', - 'systemctl restart worker', - ], - }, + sendCommandsUnconditionally(this, 'RestartWorkerService', instances, [ + `aws s3 cp s3://${workerBinary.s3BucketName}/${workerBinary.s3ObjectKey} /usr/local/bin/worker`, + 'chmod +x /usr/local/bin/worker', + 'systemctl restart worker', + ]) + + // Start coordinator first + sendCommandsUnconditionally(this, 'RestartTrinoCoordinator', [instances[0]], [ + 'systemctl start trino', + ]) + + // Then start workers (they will discover the coordinator) + sendCommandsUnconditionally(this, 'RestartTrinoWorkers', instances.slice(1), trinoAfterDeployCommands(this.region)) + } +} + +function sendCommandsUnconditionally( + construct: Construct, + name: string, + instances: ec2.Instance[], + commands: string[] +) { + const cmd = new cr.AwsCustomResource(construct, name, { + onUpdate: { + service: 'SSM', + action: 'sendCommand', + parameters: { + DocumentName: 'AWS-RunShellScript', + InstanceIds: instances.map(inst => inst.instanceId), + Parameters: { + commands }, - physicalResourceId: cr.PhysicalResourceId.of(`restart-${Date.now()}`), - ignoreErrorCodesMatching: '.*', }, - policy: cr.AwsCustomResourcePolicy.fromStatements([ - new iam.PolicyStatement({ - actions: ['ssm:SendCommand'], - resources: ['*'], - }), - ]), - }); - - // Ensure instances are created before restarting - restartWorker.node.addDependency(...instances) - } + physicalResourceId: cr.PhysicalResourceId.of(`${name}-${Date.now()}`), + ignoreErrorCodesMatching: '.*', + }, + policy: cr.AwsCustomResourcePolicy.fromStatements([ + new 
iam.PolicyStatement({
+      actions: ['ssm:SendCommand'],
+      resources: ['*'],
+    }),
+  ]),
+  });
+
+  // Ensure instances are created before restarting
+  cmd.node.addDependency(...instances)
 }
diff --git a/benchmarks/cdk/lib/trino.ts b/benchmarks/cdk/lib/trino.ts
new file mode 100644
index 00000000..20dcf758
--- /dev/null
+++ b/benchmarks/cdk/lib/trino.ts
@@ -0,0 +1,98 @@
+export function trinoUserDataCommands(instanceIndex: number): string[] {
+  const isCoordinator = instanceIndex === 0;
+
+  return [
+    // Install Java 22 for Trino (Trino 461 requires Java 22+)
+    'yum install -y java-22-amazon-corretto-headless python',
+
+    // Download and install Trino
+    'cd /opt',
+    'curl -L -o trino-server.tar.gz https://repo1.maven.org/maven2/io/trino/trino-server/461/trino-server-461.tar.gz',
+    'tar -xzf trino-server.tar.gz',
+    'mv trino-server-461 trino-server',
+    'rm trino-server.tar.gz',
+
+    // Create Trino directories
+    'mkdir -p /var/trino/data',
+    'mkdir -p /opt/trino-server/etc/catalog',
+
+    // Configure Trino node properties
+    `cat > /opt/trino-server/etc/node.properties << 'TRINO_EOF'
+node.environment=benchmark
+node.id=instance-${instanceIndex}
+node.data-dir=/var/trino/data
+TRINO_EOF`,
+
+    // Configure Trino JVM settings (minimal - using conservative 8GB heap)
+    `cat > /opt/trino-server/etc/jvm.config << 'TRINO_EOF'
+-server
+-Xmx8G
+-XX:+UseG1GC
+-XX:G1HeapRegionSize=32M
+-XX:+ExplicitGCInvokesConcurrent
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:+ExitOnOutOfMemoryError
+-Djdk.attach.allowAttachSelf=true
+TRINO_EOF`,
+
+    // Configure Trino config.properties (workers will be reconfigured during lazy startup)
+    isCoordinator
+      ? `cat > /opt/trino-server/etc/config.properties << 'TRINO_EOF'
+coordinator=true
+node-scheduler.include-coordinator=true
+http-server.http.port=8080
+discovery.uri=http://localhost:8080
+TRINO_EOF`
+      : `cat > /opt/trino-server/etc/config.properties << 'TRINO_EOF'
+coordinator=false
+http-server.http.port=8080
+discovery.uri=http://localhost:8080
+TRINO_EOF`,
+
+    // Configure Hive catalog for S3 Parquet files
+    `cat > /opt/trino-server/etc/catalog/hive.properties << 'TRINO_EOF'
+connector.name=hive
+hive.metastore=file
+hive.metastore.catalog.dir=/var/trino/metastore
+TRINO_EOF`,
+
+    // Download Trino CLI
+    'curl -L -o /usr/local/bin/trino https://repo1.maven.org/maven2/io/trino/trino-cli/461/trino-cli-461-executable.jar',
+    'chmod +x /usr/local/bin/trino',
+
+    // Create Trino systemd service
+    `cat > /etc/systemd/system/trino.service << 'TRINO_EOF'
+[Unit]
+Description=Trino Server
+After=network.target
+
+[Service]
+Type=forking
+ExecStart=/opt/trino-server/bin/launcher start
+ExecStop=/opt/trino-server/bin/launcher stop
+Restart=on-failure
+User=root
+WorkingDirectory=/opt/trino-server
+
+[Install]
+WantedBy=multi-user.target
+TRINO_EOF`,
+
+    // Enable and start Trino (workers get repointed at the coordinator and restarted after deploy)
+    'systemctl daemon-reload',
+    'systemctl enable trino',
+    'systemctl start trino'
+  ];
+}
+
+export function trinoAfterDeployCommands(region: string) {
+  return [
+    `COORDINATOR_IP=$(aws ec2 describe-instances --region ${region} --filters "Name=tag:Name,Values=instance-0" "Name=instance-state-name,Values=running" --query "Reservations[0].Instances[0].PrivateIpAddress" --output text)
+cat > /opt/trino-server/etc/config.properties << TRINO_EOF
+coordinator=false
+http-server.http.port=8080
+discovery.uri=http://\${COORDINATOR_IP}:8080
+TRINO_EOF`,
+    'systemctl restart trino',
+  ]
+}
\ No newline at end of file
From 
a85131f541f987f43e86f137cb6a55ea37527e24 Mon Sep 17 00:00:00 2001
From: Gabriel Musat Mestre
Date: Mon, 24 Nov 2025 23:13:49 +0100
Subject: [PATCH 2/8] Fix trino deployments

---
 benchmarks/cdk/lib/cdk-stack.ts | 21 ++++++++++++++++++++-
 benchmarks/cdk/lib/trino.ts     | 29 +++++++++++++++++----------
 2 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/benchmarks/cdk/lib/cdk-stack.ts b/benchmarks/cdk/lib/cdk-stack.ts
index abd61537..82f7848b 100644
--- a/benchmarks/cdk/lib/cdk-stack.ts
+++ b/benchmarks/cdk/lib/cdk-stack.ts
@@ -84,6 +84,25 @@ export class CdkStack extends Stack {
       resources: ['*'],
     }));
 
+    // Grant Glue permissions for Trino Hive metastore
+    role.addToPolicy(new iam.PolicyStatement({
+      actions: [
+        'glue:GetDatabase',
+        'glue:GetDatabases',
+        'glue:GetTable',
+        'glue:GetTables',
+        'glue:GetPartition',
+        'glue:GetPartitions',
+        'glue:CreateTable',
+        'glue:UpdateTable',
+        'glue:DeleteTable',
+        'glue:CreateDatabase',
+        'glue:UpdateDatabase',
+        'glue:DeleteDatabase',
+      ],
+      resources: ['*'],
+    }));
+
     // Grant read access to the bucket and worker binary
     bucket.grantRead(role);
     workerBinary.grantRead(role);
@@ -124,7 +143,7 @@ EOF`,
       'systemctl daemon-reload',
       'systemctl enable worker',
       'systemctl start worker',
-      ...trinoUserDataCommands(i)
+      ...trinoUserDataCommands(i, this.region)
     );
 
     const instance = new ec2.Instance(this, `BenchmarkInstance${i}`, {
diff --git a/benchmarks/cdk/lib/trino.ts b/benchmarks/cdk/lib/trino.ts
index 20dcf758..70bd2389 100644
--- a/benchmarks/cdk/lib/trino.ts
+++ b/benchmarks/cdk/lib/trino.ts
@@ -1,15 +1,17 @@
-export function trinoUserDataCommands(instanceIndex: number): string[] {
+const TRINO_VERSION = 476
+
+export function trinoUserDataCommands(instanceIndex: number, region: string): string[] {
   const isCoordinator = instanceIndex === 0;
 
   return [
-    // Install Java 22 for Trino (Trino 461 requires Java 22+)
-    'yum install -y java-22-amazon-corretto-headless python',
+    // Install Java 24 for Trino (Trino 476 runs on Java 24)
+    'yum install -y java-24-amazon-corretto-headless python',
 
-    // Download and install Trino
+    // Download and install the pinned Trino version
     'cd /opt',
-    'curl -L -o trino-server.tar.gz https://repo1.maven.org/maven2/io/trino/trino-server/461/trino-server-461.tar.gz',
+    `curl -L -o trino-server.tar.gz https://repo1.maven.org/maven2/io/trino/trino-server/${TRINO_VERSION}/trino-server-${TRINO_VERSION}.tar.gz`,
     'tar -xzf trino-server.tar.gz',
-    'mv trino-server-461 trino-server',
+    `mv trino-server-${TRINO_VERSION} trino-server`,
     'rm trino-server.tar.gz',
 
@@ -49,15 +51,22 @@ http-server.http.port=8080
 discovery.uri=http://localhost:8080
 TRINO_EOF`,
 
-    // Configure Hive catalog for S3 Parquet files
+    // Configure Hive catalog with AWS Glue metastore
     `cat > /opt/trino-server/etc/catalog/hive.properties << 'TRINO_EOF'
 connector.name=hive
-hive.metastore=file
-hive.metastore.catalog.dir=/var/trino/metastore
+hive.metastore=glue
+hive.metastore.glue.region=${region}
+fs.native-s3.enabled=true
+s3.region=${region}
 TRINO_EOF`,
 
+    // Configure TPCH catalog for reference
+    `cat > /opt/trino-server/etc/catalog/tpch.properties << 'TRINO_EOF'
+connector.name=tpch
+TRINO_EOF`,
+
     // Download Trino CLI
-    'curl -L -o /usr/local/bin/trino https://repo1.maven.org/maven2/io/trino/trino-cli/461/trino-cli-461-executable.jar',
+    `curl -L -o /usr/local/bin/trino https://repo1.maven.org/maven2/io/trino/trino-cli/${TRINO_VERSION}/trino-cli-${TRINO_VERSION}-executable.jar`,
     'chmod +x /usr/local/bin/trino',
 
     // Create Trino systemd service
From 
c441dd804ac6df5e7bcf5ea63d0ddc04af74449c Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Nov 2025 09:41:31 +0100 Subject: [PATCH 3/8] Fix trino commands --- benchmarks/cdk/lib/cdk-stack.ts | 11 ++++------- benchmarks/cdk/lib/trino.ts | 9 +++++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/benchmarks/cdk/lib/cdk-stack.ts b/benchmarks/cdk/lib/cdk-stack.ts index 82f7848b..7f29da07 100644 --- a/benchmarks/cdk/lib/cdk-stack.ts +++ b/benchmarks/cdk/lib/cdk-stack.ts @@ -7,7 +7,7 @@ import * as cr from 'aws-cdk-lib/custom-resources'; import { Construct } from 'constructs'; import * as path from 'path'; import { execSync } from 'child_process'; -import { trinoAfterDeployCommands, trinoUserDataCommands } from "./trino"; +import { trinoWorkerCommands, trinoUserDataCommands } from "./trino"; const ROOT = path.join(__dirname, '../../..') @@ -188,13 +188,10 @@ sudo journalctl -u worker.service -f -o cat 'systemctl restart worker', ]) - // Start coordinator first - sendCommandsUnconditionally(this, 'RestartTrinoCoordinator', [instances[0]], [ - 'systemctl start trino', - ]) - // Then start workers (they will discover the coordinator) - sendCommandsUnconditionally(this, 'RestartTrinoWorkers', instances.slice(1), trinoAfterDeployCommands(this.region)) + const [coordinator, ...workers] = instances + sendCommandsUnconditionally(this, 'TrinoCoordinatorCommands', [coordinator], ['systemctl start trino']) + sendCommandsUnconditionally(this, 'TrinoWorkerCommands', workers, trinoWorkerCommands(coordinator)) } } diff --git a/benchmarks/cdk/lib/trino.ts b/benchmarks/cdk/lib/trino.ts index 70bd2389..ee97a45e 100644 --- a/benchmarks/cdk/lib/trino.ts +++ b/benchmarks/cdk/lib/trino.ts @@ -1,3 +1,5 @@ +import * as ec2 from 'aws-cdk-lib/aws-ec2'; + const TRINO_VERSION = 476 export function trinoUserDataCommands(instanceIndex: number, region: string): string[] { @@ -94,13 +96,12 @@ TRINO_EOF`, ]; } -export function trinoAfterDeployCommands(region: string) { +export function trinoWorkerCommands(coordinator: ec2.Instance) { return [ - `COORDINATOR_IP=$(aws ec2 describe-instances --region ${region} --filters "Name=tag:Name,Values=instance-0" "Name=instance-state-name,Values=running" --query "Reservations[0].Instances[0].PrivateIpAddress" --output text) -cat > /opt/trino-server/etc/config.properties << TRINO_EOF + `cat > /opt/trino-server/etc/config.properties << TRINO_EOF coordinator=false http-server.http.port=8080 -discovery.uri=http://\${COORDINATOR_IP}:8080 +discovery.uri=http://${coordinator.instancePrivateIp}:8080 TRINO_EOF`, 'systemctl restart trino', ] From fed143ea166690b60278a9f3e7950ad2507810e8 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Nov 2025 11:07:57 +0100 Subject: [PATCH 4/8] Add Trino benchmark scripts --- benchmarks/cdk/bin/@bench-common.ts | 132 +++++++++++++ benchmarks/cdk/bin/datafusion-bench.ts | 260 +++++++++---------------- benchmarks/cdk/bin/trino-bench.ts | 242 +++++++++++++++++++++++ benchmarks/cdk/package.json | 3 +- 4 files changed, 468 insertions(+), 169 deletions(-) create mode 100644 benchmarks/cdk/bin/@bench-common.ts create mode 100644 benchmarks/cdk/bin/trino-bench.ts diff --git a/benchmarks/cdk/bin/@bench-common.ts b/benchmarks/cdk/bin/@bench-common.ts new file mode 100644 index 00000000..2980efac --- /dev/null +++ b/benchmarks/cdk/bin/@bench-common.ts @@ -0,0 +1,132 @@ +import path from "path"; +import fs from "fs/promises"; +import { z } from 'zod'; + +export const ROOT = path.join(__dirname, '../../..') + +// Simple data 
structures
+export type QueryResult = {
+    query: string;
+    iterations: { elapsed: number; row_count: number }[];
+}
+
+export type BenchmarkResults = {
+    queries: QueryResult[];
+}
+
+export const BenchmarkResults = z.object({
+    queries: z.array(z.object({
+        query: z.string(),
+        iterations: z.array(z.object({
+            elapsed: z.number(),
+            row_count: z.number()
+        }))
+    }))
+})
+
+export const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
+
+export async function writeJson(results: BenchmarkResults, outputPath?: string) {
+    if (!outputPath) return;
+    await fs.mkdir(path.dirname(outputPath), { recursive: true });
+    await fs.writeFile(outputPath, JSON.stringify(results, null, 2));
+}
+
+export async function compareWithPrevious(results: BenchmarkResults, outputPath: string) {
+    let prevResults: BenchmarkResults;
+    try {
+        const prevContent = await fs.readFile(outputPath, 'utf-8');
+        prevResults = BenchmarkResults.parse(JSON.parse(prevContent));
+    } catch {
+        return; // No previous results to compare
+    }
+
+    console.log('\n==== Comparison with previous run ====');
+
+    for (const query of results.queries) {
+        const prevQuery = prevResults.queries.find(q => q.query === query.query);
+        if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) {
+            continue;
+        }
+
+        const avgPrev = Math.round(
+            prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length
+        );
+        const avg = Math.round(
+            query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length
+        );
+
+        const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev;
+        const tag = avg < avgPrev ? "faster" : "slower";
+        const emoji = factor > 1.2 ? (avg < avgPrev ? "✅" : "❌") : (avg < avgPrev ? "✔" : "✖");
+
+        console.log(
+            `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}`
+        );
+    }
+}
+
+export interface BenchmarkRunner {
+    createTables(sf: number): Promise<void>;
+
+    executeQuery(query: string): Promise<{ rowCount: number }>;
+}
+
+export async function runBenchmark(
+    runner: BenchmarkRunner,
+    options: {
+        sf: number;
+        iterations: number;
+        specificQuery?: number;
+        outputPath: string;
+    }
+) {
+    const { sf, iterations, specificQuery, outputPath } = options;
+
+    const results: BenchmarkResults = { queries: [] };
+    const queriesPath = path.join(ROOT, "testdata", "tpch", "queries")
+
+    console.log("Creating tables...");
+    await runner.createTables(sf);
+
+    for (let id of IDS) {
+        if (specificQuery && specificQuery !== id) {
+            continue;
+        }
+
+        const queryId = `q${id}`;
+        const filePath = path.join(queriesPath, `${queryId}.sql`)
+        const queryToExecute = await fs.readFile(filePath, 'utf-8')
+
+        const queryResult: QueryResult = {
+            query: queryId,
+            iterations: []
+        };
+
+        for (let i = 0; i < iterations; i++) {
+            const start = new Date()
+            const response = await runner.executeQuery(queryToExecute);
+            const elapsed = Math.round(new Date().getTime() - start.getTime())
+
+            queryResult.iterations.push({
+                elapsed,
+                row_count: response.rowCount
+            });
+
+            console.log(
+                `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.rowCount} rows`
+            );
+        }
+
+        const avg = Math.round(
+            queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length
+        );
+        console.log(`Query ${id} avg time: ${avg} ms`);
+
+        results.queries.push(queryResult);
+    }
+
+    // Write results and compare
+    await compareWithPrevious(results, 
outputPath); + await writeJson(results, outputPath); +} diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index 0088ff21..19a34c9b 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -1,190 +1,114 @@ import path from "path"; -import fs from "fs/promises"; -import { Command } from "commander"; -import { z } from 'zod'; - -const ROOT = path.join(__dirname, '../../..') +import {Command} from "commander"; +import {z} from 'zod'; +import {BenchmarkRunner, ROOT, runBenchmark} from "./@bench-common"; // Remember to port-forward a worker with // aws ssm start-session --target {host-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000" -async function main () { - const program = new Command(); - - program - .option('--sf ', 'Scale factor', '1') - .option('-i, --iterations ', 'Number of iterations', '3') - .option('--files-per-task ', 'Files per task', '4') - .option('--cardinality-task-sf ', 'Cardinality task scale factor', '2') - .option('--query ', 'A specific query to run', undefined) - .parse(process.argv); - - const options = program.opts(); - - const sf = parseInt(options.sf); - const iterations = parseInt(options.iterations); - const filesPerTask = parseInt(options.filesPerTask); - const cardinalityTaskSf = parseInt(options.cardinalityTaskSf); - - // Compare with previous results first - const results: BenchmarkResults = { queries: [] }; - const queriesPath = path.join(ROOT, "testdata", "tpch", "queries") - - console.log("Creating tables...") - await query(createTablesSql(sf)) - await query(` - SET distributed.files_per_task=${filesPerTask}; - SET distributed.cardinality_task_count_factor=${cardinalityTaskSf} - `) - - for (let id of IDS) { - if (options.query && parseInt(options.query) !== id) { - continue - } - const queryId = `q${id}`; - const filePath = path.join(queriesPath, `${queryId}.sql`) - const content = await fs.readFile(filePath, 'utf-8') - - const queryResult: QueryResult = { - query: queryId, - iterations: [] - }; +async function main() { + const program = new Command(); + + program + .option('--sf ', 'Scale factor', '1') + .option('-i, --iterations ', 'Number of iterations', '3') + .option('--files-per-task ', 'Files per task', '4') + .option('--cardinality-task-sf ', 'Cardinality task scale factor', '2') + .option('--shuffle-batch-size ', 'Shuffle batch coalescing size (number of rows)', '8192') + .option('--query ', 'A specific query to run', undefined) + .parse(process.argv); + + const options = program.opts(); + + const sf = parseInt(options.sf); + const iterations = parseInt(options.iterations); + const filesPerTask = parseInt(options.filesPerTask); + const cardinalityTaskSf = parseInt(options.cardinalityTaskSf); + const shuffleBatchSize = parseInt(options.shuffleBatchSize); + const specificQuery = options.query ? 
parseInt(options.query) : undefined; + + const runner = new DataFusionRunner({ + filesPerTask, + cardinalityTaskSf, + shuffleBatchSize, + }); + + const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); + + await runBenchmark(runner, { + sf, + iterations, + specificQuery, + outputPath, + }); +} - for (let i = 0; i < iterations; i++) { - const start = new Date() - const response = await query(content) - const elapsed = Math.round(new Date().getTime() - start.getTime()) +const QueryResponse = z.object({ + count: z.number(), + plan: z.string() +}) +type QueryResponse = z.infer - queryResult.iterations.push({ - elapsed, - row_count: response.count - }); +class DataFusionRunner implements BenchmarkRunner { + private url = 'http://localhost:9000'; - console.log( - `Query ${id} iteration ${i} took ${elapsed} ms and returned ${response.count} rows` - ); + constructor(private readonly options: { + filesPerTask: number; + cardinalityTaskSf: number; + shuffleBatchSize: number; + }) { } - const avg = Math.round( - queryResult.iterations.reduce((a, b) => a + b.elapsed, 0) / queryResult.iterations.length - ); - console.log(`Query ${id} avg time: ${avg} ms`); - - results.queries.push(queryResult); - } + async executeQuery(sql: string): Promise<{ rowCount: number }> { + const response = await this.query(sql); + return {rowCount: response.count}; + } - // Write results and compare - const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); - await compareWithPrevious(results, outputPath); - await writeJson(results, outputPath); -} + private async query(sql: string): Promise { + const url = new URL(this.url); + url.searchParams.set('sql', sql); -// Simple data structures -type QueryResult = { - query: string; - iterations: { elapsed: number; row_count: number }[]; -} + const response = await fetch(url.toString()); -type BenchmarkResults = { - queries: QueryResult[]; -} + if (!response.ok) { + const msg = await response.text(); + throw new Error(`Query failed: ${response.status} ${msg}`); + } -const BenchmarkResults = z.object({ - queries: z.array(z.object({ - query: z.string(), - iterations: z.array(z.object({ - elapsed: z.number(), - row_count: z.number() - })) - })) -}) - -async function writeJson (results: BenchmarkResults, outputPath?: string) { - if (!outputPath) return; - await fs.writeFile(outputPath, JSON.stringify(results, null, 2)); -} - -async function compareWithPrevious (results: BenchmarkResults, outputPath: string) { - let prevResults: BenchmarkResults; - try { - const prevContent = await fs.readFile(outputPath, 'utf-8'); - prevResults = BenchmarkResults.parse(JSON.parse(prevContent)); - } catch { - return; // No previous results to compare - } - - console.log('\n==== Comparison with previous run ===='); - - for (const query of results.queries) { - const prevQuery = prevResults.queries.find(q => q.query === query.query); - if (!prevQuery || prevQuery.iterations.length === 0 || query.iterations.length === 0) { - continue; + const unparsed = await response.json(); + return QueryResponse.parse(unparsed); } - const avgPrev = Math.round( - prevQuery.iterations.reduce((sum, i) => sum + i.elapsed, 0) / prevQuery.iterations.length - ); - const avg = Math.round( - query.iterations.reduce((sum, i) => sum + i.elapsed, 0) / query.iterations.length - ); - - const factor = avg < avgPrev ? avgPrev / avg : avg / avgPrev; - const tag = avg < avgPrev ? "faster" : "slower"; - const emoji = factor > 1.2 ? (avg < avgPrev ? 
"✅" : "❌") : (avg < avgPrev ? "✔" : "✖"); - - console.log( - `${query.query.padStart(8)}: prev=${avgPrev.toString().padStart(4)} ms, new=${avg.toString().padStart(4)} ms, ${factor.toFixed(2)}x ${tag} ${emoji}` - ); - } -} - - -function createTablesSql (sf: number): string { - let stmt = '' - for (const tbl of [ - "lineitem", - "orders", - "part", - "partsupp", - "customer", - "nation", - "region", - "supplier", - ]) { - // language=SQL format=false - stmt += ` + async createTables(sf: number): Promise { + let stmt = ''; + for (const tbl of [ + "lineitem", + "orders", + "part", + "partsupp", + "customer", + "nation", + "region", + "supplier", + ]) { + // language=SQL format=false + stmt += ` DROP TABLE IF EXISTS ${tbl}; CREATE EXTERNAL TABLE IF NOT EXISTS ${tbl} STORED AS PARQUET LOCATION 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/${tbl}/'; - ` - } - return stmt -} - -const IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22] - -const QueryResponse = z.object({ - count: z.number(), - plan: z.string() -}) -type QueryResponse = z.infer - -async function query (sql: string): Promise { - const url = new URL('http://localhost:9000') - url.searchParams.set('sql', sql) - - const response = await fetch(url.toString()) - - if (!response.ok) { - const msg = await response.text() - throw new Error(`Query failed: ${response.status} ${msg}`) - } + `; + } + await this.query(stmt); + await this.query(` + SET distributed.files_per_task=${this.options.filesPerTask}; + SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf}; + SET distributed.shuffle_batch_size=${this.options.shuffleBatchSize} + `); + } - const unparsed = await response.json() - return QueryResponse.parse(unparsed) } main() - .catch(err => { - console.error(err) - process.exit(1) - }) + .catch(err => { + console.error(err) + process.exit(1) + }) diff --git a/benchmarks/cdk/bin/trino-bench.ts b/benchmarks/cdk/bin/trino-bench.ts new file mode 100644 index 00000000..23a1d82c --- /dev/null +++ b/benchmarks/cdk/bin/trino-bench.ts @@ -0,0 +1,242 @@ +import path from "path"; +import { Command } from "commander"; +import { ROOT, runBenchmark, BenchmarkRunner } from "./@bench-common"; + +// Remember to port-forward Trino coordinator with +// aws ssm start-session --target {instance-0-id} --document-name AWS-StartPortForwardingSession --parameters "portNumber=8080,localPortNumber=8080" + +async function main() { + const program = new Command(); + + program + .option('--sf ', 'Scale factor', '1') + .option('-i, --iterations ', 'Number of iterations', '3') + .option('--query ', 'A specific query to run', undefined) + .parse(process.argv); + + const options = program.opts(); + + const sf = parseInt(options.sf); + const iterations = parseInt(options.iterations); + const specificQuery = options.query ? parseInt(options.query) : undefined; + + const runner = new TrinoRunner({ sf }); + const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); + + await runBenchmark(runner, { + sf, + iterations, + specificQuery, + outputPath, + }); +} + +class TrinoRunner implements BenchmarkRunner { + private trinoUrl = 'http://localhost:8080'; + + constructor(private readonly options: { + sf: number + }) { + } + + + async executeQuery(sql: string): Promise<{ rowCount: number }> { + // Fix query 4: Add DATE prefix to date literals that don't have it. + sql = sql.replace(/(? 
<!DATE )'(\d{4}-\d{2}-\d{2})'/g, "DATE '$1'");
+
+        // Fix query 15: use the aliased view form, since Trino does not accept a column list in CREATE VIEW
+        sql = sql.replace(
+            /create view revenue0 \(supplier_no, total_revenue\) as\s+select\s+l_suppkey,\s+sum\(l_extendedprice \* \(1 - l_discount\)\)/,
+            'create view revenue0 as select l_suppkey as supplier_no, sum(l_extendedprice * (1 - l_discount)) as total_revenue'
+        );
+
+        // Handle multi-statement queries (e.g., query 15 with CREATE VIEW)
+        const statements = sql.split(';').map(s => s.trim()).filter(s => s.length > 0);
+
+        if (statements.length > 1) {
+            // Execute all statements except the last one
+            for (let i = 0; i < statements.length - 1; i++) {
+                await this.executeSingleStatement(statements[i]);
+            }
+            // Execute the last statement and return its result
+            return await this.executeSingleStatement(statements[statements.length - 1]);
+        }
+
+        return await this.executeSingleStatement(sql);
+    }
+
+    private async executeSingleStatement(sql: string): Promise<{ rowCount: number }> {
+        // Submit query
+        const submitResponse = await fetch(`${this.trinoUrl}/v1/statement`, {
+            method: 'POST',
+            headers: {
+                'X-Trino-User': 'benchmark',
+                'X-Trino-Catalog': 'hive',
+                'X-Trino-Schema': `tpch_sf${this.options.sf}`,
+            },
+            body: sql.trim().replace(/;+$/, ''),
+        });
+
+        if (!submitResponse.ok) {
+            const msg = await submitResponse.text();
+            throw new Error(`Query submission failed: ${submitResponse.status} ${msg}`);
+        }
+
+        let result: any = await submitResponse.json();
+        let rowCount = 0;
+
+        // Poll for results
+        while (result.nextUri) {
+            const pollResponse = await fetch(result.nextUri);
+
+            if (!pollResponse.ok) {
+                const msg = await pollResponse.text();
+                throw new Error(`Query polling failed: ${pollResponse.status} ${msg}`);
+            }
+
+            result = await pollResponse.json();
+
+            // Count rows if data is present
+            if (result.data) {
+                rowCount += result.data.length;
+            }
+
+            // Check for errors
+            if (result.error) {
+                throw new Error(`Query failed: ${result.error.message}`);
+            }
+        }
+
+        return { rowCount };
+    }
+
+    async createTables(sf: number): Promise<void> {
+        const schema = `tpch_sf${sf}`;
+
+        // Create schema first
+        await this.executeQuery(`CREATE SCHEMA IF NOT EXISTS hive.${schema} WITH (location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/')`);
+
+        // Create customer table
+        await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.customer`);
+        await this.executeQuery(`CREATE TABLE hive.${schema}.customer
+        (
+            c_custkey    bigint,
+            c_name       varchar(25),
+            c_address    varchar(40),
+            c_nationkey  bigint,
+            c_phone      varchar(15),
+            c_acctbal    decimal(15, 2),
+            c_mktsegment varchar(10),
+            c_comment    varchar(117)
+        )
+        WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/customer/', format = 'PARQUET')`);
+
+        // Create lineitem table
+        await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.lineitem`);
+        await this.executeQuery(`CREATE TABLE hive.${schema}.lineitem
+        (
+            l_orderkey      bigint,
+            l_partkey       bigint,
+            l_suppkey       bigint,
+            l_linenumber    integer,
+            l_quantity      decimal(15, 2),
+            l_extendedprice decimal(15, 2),
+            l_discount      decimal(15, 2),
+            l_tax           decimal(15, 2),
+            l_returnflag    varchar(1),
+            l_linestatus    varchar(1),
+            l_shipdate      date,
+            l_commitdate    date,
+            l_receiptdate   date,
+            l_shipinstruct  varchar(25),
+            l_shipmode      varchar(10),
+            l_comment       varchar(44)
+        )
+        WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/lineitem/', format = 'PARQUET')`);
+
+        // Create nation table
+        await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.nation`);
+        await this.executeQuery(`CREATE TABLE hive.${schema}.nation
+        (
+            n_nationkey bigint,
+            n_name      varchar(25),
+            n_regionkey bigint,
+            n_comment   varchar(152)
+        )
+        WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/nation/', format = 'PARQUET')`);
+
+        // Create orders table
+        await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.orders`);
+        await this.executeQuery(`CREATE TABLE hive.${schema}.orders
+        (
+            o_orderkey      bigint,
+            o_custkey       bigint,
+            
o_orderstatus varchar(1), + o_totalprice decimal(15, 2), + o_orderdate date, + o_orderpriority varchar(15), + o_clerk varchar(15), + o_shippriority integer, + o_comment varchar(79) + ) + WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/orders/', format = 'PARQUET')`); + + // Create part table + await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.part`); + await this.executeQuery(`CREATE TABLE hive.${schema}.part + ( + p_partkey bigint, + p_name varchar(55), + p_mfgr varchar(25), + p_brand varchar(10), + p_type varchar(25), + p_size integer, + p_container varchar(10), + p_retailprice decimal(15, 2), + p_comment varchar(23) + ) + WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/part/', format = 'PARQUET')`); + + // Create partsupp table + await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.partsupp`); + await this.executeQuery(`CREATE TABLE hive.${schema}.partsupp + ( + ps_partkey bigint, + ps_suppkey bigint, + ps_availqty integer, + ps_supplycost decimal(15, 2), + ps_comment varchar(199) + ) + WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/partsupp/', format = 'PARQUET')`); + + // Create region table + await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.region`); + await this.executeQuery(`CREATE TABLE hive.${schema}.region + ( + r_regionkey bigint, + r_name varchar(25), + r_comment varchar(152) + ) + WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/region/', format = 'PARQUET')`); + + // Create supplier table + await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.supplier`); + await this.executeQuery(`CREATE TABLE hive.${schema}.supplier + ( + s_suppkey bigint, + s_name varchar(25), + s_address varchar(40), + s_nationkey bigint, + s_phone varchar(15), + s_acctbal decimal(15, 2), + s_comment varchar(101) + ) + WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/supplier/', format = 'PARQUET')`); + } +} + +main() + .catch(err => { + console.error(err) + process.exit(1) + }) diff --git a/benchmarks/cdk/package.json b/benchmarks/cdk/package.json index fc4fb949..3c006f89 100644 --- a/benchmarks/cdk/package.json +++ b/benchmarks/cdk/package.json @@ -10,7 +10,8 @@ "test": "jest", "cdk": "cdk", "sync-bucket": "aws s3 sync ../data s3://datafusion-distributed-benchmarks/", - "datafusion-bench": "npx ts-node bin/datafusion-bench.ts" + "datafusion-bench": "npx ts-node bin/datafusion-bench.ts", + "trino-bench": "npx ts-node bin/trino-bench.ts" }, "devDependencies": { "@types/jest": "^29.5.14", From 13c08c98963c7b6c274cd8d3d67dbf5045eefdba Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Nov 2025 11:24:26 +0100 Subject: [PATCH 5/8] Remove unused shuffle-task-batch-size --- benchmarks/cdk/bin/datafusion-bench.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index 19a34c9b..b9247ec4 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -14,7 +14,6 @@ async function main() { .option('-i, --iterations ', 'Number of iterations', '3') .option('--files-per-task ', 'Files per task', '4') .option('--cardinality-task-sf ', 'Cardinality task scale factor', '2') - .option('--shuffle-batch-size ', 'Shuffle batch coalescing size (number of rows)', '8192') .option('--query ', 'A specific query to run', undefined) .parse(process.argv); @@ -24,13 +23,11 @@ async function 
main() { const iterations = parseInt(options.iterations); const filesPerTask = parseInt(options.filesPerTask); const cardinalityTaskSf = parseInt(options.cardinalityTaskSf); - const shuffleBatchSize = parseInt(options.shuffleBatchSize); const specificQuery = options.query ? parseInt(options.query) : undefined; const runner = new DataFusionRunner({ filesPerTask, cardinalityTaskSf, - shuffleBatchSize, }); const outputPath = path.join(ROOT, "benchmarks", "data", `tpch_sf${sf}`, "remote-results.json"); @@ -55,7 +52,6 @@ class DataFusionRunner implements BenchmarkRunner { constructor(private readonly options: { filesPerTask: number; cardinalityTaskSf: number; - shuffleBatchSize: number; }) { } @@ -100,8 +96,7 @@ class DataFusionRunner implements BenchmarkRunner { await this.query(stmt); await this.query(` SET distributed.files_per_task=${this.options.filesPerTask}; - SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf}; - SET distributed.shuffle_batch_size=${this.options.shuffleBatchSize} + SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf} `); } From 24984f9c74901984cdfa0c9fd16482e019ae498b Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Nov 2025 11:34:12 +0100 Subject: [PATCH 6/8] Fix view creation --- benchmarks/cdk/bin/datafusion-bench.ts | 12 +++++++++++- benchmarks/cdk/bin/trino-bench.ts | 21 ++++++++++----------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/benchmarks/cdk/bin/datafusion-bench.ts b/benchmarks/cdk/bin/datafusion-bench.ts index b9247ec4..d3c8779a 100644 --- a/benchmarks/cdk/bin/datafusion-bench.ts +++ b/benchmarks/cdk/bin/datafusion-bench.ts @@ -56,7 +56,17 @@ class DataFusionRunner implements BenchmarkRunner { } async executeQuery(sql: string): Promise<{ rowCount: number }> { - const response = await this.query(sql); + let response + if (sql.includes("create view")) { + // This is query 15 + let [createView, query, dropView] = sql.split(";") + await this.query(createView); + response = await this.query(query) + await this.query(dropView); + } else { + response = await this.query(sql) + } + return {rowCount: response.count}; } diff --git a/benchmarks/cdk/bin/trino-bench.ts b/benchmarks/cdk/bin/trino-bench.ts index 23a1d82c..bc4e1522 100644 --- a/benchmarks/cdk/bin/trino-bench.ts +++ b/benchmarks/cdk/bin/trino-bench.ts @@ -50,19 +50,18 @@ class TrinoRunner implements BenchmarkRunner { 'create view revenue0 as select l_suppkey as supplier_no, sum(l_extendedprice * (1 - l_discount)) as total_revenue' ); - // Handle multi-statement queries (e.g., query 15 with CREATE VIEW) - const statements = sql.split(';').map(s => s.trim()).filter(s => s.length > 0); - - if (statements.length > 1) { - // Execute all statements except the last one - for (let i = 0; i < statements.length - 1; i++) { - await this.executeSingleStatement(statements[i]); - } - // Execute the last statement and return its result - return await this.executeSingleStatement(statements[statements.length - 1]); + let response + if (sql.includes("create view")) { + // This is query 15 + let [createView, query, dropView] = sql.split(";") + await this.executeSingleStatement(createView); + response = await this.executeSingleStatement(query); + await this.executeSingleStatement(dropView); + } else { + response = await this.executeSingleStatement(sql) } - return await this.executeSingleStatement(sql); + return response } private async executeSingleStatement(sql: string): Promise<{ rowCount: number }> { From 
c8a5fcbd816bcfc846691248eca731327d35da14 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Nov 2025 14:31:49 +0100 Subject: [PATCH 7/8] Add warmups --- benchmarks/cdk/bin/@bench-common.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/benchmarks/cdk/bin/@bench-common.ts b/benchmarks/cdk/bin/@bench-common.ts index 2980efac..2d978b60 100644 --- a/benchmarks/cdk/bin/@bench-common.ts +++ b/benchmarks/cdk/bin/@bench-common.ts @@ -103,6 +103,9 @@ export async function runBenchmark( iterations: [] }; + console.log(`Warming up query ${id}...`) + await runner.executeQuery(queryToExecute); + for (let i = 0; i < iterations; i++) { const start = new Date() const response = await runner.executeQuery(queryToExecute); From 1744711a6036494c632eedf2396b3ffe78cd19b5 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Nov 2025 15:59:34 +0100 Subject: [PATCH 8/8] Use explain analyze in Trino for not pulling rows --- benchmarks/cdk/bin/trino-bench.ts | 202 ++++++++++++++++-------------- 1 file changed, 105 insertions(+), 97 deletions(-) diff --git a/benchmarks/cdk/bin/trino-bench.ts b/benchmarks/cdk/bin/trino-bench.ts index bc4e1522..d3536374 100644 --- a/benchmarks/cdk/bin/trino-bench.ts +++ b/benchmarks/cdk/bin/trino-bench.ts @@ -55,10 +55,10 @@ class TrinoRunner implements BenchmarkRunner { // This is query 15 let [createView, query, dropView] = sql.split(";") await this.executeSingleStatement(createView); - response = await this.executeSingleStatement(query); + response = await this.executeSingleStatement(`EXPLAIN ANALYZE ${query}`); // Use EXPLAIN ANALYZE for the actual query await this.executeSingleStatement(dropView); } else { - response = await this.executeSingleStatement(sql) + response = await this.executeSingleStatement(`EXPLAIN ANALYZE ${sql}`) } return response @@ -97,7 +97,15 @@ class TrinoRunner implements BenchmarkRunner { // Count rows if data is present if (result.data) { - rowCount += result.data.length; + if (typeof result.data?.[0]?.[0] === 'string') { + // Extract row count from EXPLAIN ANALYZE output + const outputMatch = result.data[0][0].match(/Output.*?(\d+)\s+rows/i); + if (outputMatch) { + rowCount = parseInt(outputMatch[1]); + } + } else { + rowCount += result.data.length; + } } // Check for errors @@ -113,123 +121,123 @@ class TrinoRunner implements BenchmarkRunner { const schema = `tpch_sf${sf}`; // Create schema first - await this.executeQuery(`CREATE SCHEMA IF NOT EXISTS hive.${schema} WITH (location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/')`); + await this.executeSingleStatement(`CREATE SCHEMA IF NOT EXISTS hive.${schema} WITH (location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/')`); // Create customer table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.customer`); - await this.executeQuery(`CREATE TABLE hive.${schema}.customer - ( - c_custkey bigint, - c_name varchar(25), - c_address varchar(40), - c_nationkey bigint, - c_phone varchar(15), - c_acctbal decimal(15, 2), - c_mktsegment varchar(10), - c_comment varchar(117) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.customer`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.customer + ( + c_custkey bigint, + c_name varchar(25), + c_address varchar(40), + c_nationkey bigint, + c_phone varchar(15), + c_acctbal decimal(15, 2), + c_mktsegment varchar(10), + c_comment varchar(117) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/customer/', format = 'PARQUET')`); // 
Create lineitem table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.lineitem`); - await this.executeQuery(`CREATE TABLE hive.${schema}.lineitem - ( - l_orderkey bigint, - l_partkey bigint, - l_suppkey bigint, - l_linenumber integer, - l_quantity decimal(15, 2), - l_extendedprice decimal(15, 2), - l_discount decimal(15, 2), - l_tax decimal(15, 2), - l_returnflag varchar(1), - l_linestatus varchar(1), - l_shipdate date, - l_commitdate date, - l_receiptdate date, - l_shipinstruct varchar(25), - l_shipmode varchar(10), - l_comment varchar(44) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.lineitem`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.lineitem + ( + l_orderkey bigint, + l_partkey bigint, + l_suppkey bigint, + l_linenumber integer, + l_quantity decimal(15, 2), + l_extendedprice decimal(15, 2), + l_discount decimal(15, 2), + l_tax decimal(15, 2), + l_returnflag varchar(1), + l_linestatus varchar(1), + l_shipdate date, + l_commitdate date, + l_receiptdate date, + l_shipinstruct varchar(25), + l_shipmode varchar(10), + l_comment varchar(44) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/lineitem/', format = 'PARQUET')`); // Create nation table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.nation`); - await this.executeQuery(`CREATE TABLE hive.${schema}.nation - ( - n_nationkey bigint, - n_name varchar(25), - n_regionkey bigint, - n_comment varchar(152) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.nation`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.nation + ( + n_nationkey bigint, + n_name varchar(25), + n_regionkey bigint, + n_comment varchar(152) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/nation/', format = 'PARQUET')`); // Create orders table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.orders`); - await this.executeQuery(`CREATE TABLE hive.${schema}.orders - ( - o_orderkey bigint, - o_custkey bigint, - o_orderstatus varchar(1), - o_totalprice decimal(15, 2), - o_orderdate date, - o_orderpriority varchar(15), - o_clerk varchar(15), - o_shippriority integer, - o_comment varchar(79) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.orders`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.orders + ( + o_orderkey bigint, + o_custkey bigint, + o_orderstatus varchar(1), + o_totalprice decimal(15, 2), + o_orderdate date, + o_orderpriority varchar(15), + o_clerk varchar(15), + o_shippriority integer, + o_comment varchar(79) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/orders/', format = 'PARQUET')`); // Create part table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.part`); - await this.executeQuery(`CREATE TABLE hive.${schema}.part - ( - p_partkey bigint, - p_name varchar(55), - p_mfgr varchar(25), - p_brand varchar(10), - p_type varchar(25), - p_size integer, - p_container varchar(10), - p_retailprice decimal(15, 2), - p_comment varchar(23) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.part`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.part + ( + p_partkey bigint, + p_name varchar(55), + p_mfgr varchar(25), + p_brand varchar(10), + p_type varchar(25), + p_size integer, + p_container varchar(10), + p_retailprice decimal(15, 2), + p_comment varchar(23) + ) WITH (external_location = 
's3://datafusion-distributed-benchmarks/tpch_sf${sf}/part/', format = 'PARQUET')`); // Create partsupp table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.partsupp`); - await this.executeQuery(`CREATE TABLE hive.${schema}.partsupp - ( - ps_partkey bigint, - ps_suppkey bigint, - ps_availqty integer, - ps_supplycost decimal(15, 2), - ps_comment varchar(199) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.partsupp`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.partsupp + ( + ps_partkey bigint, + ps_suppkey bigint, + ps_availqty integer, + ps_supplycost decimal(15, 2), + ps_comment varchar(199) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/partsupp/', format = 'PARQUET')`); // Create region table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.region`); - await this.executeQuery(`CREATE TABLE hive.${schema}.region - ( - r_regionkey bigint, - r_name varchar(25), - r_comment varchar(152) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.region`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.region + ( + r_regionkey bigint, + r_name varchar(25), + r_comment varchar(152) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/region/', format = 'PARQUET')`); // Create supplier table - await this.executeQuery(`DROP TABLE IF EXISTS hive.${schema}.supplier`); - await this.executeQuery(`CREATE TABLE hive.${schema}.supplier - ( - s_suppkey bigint, - s_name varchar(25), - s_address varchar(40), - s_nationkey bigint, - s_phone varchar(15), - s_acctbal decimal(15, 2), - s_comment varchar(101) - ) + await this.executeSingleStatement(`DROP TABLE IF EXISTS hive.${schema}.supplier`); + await this.executeSingleStatement(`CREATE TABLE hive.${schema}.supplier + ( + s_suppkey bigint, + s_name varchar(25), + s_address varchar(40), + s_nationkey bigint, + s_phone varchar(15), + s_acctbal decimal(15, 2), + s_comment varchar(101) + ) WITH (external_location = 's3://datafusion-distributed-benchmarks/tpch_sf${sf}/supplier/', format = 'PARQUET')`); } }
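Usage sketch (assumes a deployed stack; the instance id i-0123456789abcdef0 below is a
placeholder for instance 0, the Trino coordinator — the actual ids come from the stack output):

    # Port-forward the Trino coordinator, per the note in bin/trino-bench.ts
    aws ssm start-session --target i-0123456789abcdef0 \
      --document-name AWS-StartPortForwardingSession \
      --parameters "portNumber=8080,localPortNumber=8080"

    # Run the TPC-H suite against Trino (scale factor 10, 3 iterations per query)
    npm run trino-bench -- --sf 10 -i 3

    # Run a single query (e.g. q4) against the DataFusion workers on port 9000
    npm run datafusion-bench -- --sf 10 --query 4

Note that both scripts read and overwrite the same benchmarks/data/tpch_sf<sf>/remote-results.json,
so compareWithPrevious in @bench-common.ts prints each run's faster/slower deltas against
whichever engine ran last at that scale factor.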