27 changes: 26 additions & 1 deletion .gitignore
@@ -1,2 +1,27 @@
# Rust build artifacts
target/

# IDE and editor files
.idea/

# Python virtual environments and files
.venv/
.python_startup.py
__pycache__/
*.py[cod]
*$py.class
*.so

# Log files
*.log
proxy.log
worker*.log

# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
34 changes: 33 additions & 1 deletion README.md
@@ -138,7 +138,9 @@ cargo build --release

### Running Tests

#### Basic Tests

Run all unit tests (fast; excludes TPC-H validation):

```bash
cargo test
@@ -150,6 +152,20 @@ Run tests with output:
cargo test -- --nocapture
```

#### TPC-H Validation Integration Tests

Run the comprehensive TPC-H validation tests, which compare distributed DataFusion results against regular (single-process) DataFusion. No prerequisites are needed; the tests handle all setup automatically.

```bash
# Run all TPC-H validation tests
cargo test --test tpch_validation test_tpch_validation_all_queries -- --ignored --nocapture

# Run single query test for debugging
cargo test --test tpch_validation test_tpch_validation_single_query -- --ignored --nocapture
```

**Note:** TPC-H validation tests are annotated with `#[ignore]` to avoid slowing down `cargo test` during development. They're included in the CI pipeline and can be run manually when needed.
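
As a general Cargo convention (not specific to this project), all `#[ignore]`d tests can also be run in one pass with the built-in `--ignored` flag:

```bash
# Run every ignored test in the workspace (standard cargo test flag)
cargo test -- --ignored --nocapture
```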

## Usage

With the code now built and ready, the next step is to set up the server and execute queries. To do that, we'll need a schema and some data, so for this example we'll use the TPC-H schema and queries.
@@ -250,6 +266,18 @@ DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,nation:parquet:/tmp/
DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,nation:parquet:/tmp/tpch_s1/nation.parquet DATAFUSION_RAY_LOG_LEVEL=trace DFRAY_WORKER_ADDRESSES=worker1/localhost:20201,worker2/localhost:20202 ./target/release/distributed-datafusion --mode proxy --port 20200
```

**Using Views:**

To pre-create views that queries can reference (such as for TPC-H q15), you can use the `DFRAY_VIEWS` environment variable:

```bash
# Example: Create a view for TPC-H q15 revenue calculation
DFRAY_VIEWS="CREATE VIEW revenue0 (supplier_no, total_revenue) AS SELECT l_suppkey, sum(l_extendedprice * (1 - l_discount)) FROM lineitem WHERE l_shipdate >= date '1996-08-01' AND l_shipdate < date '1996-08-01' + interval '3' month GROUP BY l_suppkey"

# Use both tables and views in your cluster
DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,lineitem:parquet:/tmp/tpch_s1/lineitem.parquet,supplier:parquet:/tmp/tpch_s1/supplier.parquet DFRAY_VIEWS="$DFRAY_VIEWS" DATAFUSION_RAY_LOG_LEVEL=trace ./target/release/distributed-datafusion --mode worker --port 20201
```
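
The proxy needs the same view definitions so it can plan queries that reference them (the cluster launcher script passes `DFRAY_VIEWS` to both workers and the proxy). A sketch mirroring the proxy command shown earlier:

```bash
# Give the proxy the same views as the workers
DFRAY_TABLES=customer:parquet:/tmp/tpch_s1/customer.parquet,lineitem:parquet:/tmp/tpch_s1/lineitem.parquet,supplier:parquet:/tmp/tpch_s1/supplier.parquet DFRAY_VIEWS="$DFRAY_VIEWS" DFRAY_WORKER_ADDRESSES=worker1/localhost:20201,worker2/localhost:20202 ./target/release/distributed-datafusion --mode proxy --port 20200
```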

#### Manual Client Setup

You can now connect a client to the proxy at `localhost:20200` to execute queries across the distributed cluster.
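
For example, the repository's helper script (listed under the development section below) launches a Python Arrow Flight SQL client. A minimal sketch, assuming the script needs no arguments and defaults to the proxy at `localhost:20200`:

```bash
# Launch the bundled Python query client against the running proxy
./scripts/launch_python_arrowflightsql_client.sh
```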
Expand Down Expand Up @@ -309,6 +337,7 @@ The system supports various configuration options through environment variables:

- `DATAFUSION_RAY_LOG_LEVEL`: Set logging level (default: WARN)
- `DFRAY_TABLES`: Comma-separated list of tables in format `name:format:path`
- `DFRAY_VIEWS`: Semicolon-separated list of CREATE VIEW SQL statements (see the sketch below)
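
For instance, two views can be registered at once by separating the statements with a semicolon. A minimal sketch (the second view, `high_value_orders`, is a hypothetical example, not part of TPC-H):

```bash
# Two CREATE VIEW statements separated by ';' (high_value_orders is made up for illustration)
export DFRAY_VIEWS="CREATE VIEW revenue0 (supplier_no, total_revenue) AS SELECT l_suppkey, sum(l_extendedprice * (1 - l_discount)) FROM lineitem WHERE l_shipdate >= date '1996-08-01' AND l_shipdate < date '1996-08-01' + interval '3' month GROUP BY l_suppkey;CREATE VIEW high_value_orders AS SELECT * FROM orders WHERE o_totalprice > 100000"
```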

## Development

@@ -326,6 +355,9 @@ The system supports various configuration options through environment variables:
- `launch_python_arrowflightsql_client.sh`: Launch Python query client
- `build_and_push_docker.sh`: Docker build and push script
- `python_tests.sh`: Python test runner
- `tests/`: Integration tests
  - `tpch_validation.rs`: TPC-H validation integration tests
  - `common/mod.rs`: Shared test utilities and helper functions
- `tpch/queries/`: TPC-H benchmark SQL queries
- `testdata/`: Test data files
- `k8s/`: Kubernetes deployment files
7 changes: 5 additions & 2 deletions scripts/launch_tpch_cluster.sh
@@ -125,6 +125,9 @@ partsupp:parquet:${TPCH_DATA_DIR}/partsupp.parquet,\
region:parquet:${TPCH_DATA_DIR}/region.parquet,\
supplier:parquet:${TPCH_DATA_DIR}/supplier.parquet"

# Define views required for TPC-H queries (e.g., q15)
export DFRAY_VIEWS="create view revenue0 (supplier_no, total_revenue) as select l_suppkey, sum(l_extendedprice * (1 - l_discount)) from lineitem where l_shipdate >= date '1996-08-01' and l_shipdate < date '1996-08-01' + interval '3' month group by l_suppkey"

# Array to store worker PIDs and addresses
declare -a WORKER_PIDS
declare -a WORKER_ADDRESSES
@@ -146,7 +149,7 @@ for ((i=0; i<NUM_WORKERS; i++)); do
    WORKER_NAME="worker$((i+1))"
    LOG_FILE="${LOG_DIR}/${WORKER_NAME}.log"
    echo " Starting $WORKER_NAME on port $PORT..."
    env DATAFUSION_RAY_LOG_LEVEL="$DATAFUSION_RAY_LOG_LEVEL" DFRAY_TABLES="$DFRAY_TABLES" DFRAY_VIEWS="$DFRAY_VIEWS" ./target/release/distributed-datafusion --mode worker --port $PORT > "$LOG_FILE" 2>&1 &
    WORKER_PIDS[$i]=$!
    WORKER_ADDRESSES[$i]="${WORKER_NAME}/localhost:${PORT}"
done
@@ -162,7 +165,7 @@ WORKER_ADDRESSES_STR=$(IFS=,; echo "${WORKER_ADDRESSES[*]}")
echo "Starting proxy on port 20200..."
echo "Connecting to workers: $WORKER_ADDRESSES_STR"
PROXY_LOG="${LOG_DIR}/proxy.log"
env DATAFUSION_RAY_LOG_LEVEL="$DATAFUSION_RAY_LOG_LEVEL" DFRAY_TABLES="$DFRAY_TABLES" DFRAY_VIEWS="$DFRAY_VIEWS" DFRAY_WORKER_ADDRESSES="$WORKER_ADDRESSES_STR" ./target/release/distributed-datafusion --mode proxy --port 20200 > "$PROXY_LOG" 2>&1 &
PROXY_PID=$!

echo
36 changes: 36 additions & 0 deletions src/planning.rs
@@ -129,6 +129,10 @@ async fn make_state() -> Result<SessionState> {
        .await
        .context("Failed to add tables from environment")?;

    add_views_from_env(&state)
        .await
        .context("Failed to add views from environment")?;

    Ok(state)
}

@@ -206,6 +210,38 @@ pub async fn add_tables_from_env(state: &mut SessionState) -> Result<()> {
    Ok(())
}

pub async fn add_views_from_env(state: &SessionState) -> Result<()> {
    // DFRAY_VIEWS contains CREATE VIEW SQL statements separated by semicolons.
    // Note: this naive split would break a statement containing ';' inside a
    // string literal, so keep view definitions free of embedded semicolons.
    let views_str = match env::var("DFRAY_VIEWS") {
        Ok(s) => s,
        Err(_) => {
            info!("No DFRAY_VIEWS environment variable set, skipping view creation");
            return Ok(());
        }
    };

    let ctx = SessionContext::new_with_state(state.clone());

    for view_sql in views_str.split(';') {
        let view_sql = view_sql.trim();
        if view_sql.is_empty() {
            continue;
        }

        info!("creating view from env: {}", view_sql);

        // Execute the CREATE VIEW statement against a context built on the shared state
        match ctx.sql(view_sql).await {
            Ok(_) => info!("Successfully created view: {}", view_sql),
            Err(e) => {
                return Err(anyhow!("Failed to create view '{}': {}", view_sql, e).into());
            }
        }
    }

    Ok(())
}

pub async fn logical_planning(sql: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
    let options = SQLOptions::new();
    let plan = ctx.state().create_logical_plan(sql).await?;