
Commit 347ddfa

jaggederest and Copilot authored
Add AVG aggregate function coverage, expression parser that rides on AST, RewriteEngine that produces ghost columns (#502)
* Initial AVG + expression parser

  AVG function implementation:
  - Detects paired count+avg functions on the same expression
  - Expression parser handles the "same expression" logic by assigning an id through interning
  - Adds a "ghost" column using rewriting if no count is paired with avg
  - Removes the "ghost" column from the returned value of aggregates

* Rollback transactions correctly

* Remove redundant `.to_owned()`

Co-authored-by: Copilot <[email protected]>

* PR feedback - document distinct sort, avg with casts

---------

Co-authored-by: Copilot <[email protected]>
1 parent 67db28b commit 347ddfa
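The distributed reduction described above needs an AVG and a COUNT over the same expression from every shard: each shard's sum can then be recovered as `avg * count` and divided by the global count, which is exactly what the injected ghost COUNT column provides when a query only asks for AVG. A minimal sketch of that merge arithmetic; the function name and types here are hypothetical, not PgDog's actual internals:

```rust
// Illustrative sketch only: merge per-shard (avg, count) pairs into a global
// average. This mirrors the arithmetic the paired AVG + ghost COUNT enables;
// names and signatures are made up for the example.
fn merge_avg(per_shard: &[(f64, i64)]) -> Option<f64> {
    let total_count: i64 = per_shard.iter().map(|&(_, count)| count).sum();
    if total_count == 0 {
        return None; // AVG over zero rows is NULL in SQL.
    }
    // Recover each shard's sum as avg * count, then divide by the global count.
    let total_sum: f64 = per_shard
        .iter()
        .map(|&(avg, count)| avg * count as f64)
        .sum();
    Some(total_sum / total_count as f64)
}

fn main() {
    // Shard 0 averages 12.0 over 2 rows, shard 1 averages 20.0 over 2 rows.
    let merged = merge_avg(&[(12.0, 2), (20.0, 2)]).unwrap();
    assert!((merged - 16.0).abs() < 1e-9);
}
```

The ghost column is only a transport detail, so it is stripped before rows are returned to the client, which is what the integration tests below assert.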

23 files changed: +1909, -78 lines


AGENTS.md

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
# Repository Guidelines

## Project Structure & Module Organization
PgDog is a Rust workspace centred in `pgdog/` with the proxy under `pgdog/src` and end-to-end specs in `pgdog/tests`. Shared macros live in `pgdog-macros`, runtime extensions in `pgdog-plugin` plus `pgdog-plugin-build`, and loadable plugins under `plugins/`. Integration harnesses, Docker assets, and helper scripts sit in `integration/`, `docker/`, and `dev/`. Use `example.pgdog.toml` and `example.users.toml` as templates when adding sharded configs.

## Build, Test, and Development Commands
Run `cargo check` for a quick type pass, and `cargo build` to compile local binaries; switch to `cargo build --release` for benchmarking or packaging. Start the development stack with `bash integration/dev-server.sh`, which provisions dependencies and runs `cargo watch` for live reloads. CI parity tests run via `cargo nextest run --test-threads=1 --no-fail-fast`. **Never invoke** `cargo test` directly; always use `cargo nextest run --test-threads=1 ...` for unit or integration suites so concurrency stays deterministic.

## Coding Style & Naming Conventions
Follow Rust conventions: modules and functions in `snake_case`, types in `UpperCamelCase`, constants in `SCREAMING_SNAKE_CASE`. Keep modules under ~200 lines unless justified. Format with `cargo fmt` and lint using `cargo clippy --all-targets --all-features` before posting a PR.
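As a quick illustration of those conventions (every name below is hypothetical, not taken from the codebase):

```rust
// Hypothetical example illustrating the naming conventions above.
const MAX_POOL_SIZE: usize = 64; // constants: SCREAMING_SNAKE_CASE

// Types: UpperCamelCase.
struct ConnectionPool {
    max_size: usize,
}

impl ConnectionPool {
    // Modules and functions: snake_case.
    fn has_capacity(&self, in_use: usize) -> bool {
        in_use < self.max_size
    }
}

fn main() {
    let pool = ConnectionPool { max_size: MAX_POOL_SIZE };
    assert!(pool.has_capacity(10));
}
```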

## Testing Guidelines
Adhere to TDD—write the failing test first, implement minimally, then refactor. Co-locate unit tests with their crates, reserving heavier scenarios for `integration/` against the prepared-transaction Postgres stack. Invoke `cargo nextest run --test-threads=1 <test>` for focused iterations; gate Kerberos coverage behind `--features gssapi`. Do **not** run `cargo test`; Nextest with a single-thread budget is the required harness.
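A minimal sketch of the co-located unit-test pattern described above (the helper and its test are hypothetical); it would be run with `cargo nextest run --test-threads=1 <test>`, not `cargo test`:

```rust
// Illustrative sketch of a co-located unit test; `split_csv` is a made-up helper.
fn split_csv(input: &str) -> Vec<&str> {
    input.split(',').map(str::trim).collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    // Written first to fail, then made to pass, per the TDD flow above.
    #[test]
    fn splits_and_trims_fields() {
        assert_eq!(split_csv("a, b ,c"), vec!["a", "b", "c"]);
    }
}
```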

## Commit & Pull Request Guidelines
Use concise, imperative commit subjects (e.g., "fix gssapi credential lookup") and describe user impact plus risk areas in PRs. Link relevant issues, attach logs or test output, and include config diffs or screenshots for operational changes. Keep PR scope tight and ensure linting, formatting, and tests pass locally before requesting review.

## Security & Configuration Tips
Capture diagnostics before code changes, prefer configuration-driven fixes, and document new setup steps in `integration/README`. Never commit secrets; rely on the provided templates and environment variables when adjusting credentials or connection strings.

Makefile

Lines changed: 0 additions & 10 deletions
This file was deleted.
integration/rust/tests/integration/avg.rs

Lines changed: 295 additions & 0 deletions
@@ -0,0 +1,295 @@
use rust::setup::connections_sqlx;
use sqlx::{Connection, Executor, PgConnection, Row};

#[tokio::test]
async fn avg_merges_with_helper_count() -> Result<(), Box<dyn std::error::Error>> {
    let conns = connections_sqlx().await;
    let sharded = conns.get(1).cloned().unwrap();

    // Ensure clean state on each shard.
    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ DROP TABLE IF EXISTS avg_reduce_test",
            shard
        );
        sharded.execute(comment.as_str()).await.ok();
    }

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ CREATE TABLE avg_reduce_test(price DOUBLE PRECISION)",
            shard
        );
        sharded.execute(comment.as_str()).await?;
    }

    // Insert data on each shard so the query spans multiple shards.
    sharded
        .execute("/* pgdog_shard: 0 */ INSERT INTO avg_reduce_test(price) VALUES (10.0), (14.0)")
        .await?;
    sharded
        .execute("/* pgdog_shard: 1 */ INSERT INTO avg_reduce_test(price) VALUES (18.0), (22.0)")
        .await?;

    let rows = sharded
        .fetch_all(
            "SELECT COUNT(price)::bigint AS total_count, AVG(price) AS avg_price FROM avg_reduce_test",
        )
        .await?;

    assert_eq!(rows.len(), 1);
    let total_count: i64 = rows[0].get("total_count");
    let avg_price: f64 = rows[0].get("avg_price");

    assert_eq!(total_count, 4);
    assert!(
        (avg_price - 16.0).abs() < 1e-9,
        "unexpected avg: {}",
        avg_price
    );

    // Cleanup tables per shard.
    for shard in [0, 1] {
        let comment = format!("/* pgdog_shard: {} */ DROP TABLE avg_reduce_test", shard);
        sharded.execute(comment.as_str()).await.ok();
    }

    Ok(())
}

#[tokio::test]
async fn avg_without_helper_should_still_merge() -> Result<(), Box<dyn std::error::Error>> {
    let conns = connections_sqlx().await;
    let sharded = conns.get(1).cloned().unwrap();

    // Clean slate on each shard.
    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ DROP TABLE IF EXISTS avg_rewrite_expectation",
            shard
        );
        sharded.execute(comment.as_str()).await.ok();
    }

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ CREATE TABLE avg_rewrite_expectation(price DOUBLE PRECISION)",
            shard
        );
        sharded.execute(comment.as_str()).await?;
    }

    sharded
        .execute(
            "/* pgdog_shard: 0 */ INSERT INTO avg_rewrite_expectation(price) VALUES (10.0), (14.0)",
        )
        .await?;
    sharded
        .execute(
            "/* pgdog_shard: 1 */ INSERT INTO avg_rewrite_expectation(price) VALUES (18.0), (22.0)",
        )
        .await?;

    let rows = sharded
        .fetch_all("SELECT AVG(price) AS avg_price FROM avg_rewrite_expectation")
        .await?;

    // Desired behavior: rows should merge to a single average across all shards.
    assert_eq!(
        rows.len(),
        1,
        "AVG without helper COUNT should merge across shards"
    );

    let pgdog_avg: f64 = rows[0].get("avg_price");

    // Compute the expected average directly on each shard for comparison.
    let mut shard0 = PgConnection::connect("postgres://pgdog:[email protected]:5432/shard_0")
        .await
        .unwrap();
    let mut shard1 = PgConnection::connect("postgres://pgdog:[email protected]:5432/shard_1")
        .await
        .unwrap();

    let (sum0, count0): (f64, i64) = sqlx::query_as::<_, (f64, i64)>(
        "SELECT COALESCE(SUM(price), 0)::float8, COUNT(*) FROM avg_rewrite_expectation",
    )
    .fetch_one(&mut shard0)
    .await?;
    let (sum1, count1): (f64, i64) = sqlx::query_as::<_, (f64, i64)>(
        "SELECT COALESCE(SUM(price), 0)::float8, COUNT(*) FROM avg_rewrite_expectation",
    )
    .fetch_one(&mut shard1)
    .await?;

    let expected = (sum0 + sum1) / (count0 + count1) as f64;
    assert!(
        (pgdog_avg - expected).abs() < 1e-9,
        "PgDog AVG should match Postgres"
    );

    Ok(())
}

#[tokio::test]
async fn avg_multiple_columns_should_merge() -> Result<(), Box<dyn std::error::Error>> {
    let conns = connections_sqlx().await;
    let sharded = conns.get(1).cloned().unwrap();

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ DROP TABLE IF EXISTS avg_multi_column",
            shard
        );
        sharded.execute(comment.as_str()).await.ok();
    }

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ CREATE TABLE avg_multi_column(price DOUBLE PRECISION, discount DOUBLE PRECISION)",
            shard
        );
        sharded.execute(comment.as_str()).await?;
    }

    sharded
        .execute(
            "/* pgdog_shard: 0 */ INSERT INTO avg_multi_column(price, discount) VALUES (10.0, 1.0), (14.0, 3.0)",
        )
        .await?;
    sharded
        .execute(
            "/* pgdog_shard: 1 */ INSERT INTO avg_multi_column(price, discount) VALUES (18.0, 2.0), (22.0, 6.0)",
        )
        .await?;

    let rows = sharded
        .fetch_all(
            "SELECT AVG(price) AS avg_price, AVG(discount) AS avg_discount FROM avg_multi_column",
        )
        .await?;

    assert_eq!(
        rows.len(),
        1,
        "rewritten AVG columns should merge across shards"
    );

    let avg_price: f64 = rows[0].get("avg_price");
    let avg_discount: f64 = rows[0].get("avg_discount");

    // Build the expected per-column averages straight from the shards.
    let mut shard0 = PgConnection::connect("postgres://pgdog:[email protected]:5432/shard_0")
        .await
        .unwrap();
    let mut shard1 = PgConnection::connect("postgres://pgdog:[email protected]:5432/shard_1")
        .await
        .unwrap();

    let (sum_price0, sum_discount0, count0): (f64, f64, i64) = sqlx::query_as(
        "SELECT COALESCE(SUM(price), 0)::float8, COALESCE(SUM(discount), 0)::float8, COUNT(*) FROM avg_multi_column",
    )
    .fetch_one(&mut shard0)
    .await?;
    let (sum_price1, sum_discount1, count1): (f64, f64, i64) = sqlx::query_as(
        "SELECT COALESCE(SUM(price), 0)::float8, COALESCE(SUM(discount), 0)::float8, COUNT(*) FROM avg_multi_column",
    )
    .fetch_one(&mut shard1)
    .await?;

    let total_count = (count0 + count1) as f64;
    let expected_price = (sum_price0 + sum_price1) / total_count;
    let expected_discount = (sum_discount0 + sum_discount1) / total_count;

    assert!(
        (avg_price - expected_price).abs() < 1e-9,
        "PgDog AVG(price) should match Postgres"
    );
    assert!(
        (avg_discount - expected_discount).abs() < 1e-9,
        "PgDog AVG(discount) should match Postgres"
    );

    for shard in [0, 1] {
        let comment = format!("/* pgdog_shard: {} */ DROP TABLE avg_multi_column", shard);
        sharded.execute(comment.as_str()).await.ok();
    }

    Ok(())
}

#[tokio::test]
async fn avg_prepared_statement_should_merge() -> Result<(), Box<dyn std::error::Error>> {
    let conns = connections_sqlx().await;
    let sharded = conns.get(1).cloned().unwrap();

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ DROP TABLE IF EXISTS avg_prepared_params",
            shard
        );
        sharded.execute(comment.as_str()).await.ok();
    }

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ CREATE TABLE avg_prepared_params(price DOUBLE PRECISION)",
            shard
        );
        sharded.execute(comment.as_str()).await?;
    }

    sharded
        .execute(
            "/* pgdog_shard: 0 */ INSERT INTO avg_prepared_params(price) VALUES (10.0), (14.0)",
        )
        .await?;
    sharded
        .execute(
            "/* pgdog_shard: 1 */ INSERT INTO avg_prepared_params(price) VALUES (18.0), (22.0)",
        )
        .await?;

    let avg_rows =
        sqlx::query("SELECT AVG(price) AS avg_price FROM avg_prepared_params WHERE price >= $1")
            .bind(10.0_f64)
            .fetch_all(&sharded)
            .await?;

    assert_eq!(avg_rows.len(), 1);
    let pgdog_avg: f64 = avg_rows[0].get("avg_price");

    // Re-run the same filtered aggregate on each shard to build the expected value.
    let mut shard0 = PgConnection::connect("postgres://pgdog:[email protected]:5432/shard_0")
        .await
        .unwrap();
    let mut shard1 = PgConnection::connect("postgres://pgdog:[email protected]:5432/shard_1")
        .await
        .unwrap();

    let (sum0, count0): (f64, i64) = sqlx::query_as(
        "SELECT COALESCE(SUM(price), 0)::float8, COUNT(*) FROM avg_prepared_params WHERE price >= $1",
    )
    .bind(10.0_f64)
    .fetch_one(&mut shard0)
    .await?;
    let (sum1, count1): (f64, i64) = sqlx::query_as(
        "SELECT COALESCE(SUM(price), 0)::float8, COUNT(*) FROM avg_prepared_params WHERE price >= $1",
    )
    .bind(10.0_f64)
    .fetch_one(&mut shard1)
    .await?;

    let expected = (sum0 + sum1) / (count0 + count1) as f64;
    assert!(
        (pgdog_avg - expected).abs() < 1e-9,
        "Prepared AVG should match Postgres"
    );

    for shard in [0, 1] {
        let comment = format!(
            "/* pgdog_shard: {} */ DROP TABLE avg_prepared_params",
            shard
        );
        sharded.execute(comment.as_str()).await.ok();
    }

    Ok(())
}

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 pub mod auth;
+pub mod avg;
 pub mod ban;
 pub mod cross_shard_disabled;
 pub mod distinct;

0 commit comments
