
Commit b076bac

address PR comments and use dict encoding in tpcds table
1 parent 67ce84c commit b076bac

4 files changed (+187, -37 lines)


.gitignore

Lines changed: 0 additions & 1 deletion
@@ -3,5 +3,4 @@
 /benchmarks/data/
 testdata/tpch/*
 !testdata/tpch/queries
-testdata/tpch/data/
 testdata/tpcds/data/

src/test_utils/tpcds.rs

Lines changed: 148 additions & 2 deletions
@@ -1,12 +1,19 @@
 use datafusion::{
+    arrow::{
+        array::{Array, ArrayRef, DictionaryArray, StringArray, StringViewArray},
+        datatypes::{DataType, Field, Schema, UInt16Type},
+        record_batch::RecordBatch,
+    },
     common::{internal_datafusion_err, internal_err},
     error::Result,
     execution::context::SessionContext,
     prelude::ParquetReadOptions,
 };
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
 use std::fs;
 use std::path::Path;
 use std::process::Command;
+use std::sync::Arc;
 
 pub fn get_data_dir() -> std::path::PathBuf {
     std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpcds/data")
@@ -74,15 +81,147 @@ pub const TPCDS_TABLES: &[&str] = &[
     "web_site",
 ];
 
+/// Tables that should have dictionary encoding applied for testing
+const DICT_ENCODING_TABLES: &[&str] = &["item", "customer", "store"];
+
+/// Force dictionary encoding for specific string columns in a table for extra test coverage.
+fn force_dictionary_encoding_for_table(
+    table_name: &str,
+    batch: RecordBatch,
+) -> Result<RecordBatch> {
+    let dict_columns = match table_name {
+        "item" => vec!["i_brand", "i_category", "i_class", "i_color", "i_size"],
+        "customer" => vec!["c_salutation"],
+        "store" => vec!["s_state", "s_country"],
+        _ => vec![], // No dictionary encoding for other tables
+    };
+
+    if dict_columns.is_empty() {
+        return Ok(batch);
+    }
+
+    let schema = batch.schema();
+    let mut new_fields = Vec::new();
+    let mut new_columns = Vec::new();
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        let column = batch.column(i);
+
+        // Check if this column should be dictionary-encoded
+        if dict_columns.contains(&field.name().as_str())
+            && matches!(field.data_type(), DataType::Utf8 | DataType::Utf8View)
+        {
+            // Convert to dictionary encoding
+            let string_data =
+                if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
+                    string_array.iter().collect::<Vec<_>>()
+                } else if let Some(view_array) = column.as_any().downcast_ref::<StringViewArray>() {
+                    view_array.iter().collect::<Vec<_>>()
+                } else {
+                    return internal_err!("Expected string array for column {}", field.name());
+                };
+
+            let dict_array: DictionaryArray<UInt16Type> = string_data.into_iter().collect();
+            let dict_field = Field::new(
+                field.name(),
+                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
+                field.is_nullable(),
+            );
+
+            new_fields.push(dict_field);
+            new_columns.push(Arc::new(dict_array) as ArrayRef);
+        } else {
+            new_fields.push((**field).clone());
+            new_columns.push(column.clone());
+        }
+    }
+
+    let new_schema = Arc::new(Schema::new(new_fields));
+    RecordBatch::try_new(new_schema, new_columns).map_err(|e| internal_datafusion_err!("{}", e))
+}
+
 pub async fn register_tpcds_table(
     ctx: &SessionContext,
     table_name: &str,
     data_dir: Option<&Path>,
+) -> Result<()> {
+    register_tpcds_table_with_options(ctx, table_name, data_dir, false).await
+}
+
+pub async fn register_tpcds_table_with_options(
+    ctx: &SessionContext,
+    table_name: &str,
+    data_dir: Option<&Path>,
+    dict_encode_items_table: bool,
 ) -> Result<()> {
     let default_data_dir = get_data_dir();
     let data_path = data_dir.unwrap_or(&default_data_dir);
 
-    // Check if this is a single parquet file
+    // Apply dictionary encoding if requested and materialize to disk
+    if dict_encode_items_table && DICT_ENCODING_TABLES.contains(&table_name) {
+        let table_dir_path = data_path.join(table_name);
+        if table_dir_path.is_dir() {
+            let dict_table_path = data_path.join(format!("{table_name}_dict"));
+
+            // Check if dictionary encoded version already exists
+            if dict_table_path.exists() {
+                // Use the existing dictionary encoded version
+                ctx.register_parquet(
+                    table_name,
+                    &dict_table_path.to_string_lossy(),
+                    ParquetReadOptions::default(),
+                )
+                .await?;
+                return Ok(());
+            }
+
+            // Register temporarily to read the original data
+            let temp_table_name = format!("temp_{table_name}");
+            ctx.register_parquet(
+                &temp_table_name,
+                &table_dir_path.to_string_lossy(),
+                ParquetReadOptions::default(),
+            )
+            .await?;
+
+            // Read data and apply dictionary encoding
+            let df = ctx.table(&temp_table_name).await?;
+            let batches = df.collect().await?;
+
+            let mut dict_batches = Vec::new();
+            for batch in batches {
+                dict_batches.push(force_dictionary_encoding_for_table(table_name, batch)?);
+            }
+
+            // Write dictionary-encoded data to disk
+            if !dict_batches.is_empty() {
+                fs::create_dir_all(&dict_table_path)?;
+                let dict_file_path = dict_table_path.join("data.parquet");
+                let file = fs::File::create(&dict_file_path)?;
+                let props = WriterProperties::builder().build();
+                let mut writer = ArrowWriter::try_new(file, dict_batches[0].schema(), Some(props))?;
+
+                for batch in &dict_batches {
+                    writer.write(batch)?;
+                }
+                writer.close()?;
+
+                // Register the dictionary encoded table
+                ctx.register_parquet(
+                    table_name,
+                    &dict_table_path.to_string_lossy(),
+                    ParquetReadOptions::default(),
+                )
+                .await?;
+            }
+
+            // Deregister the temporary table
+            ctx.deregister_table(&temp_table_name)?;
+            return Ok(());
+        }
+    }
+
+    // Use normal parquet registration for all tables
     let table_file_path = data_path.join(format!("{table_name}.parquet"));
     if table_file_path.is_file() {
         ctx.register_parquet(
@@ -113,10 +252,17 @@ pub async fn register_tpcds_table(
 }
 
 pub async fn register_tables(ctx: &SessionContext) -> Result<Vec<String>> {
+    register_tables_with_options(ctx, false).await
+}
+
+pub async fn register_tables_with_options(
+    ctx: &SessionContext,
+    dict_encode_items_table: bool,
+) -> Result<Vec<String>> {
     let mut registered_tables = Vec::new();
 
     for &table_name in TPCDS_TABLES {
-        register_tpcds_table(ctx, table_name, None).await?;
+        register_tpcds_table_with_options(ctx, table_name, None, dict_encode_items_table).await?;
        registered_tables.push(table_name.to_string());
     }
 
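For reference, the conversion step in this diff leans on arrow-rs being able to build a UInt16-keyed DictionaryArray directly from an iterator of optional strings. Below is a minimal, self-contained sketch of just that step; it is not part of this commit, it assumes the `arrow` crate used directly rather than the `datafusion::arrow` re-export shown above, and `to_dictionary` is a hypothetical helper name.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, DictionaryArray, StringArray};
use arrow::datatypes::UInt16Type;

// Re-encode a plain Utf8 column as a UInt16-keyed dictionary column,
// mirroring what force_dictionary_encoding_for_table does per targeted column.
fn to_dictionary(column: &StringArray) -> ArrayRef {
    let dict: DictionaryArray<UInt16Type> = column.iter().collect();
    Arc::new(dict) as ArrayRef
}

fn main() {
    let col = StringArray::from(vec![Some("red"), Some("blue"), None, Some("red")]);
    let dict = to_dictionary(&col);
    println!("{:?}", dict.data_type()); // Dictionary(UInt16, Utf8)
}

UInt16 keys cap each dictionary at 65,536 distinct values, which comfortably covers low-cardinality TPC-DS columns such as i_category or s_state.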

testdata/tpcds/generate.sh

Lines changed: 33 additions & 30 deletions
@@ -2,6 +2,9 @@
 
 set -e
 
+# Get the directory where this script is located
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+
 if [ $# -ne 1 ]; then
     echo "Usage: $0 <scale_factor>"
     echo "Scale factor must be greater than or equal to 0"
@@ -22,46 +25,46 @@ if ! command -v duckdb &> /dev/null; then
 fi
 
 echo "Clearing testdata/tpcds/data directory..."
-rm -rf ./data
-mkdir data
+rm -rf "$SCRIPT_DIR/data"
+mkdir "$SCRIPT_DIR/data"
 
 echo "Removing existing database file..."
-rm -f tpcds.duckdb
+rm -f "$SCRIPT_DIR/tpcds.duckdb"
 
 echo "Generating TPC-DS data with scale factor $SCALE_FACTOR..."
-duckdb tpcds.duckdb -c "INSTALL tpcds; LOAD tpcds; CALL dsdgen(sf=$SCALE_FACTOR);"
+duckdb "$SCRIPT_DIR/tpcds.duckdb" -c "INSTALL tpcds; LOAD tpcds; CALL dsdgen(sf=$SCALE_FACTOR);"
 
 FILE_SIZE_BYTES=16777216
 
 echo "Exporting tables to parquet files..."
-duckdb tpcds.duckdb << EOF
-COPY store_sales TO 'data/store_sales' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY catalog_sales TO 'data/catalog_sales' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY web_sales TO 'data/web_sales' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY store_returns TO 'data/store_returns' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY catalog_returns TO 'data/catalog_returns' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY web_returns TO 'data/web_returns' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY inventory TO 'data/inventory' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY date_dim TO 'data/date_dim' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY time_dim TO 'data/time_dim' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY item TO 'data/item' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY customer TO 'data/customer' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY customer_address TO 'data/customer_address' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY customer_demographics TO 'data/customer_demographics' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY household_demographics TO 'data/household_demographics' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY income_band TO 'data/income_band' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY warehouse TO 'data/warehouse' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY store TO 'data/store' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY call_center TO 'data/call_center' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY web_site TO 'data/web_site' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY web_page TO 'data/web_page' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY ship_mode TO 'data/ship_mode' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY reason TO 'data/reason' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY promotion TO 'data/promotion' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
-COPY catalog_page TO 'data/catalog_page' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+duckdb "$SCRIPT_DIR/tpcds.duckdb" << EOF
+COPY store_sales TO '$SCRIPT_DIR/data/store_sales' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY catalog_sales TO '$SCRIPT_DIR/data/catalog_sales' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY web_sales TO '$SCRIPT_DIR/data/web_sales' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY store_returns TO '$SCRIPT_DIR/data/store_returns' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY catalog_returns TO '$SCRIPT_DIR/data/catalog_returns' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY web_returns TO '$SCRIPT_DIR/data/web_returns' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY inventory TO '$SCRIPT_DIR/data/inventory' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY date_dim TO '$SCRIPT_DIR/data/date_dim' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY time_dim TO '$SCRIPT_DIR/data/time_dim' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY item TO '$SCRIPT_DIR/data/item' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY customer TO '$SCRIPT_DIR/data/customer' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY customer_address TO '$SCRIPT_DIR/data/customer_address' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY customer_demographics TO '$SCRIPT_DIR/data/customer_demographics' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY household_demographics TO '$SCRIPT_DIR/data/household_demographics' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY income_band TO '$SCRIPT_DIR/data/income_band' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY warehouse TO '$SCRIPT_DIR/data/warehouse' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY store TO '$SCRIPT_DIR/data/store' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY call_center TO '$SCRIPT_DIR/data/call_center' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY web_site TO '$SCRIPT_DIR/data/web_site' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY web_page TO '$SCRIPT_DIR/data/web_page' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY ship_mode TO '$SCRIPT_DIR/data/ship_mode' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY reason TO '$SCRIPT_DIR/data/reason' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY promotion TO '$SCRIPT_DIR/data/promotion' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
+COPY catalog_page TO '$SCRIPT_DIR/data/catalog_page' (FORMAT PARQUET, COMPRESSION ZSTD, FILE_SIZE_BYTES ${FILE_SIZE_BYTES});
 EOF
 
 echo "Cleaning up temporary database..."
-rm -f tpcds.duckdb
+rm -f "$SCRIPT_DIR/tpcds.duckdb"
 
 echo "TPC-DS data generation complete!"

tests/tpcds_test.rs

Lines changed: 6 additions & 4 deletions
@@ -7,7 +7,9 @@ mod tests {
     use datafusion_distributed::test_utils::{
         localhost::start_localhost_context,
         property_based::{compare_ordering, compare_result_set},
-        tpcds::{generate_tpcds_data, get_data_dir, get_test_tpcds_query, register_tables},
+        tpcds::{
+            generate_tpcds_data, get_data_dir, get_test_tpcds_query, register_tables_with_options,
+        },
     };
 
     use datafusion::arrow::array::RecordBatch;
@@ -29,11 +31,11 @@ mod tests {
         distributed_ctx.set_distributed_files_per_task(FILES_PER_TASK)?;
         distributed_ctx
             .set_distributed_cardinality_effect_task_scale_factor(CARDINALITY_TASK_COUNT_FACTOR)?;
-        register_tables(&distributed_ctx).await?;
+        register_tables_with_options(&distributed_ctx, true).await?;
 
         // Create single node context to compare results to.
        let single_node_ctx = SessionContext::new();
-        register_tables(&single_node_ctx).await?;
+        register_tables_with_options(&single_node_ctx, true).await?;
 
         Ok((distributed_ctx, single_node_ctx, worker_tasks))
     }
@@ -552,7 +554,7 @@ mod tests {
         ctx: &SessionContext,
         query_sql: &str,
     ) -> (Arc<dyn ExecutionPlan>, Result<Vec<RecordBatch>>) {
-        let df = ctx.sql(&query_sql).await.unwrap();
+        let df = ctx.sql(query_sql).await.unwrap();
         let task_ctx = ctx.task_ctx();
         let plan = df.create_physical_plan().await.unwrap();
         (plan.clone(), collect(plan, task_ctx).await) // Collect execution errors, do not unwrap.
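The flag exercised in these tests is opt-in, so existing callers of register_tables keep the previous behavior. A minimal standalone usage sketch of the new entry point, not part of this commit (it assumes the `datafusion_distributed::test_utils::tpcds` path from the imports above and a tokio runtime):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use datafusion_distributed::test_utils::tpcds::register_tables_with_options;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // `true` registers the dictionary-encoded copies of item/customer/store;
    // `false` is equivalent to the plain register_tables wrapper.
    let tables = register_tables_with_options(&ctx, true).await?;
    println!("registered {} TPC-DS tables", tables.len());
    Ok(())
}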
