diff --git a/benchmarks/gen-tpch.sh b/benchmarks/gen-tpch.sh
index 98ec9c88..91a23cea 100755
--- a/benchmarks/gen-tpch.sh
+++ b/benchmarks/gen-tpch.sh
@@ -2,18 +2,15 @@
 
 set -e
 
-SCALE_FACTOR=1
+SCALE_FACTOR=${SCALE_FACTOR:-1}
+PARTITIONS=${PARTITIONS:-16}
+
+echo "Generating TPCH dataset with SCALE_FACTOR=${SCALE_FACTOR} and PARTITIONS=${PARTITIONS}"
 
 # https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
 SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
 DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
 CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
-
-if [ -z "$SCALE_FACTOR" ] ; then
-    echo "Internal error: Scale factor not specified"
-    exit 1
-fi
-
 TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
 
 echo "Creating tpch dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."
@@ -29,16 +26,6 @@ else
     docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
 fi
 
-# Copy expected answers into the ./data/answers directory if it does not already exist
-FILE="${TPCH_DIR}/answers/q1.out"
-if test -f "${FILE}"; then
-    echo " Expected answers exist (${FILE} exists)."
-else
-    echo " Copying answers to ${TPCH_DIR}/answers"
-    mkdir -p "${TPCH_DIR}/answers"
-    docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
-fi
-
 # Create 'parquet' files from tbl
 FILE="${TPCH_DIR}/supplier"
 if test -d "${FILE}"; then
@@ -46,18 +33,6 @@ if test -d "${FILE}"; then
 else
     echo " creating parquet files using benchmark binary ..."
     pushd "${SCRIPT_DIR}" > /dev/null
-    $CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
+    $CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet --partitions "$PARTITIONS"
     popd > /dev/null
 fi
-
-# Create 'csv' files from tbl
-FILE="${TPCH_DIR}/csv/supplier"
-if test -d "${FILE}"; then
-    echo " csv files exist ($FILE exists)."
-else
-    echo " creating csv files using benchmark binary ..."
-    pushd "${SCRIPT_DIR}" > /dev/null
-    $CARGO_COMMAND -- tpch-convert --input "${TPCH_DIR}" --output "${TPCH_DIR}/csv" --format csv
-    popd > /dev/null
-fi
-
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index 2371cdd4..91ff8a67 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -20,10 +20,9 @@ use datafusion::logical_expr::select_expr::SelectExpr;
 use std::fs;
 use std::path::{Path, PathBuf};
 
-use datafusion::common::not_impl_err;
-
 use super::TPCH_TABLES;
 use super::get_tbl_tpch_table_schema;
+use datafusion::common::not_impl_err;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use parquet::basic::Compression;
@@ -49,7 +48,7 @@ pub struct ConvertOpt {
     #[structopt(short = "c", long = "compression", default_value = "zstd")]
     compression: String,
 
-    /// Number of partitions to produce
+    /// Number of partitions to produce. By default, uses only 1 partition.
#[structopt(short = "n", long = "partitions", default_value = "1")] partitions: usize, @@ -88,7 +87,9 @@ impl ConvertOpt { options }; - let config = SessionConfig::new().with_batch_size(self.batch_size); + let config = SessionConfig::new() + .with_target_partitions(self.partitions) + .with_batch_size(self.batch_size); let ctx = SessionContext::new_with_config(config); // build plan to read the TBL file @@ -104,11 +105,7 @@ impl ConvertOpt { .collect::>(); csv = csv.select(selection)?; - // optionally, repartition the file - let partitions = self.partitions; - if partitions > 1 { - csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))? - } + csv = csv.repartition(Partitioning::RoundRobinBatch(self.partitions))?; let csv = if self.sort { csv.sort_by(vec![col(key_column_name)])? } else {