Skip to content

Commit bab5bce

Browse files
authored
fix: handle SET after query inside transaction correctly with schema sharding (#684)
Example: ``` BEGIN; SELECT 1; SET LOCAL work_mem TO '128MB'; COMMIT; ```
1 parent 8ae2a75 commit bab5bce

File tree

11 files changed

+269
-105
lines changed

11 files changed

+269
-105
lines changed

integration/python/test_sqlalchemy.py

Lines changed: 220 additions & 86 deletions
Large diffs are not rendered by default.

pgdog/src/backend/server.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -372,12 +372,9 @@ impl Server {
372372
}
373373
}
374374
Err(err) => {
375-
match err {
376-
Error::ProtocolOutOfSync => {
377-
// conservatively, we do not know for sure if this is recoverable
378-
self.stats.state(State::Error);
379-
}
380-
_ => {}
375+
if let Error::ProtocolOutOfSync = err {
376+
// conservatively, we do not know for sure if this is recoverable
377+
self.stats.state(State::Error);
381378
}
382379
error!(
383380
"{:?} got: {}, extended buffer: {:?}, state: {}",

pgdog/src/frontend/client/query_engine/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,9 @@ impl QueryEngine {
239239
.await?
240240
}
241241
Command::Unlisten(channel) => self.unlisten(context, &channel.clone()).await?,
242-
Command::Set { name, value, local } => {
242+
Command::Set {
243+
name, value, local, ..
244+
} => {
243245
// FIXME: parameters set in between statements inside a transaction won't
244246
// be recorded in the client parameters.
245247
if self.backend.connected() {

pgdog/src/frontend/router/parser/command.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub enum Command {
2727
name: String,
2828
value: ParameterValue,
2929
local: bool,
30+
route: Route,
3031
},
3132
PreparedStatement(Prepare),
3233
InternalField {
@@ -61,6 +62,7 @@ impl Command {
6162
match self {
6263
Self::Query(route) => route,
6364
Self::ShardKeyRewrite(plan) => plan.route(),
65+
Self::Set { route, .. } => route,
6466
_ => &DEFAULT_ROUTE,
6567
}
6668
}

pgdog/src/frontend/router/parser/query/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,7 @@ impl QueryParser {
128128
}
129129
}
130130

131-
debug!(
132-
"query router decision: {:#?} (shard: {:#?})",
133-
command,
134-
context.shards_calculator.peek(),
135-
);
131+
debug!("query router decision: {:#?}", command);
136132

137133
self.attach_explain(&mut command);
138134

pgdog/src/frontend/router/parser/query/set.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ impl QueryParser {
2020
name: stmt.name.to_string(),
2121
value,
2222
local: stmt.is_local,
23+
route: Route::write(context.shards_calculator.shard()),
2324
});
2425
}
2526

pgdog/src/frontend/router/parser/query/test/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,9 @@ fn test_transaction() {
595595
cluster.clone()
596596
);
597597
match route {
598-
Command::Set { name, value, local } => {
598+
Command::Set {
599+
name, value, local, ..
600+
} => {
599601
assert_eq!(name, "application_name");
600602
assert_eq!(value.as_str().unwrap(), "test");
601603
assert!(!cluster.read_only());

pgdog/src/frontend/router/parser/query/test/test_schema_sharding.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use crate::frontend::router::parser::Shard;
1+
use crate::frontend::router::parser::{route::ShardSource, Shard};
2+
use crate::net::parameter::ParameterValue;
23

34
use super::setup::{QueryParserTest, *};
45

@@ -273,3 +274,19 @@ fn test_schema_sharding_priority_on_delete() {
273274

274275
assert_eq!(command.route().shard(), &Shard::Direct(0));
275276
}
277+
278+
// --- SET commands with schema sharding via search_path ---
279+
280+
#[test]
281+
fn test_set_routes_to_shard_from_search_path() {
282+
let mut test =
283+
QueryParserTest::new().with_param("search_path", ParameterValue::String("shard_0".into()));
284+
285+
let command = test.execute(vec![Query::new("SET statement_timeout TO 1000").into()]);
286+
287+
assert_eq!(command.route().shard(), &Shard::Direct(0));
288+
assert_eq!(
289+
command.route().shard_with_priority().source(),
290+
&ShardSource::SearchPath("shard_0".into())
291+
);
292+
}

pgdog/src/frontend/router/parser/query/test/test_set.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ fn test_set_comment() {
1212
.into()]);
1313

1414
assert!(
15-
matches!(command, Command::Set { name, value, local } if name == "statement_timeout" && !local && value == ParameterValue::String("1".into())),
16-
"expected Command::Set"
15+
matches!(command.clone(), Command::Set { name, value, local, route } if name == "statement_timeout" && !local && value == ParameterValue::String("1".into()) && route.shard().is_direct()),
16+
"expected Command::Set, got {:#?}",
17+
command,
1718
);
1819
}

pgdog/src/frontend/router/parser/query/test/test_transaction.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,22 @@ fn test_set_application_name_in_transaction() {
7979

8080
let command = test.execute(vec![Query::new("SET application_name TO 'test'").into()]);
8181

82-
match command {
83-
Command::Set { name, value, local } => {
82+
match command.clone() {
83+
Command::Set {
84+
name,
85+
value,
86+
local,
87+
route,
88+
} => {
8489
assert_eq!(name, "application_name");
8590
assert_eq!(value.as_str().unwrap(), "test");
8691
assert!(!local);
8792
assert!(!test.cluster().read_only());
93+
assert!(route.is_write());
94+
assert!(route.shard().is_all());
8895
}
8996
_ => panic!("expected Set, got {command:?}"),
9097
}
98+
99+
assert!(command.route().shard().is_all());
91100
}

0 commit comments

Comments
 (0)