Skip to content

Commit 49f0653

Browse files
authored
Merge pull request #32 from datafusion-contrib/ntran/validate_correctness
Validate TPC-H query results using integration tests
2 parents ceff46f + ae86178 commit 49f0653

File tree

7 files changed

+1503
-16
lines changed

7 files changed

+1503
-16
lines changed

.gitignore

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,27 @@
1+
# Rust build artifacts
12
target/
2-
.idea/
3+
4+
# IDE and editor files
5+
.idea/
6+
7+
# Python virtual environments and files
8+
.venv/
9+
.python_startup.py
10+
__pycache__/
11+
*.py[cod]
12+
*$py.class
13+
*.so
14+
15+
# Log files
16+
*.log
17+
proxy.log
18+
worker*.log
19+
20+
# OS generated files
21+
.DS_Store
22+
.DS_Store?
23+
._*
24+
.Spotlight-V100
25+
.Trashes
26+
ehthumbs.db
27+
Thumbs.db

README.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ cargo build --release
138138

139139
### Running Tests
140140

141-
Run all tests:
141+
#### Basic Tests
142+
143+
Run all unit tests (fast - excludes TPC-H validation):
142144

143145
```bash
144146
cargo test
@@ -150,6 +152,20 @@ Run tests with output:
150152
cargo test -- --nocapture
151153
```
152154

155+
#### TPC-H Validation Integration Tests
156+
157+
Run comprehensive TPC-H validation tests that compare distributed DataFusion against regular DataFusion. No prerequisites are needed — the tests handle setup automatically.
158+
159+
```bash
160+
# Run all TPC-H validation tests
161+
cargo test --test tpch_validation test_tpch_validation_all_queries -- --ignored --nocapture
162+
163+
# Run single query test for debugging
164+
cargo test --test tpch_validation test_tpch_validation_single_query -- --ignored --nocapture
165+
```
166+
167+
**Note:** TPC-H validation tests are annotated with `#[ignore]` to avoid slowing down `cargo test` during development. They're included in the CI pipeline and can be run manually when needed.
168+
153169
## Usage
154170

