Skip to content

Commit beb6c32

Browse files
committed
Fix correctness tests
1 parent c3181a6 commit beb6c32

File tree

3 files changed

+64
-36
lines changed

3 files changed

+64
-36
lines changed

Cargo.lock

Lines changed: 24 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ prost = "0.13.5"
3232
rand = "0.8.5"
3333
object_store = "0.12.3"
3434
bytes = "1.10.1"
35+
pin-project = "1.1.10"
3536

3637
# integration_tests deps
3738
insta = { version = "1.43.1", features = ["filters"], optional = true }
@@ -41,7 +42,7 @@ parquet = { version = "56.1.0", optional = true }
4142
arrow = { version = "56.1.0", optional = true }
4243
tokio-stream = { version = "0.1.17", optional = true }
4344
hyper-util = { version = "0.1.16", optional = true }
44-
pin-project = "1.1.10"
45+
pretty_assertions = { version = "1.4", optional = true }
4546

4647
[features]
4748
integration = [
@@ -52,6 +53,7 @@ integration = [
5253
"arrow",
5354
"tokio-stream",
5455
"hyper-util",
56+
"pretty_assertions"
5557
]
5658

5759
tpch = ["integration"]
@@ -65,3 +67,4 @@ parquet = "56.1.0"
6567
arrow = "56.1.0"
6668
tokio-stream = "0.1.17"
6769
hyper-util = "0.1.16"
70+
pretty_assertions = "1.4"

tests/tpch_correctness_test.rs

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,135 +18,137 @@ mod tests {
1818

1919
#[tokio::test]
2020
async fn test_tpch_1() -> Result<(), Box<dyn Error>> {
21-
test_tpch_query(1).await
21+
test_tpch_query(get_test_tpch_query(1)).await
2222
}
2323

2424
#[tokio::test]
2525
async fn test_tpch_2() -> Result<(), Box<dyn Error>> {
26-
test_tpch_query(2).await
26+
test_tpch_query(get_test_tpch_query(2)).await
2727
}
2828

2929
#[tokio::test]
3030
async fn test_tpch_3() -> Result<(), Box<dyn Error>> {
31-
test_tpch_query(3).await
31+
test_tpch_query(get_test_tpch_query(3)).await
3232
}
3333

3434
#[tokio::test]
3535
async fn test_tpch_4() -> Result<(), Box<dyn Error>> {
36-
test_tpch_query(4).await
36+
test_tpch_query(get_test_tpch_query(4)).await
3737
}
3838

3939
#[tokio::test]
4040
async fn test_tpch_5() -> Result<(), Box<dyn Error>> {
41-
test_tpch_query(5).await
41+
test_tpch_query(get_test_tpch_query(5)).await
4242
}
4343

4444
#[tokio::test]
4545
async fn test_tpch_6() -> Result<(), Box<dyn Error>> {
46-
test_tpch_query(6).await
46+
test_tpch_query(get_test_tpch_query(6)).await
4747
}
4848

4949
#[tokio::test]
5050
async fn test_tpch_7() -> Result<(), Box<dyn Error>> {
51-
test_tpch_query(7).await
51+
test_tpch_query(get_test_tpch_query(7)).await
5252
}
5353

5454
#[tokio::test]
5555
async fn test_tpch_8() -> Result<(), Box<dyn Error>> {
56-
test_tpch_query(8).await
56+
test_tpch_query(get_test_tpch_query(8)).await
5757
}
5858

5959
#[tokio::test]
6060
async fn test_tpch_9() -> Result<(), Box<dyn Error>> {
61-
test_tpch_query(9).await
61+
test_tpch_query(get_test_tpch_query(9)).await
6262
}
6363

6464
#[tokio::test]
6565
async fn test_tpch_10() -> Result<(), Box<dyn Error>> {
66-
test_tpch_query(10).await
66+
let sql = get_test_tpch_query(10);
67+
// There is a chance that this query returns non-deterministic results if two entries
68+
// happen to have the exact same revenue. With small scales, this never happens, but with
69+
// bigger scales, this is very likely to happen.
70+
// This extra ordering accounts for it.
71+
let sql = sql.replace("revenue desc", "revenue, c_acctbal desc");
72+
test_tpch_query(sql).await
6773
}
6874

6975
#[tokio::test]
7076
async fn test_tpch_11() -> Result<(), Box<dyn Error>> {
71-
test_tpch_query(11).await
77+
test_tpch_query(get_test_tpch_query(11)).await
7278
}
7379

7480
#[tokio::test]
7581
async fn test_tpch_12() -> Result<(), Box<dyn Error>> {
76-
test_tpch_query(12).await
82+
test_tpch_query(get_test_tpch_query(12)).await
7783
}
7884

7985
#[tokio::test]
8086
async fn test_tpch_13() -> Result<(), Box<dyn Error>> {
81-
test_tpch_query(13).await
87+
test_tpch_query(get_test_tpch_query(13)).await
8288
}
8389

8490
#[tokio::test]
8591
async fn test_tpch_14() -> Result<(), Box<dyn Error>> {
86-
test_tpch_query(14).await
92+
test_tpch_query(get_test_tpch_query(14)).await
8793
}
8894

8995
#[tokio::test]
9096
async fn test_tpch_15() -> Result<(), Box<dyn Error>> {
91-
test_tpch_query(15).await
97+
test_tpch_query(get_test_tpch_query(15)).await
9298
}
9399

94100
#[tokio::test]
95101
async fn test_tpch_16() -> Result<(), Box<dyn Error>> {
96-
test_tpch_query(16).await
102+
test_tpch_query(get_test_tpch_query(16)).await
97103
}
98104

99105
#[tokio::test]
100106
async fn test_tpch_17() -> Result<(), Box<dyn Error>> {
101-
test_tpch_query(17).await
107+
test_tpch_query(get_test_tpch_query(17)).await
102108
}
103109

104110
#[tokio::test]
105111
async fn test_tpch_18() -> Result<(), Box<dyn Error>> {
106-
test_tpch_query(18).await
112+
test_tpch_query(get_test_tpch_query(18)).await
107113
}
108114

109115
#[tokio::test]
110116
async fn test_tpch_19() -> Result<(), Box<dyn Error>> {
111-
test_tpch_query(19).await
117+
test_tpch_query(get_test_tpch_query(19)).await
112118
}
113119

114120
#[tokio::test]
115121
async fn test_tpch_20() -> Result<(), Box<dyn Error>> {
116-
test_tpch_query(20).await
122+
test_tpch_query(get_test_tpch_query(20)).await
117123
}
118124

119125
#[tokio::test]
120126
async fn test_tpch_21() -> Result<(), Box<dyn Error>> {
121-
test_tpch_query(21).await
127+
test_tpch_query(get_test_tpch_query(21)).await
122128
}
123129

124130
#[tokio::test]
125131
async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
126-
test_tpch_query(22).await
132+
test_tpch_query(get_test_tpch_query(22)).await
127133
}
128134

129-
async fn test_tpch_query(query_id: u8) -> Result<(), Box<dyn Error>> {
135+
async fn test_tpch_query(sql: String) -> Result<(), Box<dyn Error>> {
130136
let (ctx, _guard) = start_localhost_context(4, DefaultSessionBuilder).await;
131-
let results_d = run_tpch_query(ctx, query_id).await?;
132-
let results_s = run_tpch_query(SessionContext::new(), query_id).await?;
133-
134-
assert_eq!(
135-
results_d.to_string(),
136-
results_s.to_string(),
137-
"Query {query_id} results differ between executions",
138-
);
137+
138+
let results_d = run_tpch_query(ctx, sql.clone()).await?;
139+
let results_s = run_tpch_query(SessionContext::new(), sql).await?;
140+
141+
pretty_assertions::assert_eq!(results_d.to_string(), results_s.to_string(),);
139142
Ok(())
140143
}
141144

142145
// test_non_distributed_consistency runs each TPC-H query twice - once in a distributed manner
143146
// and once in a non-distributed manner. For each query, it asserts that the results are identical.
144147
async fn run_tpch_query(
145148
ctx: SessionContext,
146-
query_id: u8,
149+
sql: String,
147150
) -> Result<impl Display, Box<dyn Error>> {
148151
let data_dir = ensure_tpch_data(TPCH_SCALE_FACTOR, TPCH_DATA_PARTS).await;
149-
let sql = get_test_tpch_query(query_id);
150152
ctx.state_ref()
151153
.write()
152154
.config_mut()
@@ -170,7 +172,7 @@ mod tests {
170172
// Query 15 has three queries in it, one creating the view, the second
171173
// executing, which we want to capture the output of, and the third
172174
// tearing down the view
173-
let stream = if query_id == 15 {
175+
let stream = if sql.starts_with("create view") {
174176
let queries: Vec<&str> = sql
175177
.split(';')
176178
.map(str::trim)

0 commit comments

Comments
 (0)