Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 5 additions & 30 deletions benchmarks/gen-tpch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@

set -e

SCALE_FACTOR=1
SCALE_FACTOR=${SCALE_FACTOR:-1}
PARTITIONS=${PARTITIONS:-16}

echo "Generating TPCH dataset with SCALE_FACTOR=${SCALE_FACTOR} and PARTITIONS=${PARTITIONS}"

# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}

if [ -z "$SCALE_FACTOR" ] ; then
echo "Internal error: Scale factor not specified"
exit 1
fi

TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
echo "Creating tpch dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."

Expand All @@ -29,35 +26,13 @@ else
docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
fi

# Copy expected answers into the ./data/answers directory if it does not already exist
FILE="${TPCH_DIR}/answers/q1.out"
if test -f "${FILE}"; then
echo " Expected answers exist (${FILE} exists)."
else
echo " Copying answers to ${TPCH_DIR}/answers"
mkdir -p "${TPCH_DIR}/answers"
docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
fi

# Create 'parquet' files from tbl
FILE="${TPCH_DIR}/supplier"
if test -d "${FILE}"; then
echo " parquet files exist ($FILE exists)."
else
echo " creating parquet files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet --partitions "$PARTITIONS"
popd > /dev/null
fi

# Create 'csv' files from tbl
FILE="${TPCH_DIR}/csv/supplier"
if test -d "${FILE}"; then
echo " csv files exist ($FILE exists)."
else
echo " creating csv files using benchmark binary ..."
pushd "${SCRIPT_DIR}" > /dev/null
$CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}/csv" --format csv
popd > /dev/null
fi

15 changes: 6 additions & 9 deletions benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ use datafusion::logical_expr::select_expr::SelectExpr;
use std::fs;
use std::path::{Path, PathBuf};

use datafusion::common::not_impl_err;

use super::TPCH_TABLES;
use super::get_tbl_tpch_table_schema;
use datafusion::common::not_impl_err;
use datafusion::error::Result;
use datafusion::prelude::*;
use parquet::basic::Compression;
Expand All @@ -49,7 +48,7 @@ pub struct ConvertOpt {
#[structopt(short = "c", long = "compression", default_value = "zstd")]
compression: String,

/// Number of partitions to produce
/// Number of partitions to produce. By default, uses only 1 partition.
#[structopt(short = "n", long = "partitions", default_value = "1")]
partitions: usize,

Expand Down Expand Up @@ -88,7 +87,9 @@ impl ConvertOpt {
options
};

let config = SessionConfig::new().with_batch_size(self.batch_size);
let config = SessionConfig::new()
.with_target_partitions(self.partitions)
.with_batch_size(self.batch_size);
let ctx = SessionContext::new_with_config(config);

// build plan to read the TBL file
Expand All @@ -104,11 +105,7 @@ impl ConvertOpt {
.collect::<Vec<_>>();

csv = csv.select(selection)?;
// optionally, repartition the file
let partitions = self.partitions;
if partitions > 1 {
csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
}
csv = csv.repartition(Partitioning::RoundRobinBatch(self.partitions))?;
let csv = if self.sort {
csv.sort_by(vec![col(key_column_name)])?
} else {
Expand Down