Skip to content

Commit 7add0ab

Browse files
committed
Add serialization and deserialization tests for all TPC-H queries
1 parent 6851fb4 commit 7add0ab

11 files changed

+224
-70
lines changed

Cargo.lock

Lines changed: 79 additions & 67 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ async-trait = "0.1.88"
4444
async-stream = "0.3"
4545
bytes = "1.5"
4646
clap = { version = "4.4", features = ["derive"] }
47-
datafusion = "48.0.0"
48-
datafusion-proto = "48.0.0"
49-
datafusion-substrait = "48.0.0"
47+
datafusion = { git = "https://github.com/apache/datafusion", rev = "a45a4c44317ce32f432cbeb8647e6555c93e998f" }
48+
datafusion-proto = { git = "https://github.com/apache/datafusion", rev = "a45a4c44317ce32f432cbeb8647e6555c93e998f" }
49+
datafusion-substrait = { git = "https://github.com/apache/datafusion", rev = "a45a4c44317ce32f432cbeb8647e6555c93e998f" }
5050
env_logger = "0.11"
5151
futures = "0.3"
5252
itertools = "0.14"

tests/round_trip_physical_plan.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
use anyhow::Result;
2+
use datafusion::common::DataFusionError;
3+
use datafusion::physical_plan::ExecutionPlan;
4+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
5+
use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
6+
use datafusion_proto::protobuf::PhysicalPlanNode;
7+
use std::sync::Arc;
8+
9+
/// Helper function to create physical plan and its version after serialization and deserialization
10+
async fn physical_plan_before_after_serde(
11+
sql: &str,
12+
ctx: &SessionContext,
13+
) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>), DataFusionError> {
14+
// Build physical plan from sql
15+
let plan = ctx.sql(sql).await?.create_physical_plan().await?;
16+
17+
let codec = DefaultPhysicalExtensionCodec {};
18+
19+
// Serialize the physical plan
20+
let proto: PhysicalPlanNode =
21+
PhysicalPlanNode::try_from_physical_plan(plan.clone(), &codec).expect("to proto");
22+
23+
// Deserialize the physical plan
24+
let runtime = ctx.runtime_env();
25+
let de_plan: Arc<dyn ExecutionPlan> = proto
26+
.try_into_physical_plan(ctx, runtime.as_ref(), &codec)
27+
.expect("from proto");
28+
29+
Ok((plan, de_plan))
30+
}
31+
32+
/// Perform a serde (serialize and deserialize) roundtrip for the specified sql's physical plan, and assert that
33+
async fn assert_serde_roundtrip(sql: &str, ctx: &SessionContext) -> Result<(), DataFusionError> {
34+
let (plan, de_plan) = physical_plan_before_after_serde(sql, ctx).await?;
35+
36+
// Assert that the physical plan is identical after serialization and deserialization
37+
assert_eq!(format!("{plan:?}"), format!("{de_plan:?}"));
38+
39+
Ok(())
40+
}
41+
42+
/// Helper function to create a SessionContext with TPC-H tables registered from parquet files
43+
async fn tpch_context() -> Result<SessionContext, DataFusionError> {
44+
let ctx = SessionContext::new();
45+
46+
// TPC-H table definitions with their corresponding parquet file paths
47+
let tables = [
48+
("customer", "tpch/data/tpch_customer_small.parquet"),
49+
("lineitem", "tpch/data/tpch_lineitem_small.parquet"),
50+
("nation", "tpch/data/tpch_nation_small.parquet"),
51+
("orders", "tpch/data/tpch_orders_small.parquet"),
52+
("part", "tpch/data/tpch_part_small.parquet"),
53+
("partsupp", "tpch/data/tpch_partsupp_small.parquet"),
54+
("region", "tpch/data/tpch_region_small.parquet"),
55+
("supplier", "tpch/data/tpch_supplier_small.parquet"),
56+
];
57+
58+
// Register all TPC-H tables
59+
for (table_name, file_path) in &tables {
60+
ctx.register_parquet(*table_name, *file_path, ParquetReadOptions::default())
61+
.await?;
62+
}
63+
64+
// Create the revenue0 view required for query 15
65+
let revenue0_sql = "CREATE VIEW revenue0 (supplier_no, total_revenue) AS SELECT l_suppkey, sum(l_extendedprice * (1 - l_discount)) FROM lineitem WHERE l_shipdate >= date '1996-08-01' AND l_shipdate < date '1996-08-01' + interval '3' month GROUP BY l_suppkey";
66+
ctx.sql(revenue0_sql).await?.collect().await?;
67+
68+
Ok(ctx)
69+
}
70+
71+
/// Helper function to get TPC-H query SQL from the repository
72+
fn get_tpch_query_sql(query: usize) -> Result<Vec<String>, DataFusionError> {
73+
use std::fs;
74+
75+
if !(1..=22).contains(&query) {
76+
return Err(DataFusionError::External(
77+
format!("Invalid TPC-H query number: {query}").into(),
78+
));
79+
}
80+
81+
let filename = format!("tpch/queries/q{query}.sql");
82+
let contents = fs::read_to_string(&filename).map_err(|e| {
83+
DataFusionError::External(format!("Failed to read query file {filename}: {e}").into())
84+
})?;
85+
86+
Ok(contents
87+
.split(';')
88+
.map(|s| s.trim())
89+
.filter(|s| !s.is_empty())
90+
.map(|s| s.to_string())
91+
.collect())
92+
}
93+
94+
/// Test that we are able to build the physical plan and its version after serialization and deserialization
95+
#[tokio::test]
96+
async fn test_serialize_deserialize_tpch_queries() -> Result<()> {
97+
// Create context with TPC-H tables from parquet files
98+
let ctx = tpch_context().await?;
99+
100+
// repeat to run all 22 queries
101+
for query in 1..=22 {
102+
// run all statements in the query
103+
let sql = get_tpch_query_sql(query)?;
104+
for stmt in sql {
105+
// Skip empty statements and comment-only statements
106+
let trimmed = stmt.trim();
107+
if trimmed.is_empty() {
108+
continue;
109+
}
110+
// Skip statements that are only comments
111+
if trimmed
112+
.lines()
113+
.all(|line| line.trim().is_empty() || line.trim().starts_with("--"))
114+
{
115+
continue;
116+
}
117+
118+
// Ensure we are able to build the physical plan and its version after serialization and deserialization
119+
physical_plan_before_after_serde(&stmt, &ctx).await?;
120+
}
121+
}
122+
123+
Ok(())
124+
}
125+
126+
// Test compare the result of the physical plan before and after serialization and deserialization
127+
#[tokio::test]
128+
async fn test_round_trip_tpch_queries() -> Result<(), DataFusionError> {
129+
// Create context with TPC-H tables
130+
let ctx = tpch_context().await?;
131+
132+
// Verify q3, q5, q10, q12 pass the round trip
133+
// todo: after bug https://github.com/apache/datafusion/issues/16772 is fixed, test with all 22 queries
134+
for query in [3, 5, 10, 12] {
135+
let sql = get_tpch_query_sql(query)?;
136+
for stmt in sql {
137+
assert_serde_roundtrip(&stmt, &ctx).await?;
138+
}
139+
}
140+
141+
Ok(())
142+
}
7.55 KB
Binary file not shown.
10.8 KB
Binary file not shown.
4.26 KB
Binary file not shown.
7.05 KB
Binary file not shown.

tpch/data/tpch_part_small.parquet

4.25 KB
Binary file not shown.
5.33 KB
Binary file not shown.
2.76 KB
Binary file not shown.

0 commit comments

Comments
 (0)