Skip to content

Commit 690adbd

Browse files
test_utils/tpcds
1 parent 053d3a9 commit 690adbd

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed

src/test_utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ pub mod mock_exec;
66
pub mod parquet;
77
pub mod plans;
88
pub mod session_context;
9+
pub mod tpcds;
910
pub mod tpch;

src/test_utils/tpcds.rs

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
2+
use std::path::Path;
3+
use datafusion::{
4+
execution::context::SessionContext,
5+
prelude::ParquetReadOptions,
6+
error::{DataFusionError, Result},
7+
};
8+
9+
pub fn get_data_dir() -> std::path::PathBuf {
10+
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpcds/data")
11+
}
12+
13+
pub fn get_queries_dir() -> std::path::PathBuf {
14+
std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/tpcds/queries")
15+
}
16+
17+
/// All TPCDS table names
18+
pub const TPCDS_TABLES: &[&str] = &[
19+
"call_center",
20+
"catalog_page",
21+
"catalog_returns",
22+
"catalog_sales",
23+
"customer",
24+
"customer_address",
25+
"customer_demographics",
26+
"date_dim",
27+
"household_demographics",
28+
"income_band",
29+
"inventory",
30+
"item",
31+
"promotion",
32+
"reason",
33+
"ship_mode",
34+
"store",
35+
"store_returns",
36+
"store_sales",
37+
"time_dim",
38+
"warehouse",
39+
"web_page",
40+
"web_returns",
41+
"web_sales",
42+
"web_site"
43+
];
44+
45+
/// Register a single TPCDS table from parquet files in the data directory
46+
///
47+
/// # Arguments
48+
/// * `ctx` - The DataFusion SessionContext to register the table with
49+
/// * `table_name` - The name of the table to register
50+
/// * `data_dir` - Optional path to the TPCDS data directory. If None, uses get_data_dir()
51+
///
52+
/// # Returns
53+
/// * `Result<()>` - Ok if successful, error if table directory is missing or registration fails
54+
pub async fn register_tpcds_table(
55+
ctx: &SessionContext,
56+
table_name: &str,
57+
data_dir: Option<&Path>,
58+
) -> Result<()> {
59+
let default_data_dir = get_data_dir();
60+
let data_path = data_dir.unwrap_or(&default_data_dir);
61+
let table_path = data_path.join(table_name);
62+
63+
if !table_path.exists() {
64+
return Err(DataFusionError::Execution(format!(
65+
"TPCDS table directory not found: {}",
66+
table_path.display()
67+
)));
68+
}
69+
70+
// Check if this is a partitioned table or single files
71+
let entries = std::fs::read_dir(&table_path)
72+
.map_err(|e| DataFusionError::Execution(format!(
73+
"Failed to read table directory {}: {}",
74+
table_path.display(), e
75+
)))?;
76+
77+
let mut has_partitions = false;
78+
let mut has_direct_files = false;
79+
80+
for entry in entries {
81+
let entry = entry.map_err(|e| DataFusionError::Execution(format!(
82+
"Failed to read directory entry: {}", e
83+
)))?;
84+
let path = entry.path();
85+
if path.is_dir() {
86+
has_partitions = true;
87+
} else if path.extension().and_then(|s| s.to_str()) == Some("parquet") {
88+
has_direct_files = true;
89+
}
90+
}
91+
92+
let table_path_str = table_path.to_string_lossy();
93+
94+
if has_partitions && !has_direct_files {
95+
// Register as partitioned table
96+
ctx.register_parquet(table_name, &table_path_str, ParquetReadOptions::default())
97+
.await
98+
} else if has_direct_files && !has_partitions {
99+
// Register single parquet files
100+
ctx.register_parquet(table_name, &table_path_str, ParquetReadOptions::default())
101+
.await
102+
} else if has_partitions && has_direct_files {
103+
return Err(DataFusionError::Execution(format!(
104+
"Mixed directory structure not supported for table: {}", table_name
105+
)));
106+
} else {
107+
return Err(DataFusionError::Execution(format!(
108+
"No parquet files found in table directory: {}", table_path.display()
109+
)));
110+
}
111+
}
112+
113+
/// Register all available TPCDS tables from the data directory
114+
///
115+
/// # Arguments
116+
/// * `ctx` - The DataFusion SessionContext to register tables with
117+
/// * `data_dir` - Optional path to the TPCDS data directory. If None, uses get_data_dir()
118+
///
119+
/// # Returns
120+
/// * `Result<Vec<String>>` - List of successfully registered table names, or error if any required table is missing
121+
pub async fn register_all_tpcds_tables(
122+
ctx: &SessionContext,
123+
data_dir: Option<&Path>,
124+
) -> Result<Vec<String>> {
125+
let mut registered_tables = Vec::new();
126+
127+
for &table_name in TPCDS_TABLES {
128+
match register_tpcds_table(ctx, table_name, data_dir).await {
129+
Ok(_) => {
130+
registered_tables.push(table_name.to_string());
131+
}
132+
Err(e) => {
133+
return Err(DataFusionError::Execution(format!(
134+
"Failed to register required TPCDS table '{}': {}",
135+
table_name, e
136+
)));
137+
}
138+
}
139+
}
140+
141+
Ok(registered_tables)
142+
}
143+
144+
/// Register only available TPCDS tables from the data directory, skipping missing ones
145+
///
146+
/// # Arguments
147+
/// * `ctx` - The DataFusion SessionContext to register tables with
148+
/// * `data_dir` - Optional path to the TPCDS data directory. If None, uses get_data_dir()
149+
///
150+
/// # Returns
151+
/// * `Result<(Vec<String>, Vec<String>)>` - Tuple of (successfully registered tables, missing tables)
152+
pub async fn register_available_tpcds_tables(
153+
ctx: &SessionContext,
154+
data_dir: Option<&Path>,
155+
) -> Result<(Vec<String>, Vec<String>)> {
156+
let mut registered_tables = Vec::new();
157+
let mut missing_tables = Vec::new();
158+
159+
for &table_name in TPCDS_TABLES {
160+
match register_tpcds_table(ctx, table_name, data_dir).await {
161+
Ok(_) => {
162+
registered_tables.push(table_name.to_string());
163+
}
164+
Err(_) => {
165+
missing_tables.push(table_name.to_string());
166+
}
167+
}
168+
}
169+
170+
Ok((registered_tables, missing_tables))
171+
}
172+
173+
#[cfg(test)]
174+
mod tests {
175+
use super::*;
176+
use datafusion::execution::context::SessionContext;
177+
178+
#[tokio::test]
179+
async fn test_register_available_tpcds_tables() {
180+
let ctx = SessionContext::new();
181+
182+
// Test with the actual data directory
183+
let (registered, missing) = register_available_tpcds_tables(&ctx, None)
184+
.await
185+
.expect("Should successfully register available tables");
186+
187+
// We should have at least some tables registered
188+
assert!(!registered.is_empty(), "Expected at least some TPCDS tables to be available");
189+
190+
// Test that we can query a registered table
191+
if registered.contains(&"date_dim".to_string()) {
192+
let result = ctx.sql("SELECT COUNT(*) as count FROM date_dim").await;
193+
assert!(result.is_ok(), "Should be able to query registered table date_dim");
194+
195+
let df = result.unwrap();
196+
let batches = df.collect().await;
197+
assert!(batches.is_ok(), "Should be able to collect results from date_dim");
198+
assert!(!batches.unwrap().is_empty(), "date_dim should have some data");
199+
}
200+
201+
println!("Registered tables: {:?}", registered);
202+
println!("Missing tables: {:?}", missing);
203+
}
204+
205+
#[tokio::test]
206+
async fn test_register_single_tpcds_table() {
207+
let ctx = SessionContext::new();
208+
209+
// Test registering a single table that should exist
210+
match register_tpcds_table(&ctx, "date_dim", None).await {
211+
Ok(_) => {
212+
// Table registered successfully, try to query it
213+
let result = ctx.sql("SELECT COUNT(*) FROM date_dim").await;
214+
assert!(result.is_ok(), "Should be able to query registered date_dim table");
215+
}
216+
Err(e) => {
217+
// If table doesn't exist, that's also acceptable for this test
218+
println!("date_dim table not available: {}", e);
219+
}
220+
}
221+
222+
// Test with a non-existent table - should return error
223+
let result = register_tpcds_table(&ctx, "nonexistent_table", None).await;
224+
assert!(result.is_err(), "Should return error for non-existent table");
225+
}
226+
}
227+

0 commit comments

Comments
 (0)