155171
With the code now built and ready, the next step is to set up the server and execute queries. To do that, we'll need a schema and some data—so for this example, we'll use the TPC-H schema and queries.
@@ -250,6 +266,18 @@ DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,nation:parquet:/tmp/
250266
DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,nation:parquet:/tmp/tpch_s1/nation.parquet DATAFUSION_RAY_LOG_LEVEL=trace DFRAY_WORKER_ADDRESSES=worker1/localhost:20201,worker2/localhost:20202 ./target/release/distributed-datafusion --mode proxy --port 20200
251267
```
252268

269+
**Using Views:**
270+
271+
To pre-create views that queries can reference (such as for TPC-H q15), you can use the `DFRAY_VIEWS` environment variable:
272+
273+
```bash
274+
# Example: Create a view for TPC-H q15 revenue calculation
275+
DFRAY_VIEWS="CREATE VIEW revenue0 (supplier_no, total_revenue) AS SELECT l_suppkey, sum(l_extendedprice * (1 - l_discount)) FROM lineitem WHERE l_shipdate >= date '1996-08-01' AND l_shipdate < date '1996-08-01' + interval '3' month GROUP BY l_suppkey"
276+
277+
# Use both tables and views in your cluster
278+
DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,lineitem:parquet:/tmp/tpch_s1/lineitem.parquet,supplier:parquet:/tmp/tpch_s1/supplier.parquet DFRAY_VIEWS="$DFRAY_VIEWS" DATAFUSION_RAY_LOG_LEVEL=trace ./target/release/distributed-datafusion --mode worker --port 20201
279+
```
280+
253281
#### Manual Client Setup
254282

255283
You can now connect a client to the proxy at `localhost:20200` to execute queries across the distributed cluster.
@@ -309,6 +337,7 @@ The system supports various configuration options through environment variables:
309337

310338
- `DATAFUSION_RAY_LOG_LEVEL`: Set logging level (default: WARN)
311339
- `DFRAY_TABLES`: Comma-separated list of tables in format `name:format:path`
340+
- `DFRAY_VIEWS`: Semicolon-separated list of CREATE VIEW SQL statements
312341

313342
## Development
314343

@@ -326,6 +355,9 @@ The system supports various configuration options through environment variables:
326355
- `launch_python_arrowflightsql_client.sh`: Launch Python query client
327356
- `build_and_push_docker.sh`: Docker build and push script
328357
- `python_tests.sh`: Python test runner
358+
- `tests/`: Integration tests
359+
- `tpch_validation.rs`: TPC-H validation integration tests
360+
- `common/mod.rs`: Shared test utilities and helper functions
329361
- `tpch/queries/`: TPC-H benchmark SQL queries
330362
- `testdata/`: Test data files
331363
- `k8s/`: Kubernetes deployment files

scripts/launch_tpch_cluster.sh

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ partsupp:parquet:${TPCH_DATA_DIR}/partsupp.parquet,\
125125
region:parquet:${TPCH_DATA_DIR}/region.parquet,\
126126
supplier:parquet:${TPCH_DATA_DIR}/supplier.parquet"
127127

128+
# Define views required for TPC-H queries (e.g., q15)
129+
export DFRAY_VIEWS="create view revenue0 (supplier_no, total_revenue) as select l_suppkey, sum(l_extendedprice * (1 - l_discount)) from lineitem where l_shipdate >= date '1996-08-01' and l_shipdate < date '1996-08-01' + interval '3' month group by l_suppkey"
130+
128131
# Array to store worker PIDs and addresses
129132
declare -a WORKER_PIDS
130133
declare -a WORKER_ADDRESSES
@@ -146,7 +149,7 @@ for ((i=0; i<NUM_WORKERS; i++)); do
146149
WORKER_NAME="worker$((i+1))"
147150
LOG_FILE="${LOG_DIR}/${WORKER_NAME}.log"
148151
echo " Starting $WORKER_NAME on port $PORT..."
149-
env DATAFUSION_RAY_LOG_LEVEL="$DATAFUSION_RAY_LOG_LEVEL" DFRAY_TABLES="$DFRAY_TABLES" ./target/release/distributed-datafusion --mode worker --port $PORT > "$LOG_FILE" 2>&1 &
152+
env DATAFUSION_RAY_LOG_LEVEL="$DATAFUSION_RAY_LOG_LEVEL" DFRAY_TABLES="$DFRAY_TABLES" DFRAY_VIEWS="$DFRAY_VIEWS" ./target/release/distributed-datafusion --mode worker --port $PORT > "$LOG_FILE" 2>&1 &
150153
WORKER_PIDS[$i]=$!
151154
WORKER_ADDRESSES[$i]="${WORKER_NAME}/localhost:${PORT}"
152155
done
@@ -162,7 +165,7 @@ WORKER_ADDRESSES_STR=$(IFS=,; echo "${WORKER_ADDRESSES[*]}")
162165
echo "Starting proxy on port 20200..."
163166
echo "Connecting to workers: $WORKER_ADDRESSES_STR"
164167
PROXY_LOG="${LOG_DIR}/proxy.log"
165-
env DATAFUSION_RAY_LOG_LEVEL="$DATAFUSION_RAY_LOG_LEVEL" DFRAY_TABLES="$DFRAY_TABLES" DFRAY_WORKER_ADDRESSES="$WORKER_ADDRESSES_STR" ./target/release/distributed-datafusion --mode proxy --port 20200 > "$PROXY_LOG" 2>&1 &
168+
env DATAFUSION_RAY_LOG_LEVEL="$DATAFUSION_RAY_LOG_LEVEL" DFRAY_TABLES="$DFRAY_TABLES" DFRAY_VIEWS="$DFRAY_VIEWS" DFRAY_WORKER_ADDRESSES="$WORKER_ADDRESSES_STR" ./target/release/distributed-datafusion --mode proxy --port 20200 > "$PROXY_LOG" 2>&1 &
166169
PROXY_PID=$!
167170

168171
echo

src/planning.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ async fn make_state() -> Result<SessionState> {
129129
.await
130130
.context("Failed to add tables from environment")?;
131131

132+
add_views_from_env(&state)
133+
.await
134+
.context("Failed to add views from environment")?;
135+
132136
Ok(state)
133137
}
134138

@@ -206,6 +210,38 @@ pub async fn add_tables_from_env(state: &mut SessionState) -> Result<()> {
206210
Ok(())
207211
}
208212

213+
pub async fn add_views_from_env(state: &SessionState) -> Result<()> {
214+
// this string contains CREATE VIEW SQL statements separated by semicolons
215+
let views_str = env::var("DFRAY_VIEWS");
216+
if views_str.is_err() {
217+
info!("No DFRAY_VIEWS environment variable set, skipping view creation");
218+
return Ok(());
219+
}
220+
221+
let ctx = SessionContext::new_with_state(state.clone());
222+
223+
for view_sql in views_str.unwrap().split(';') {
224+
let view_sql = view_sql.trim();
225+
if view_sql.is_empty() {
226+
continue;
227+
}
228+
229+
info!("creating view from env: {}", view_sql);
230+
231+
// Execute the CREATE VIEW statement
232+
match ctx.sql(view_sql).await {
233+
Ok(_) => {
234+
info!("Successfully created view: {}", view_sql);
235+
}
236+
Err(e) => {
237+
return Err(anyhow!("Failed to create view '{}': {}", view_sql, e).into());
238+
}
239+
}
240+
}
241+
242+
Ok(())
243+
}
244+
209245
pub async fn logical_planning(sql: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
210246
let options = SQLOptions::new();
211247
let plan = ctx.state().create_logical_plan(sql).await?;

0 commit comments

Comments
 (0)