Skip to content

Commit 8fe24fd

Browse files
authored
feat: make query parser optional for load balancing (#693)
fix #690
1 parent e29a840 commit 8fe24fd

File tree

25 files changed

+418
-112
lines changed

25 files changed

+418
-112
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import psycopg2
2+
import pytest
3+
from globals import admin
4+
5+
queries = [
6+
"SELECT 1",
7+
"CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)",
8+
"INSERT INTO test_conn_reads (id) VALUES (1)",
9+
"INSERT INTO test_conn_reads (id) VALUES (2)",
10+
"INSERT INTO test_conn_reads (id) VALUES (3)",
11+
"SELECT * FROM test_conn_reads WHERE id = 1",
12+
"SELECT * FROM test_conn_reads WHERE id = 2",
13+
"SELECT * FROM test_conn_reads WHERE id = 3",
14+
"SET work_mem TO '4MB'",
15+
"SET work_mem TO '6MB'",
16+
"SET work_mem TO '8MB'",
17+
]
18+
19+
20+
@pytest.fixture
21+
def conn_reads():
22+
return psycopg2.connect(
23+
"host=127.0.0.1 port=6432 user=pgdog password=pgdog "
24+
"options='-c pgdog.role=replica'"
25+
)
26+
27+
28+
@pytest.fixture
29+
def conn_writes():
30+
return psycopg2.connect(
31+
"host=127.0.0.1 port=6432 user=pgdog password=pgdog "
32+
"options='-c pgdog.role=primary'"
33+
)
34+
35+
36+
@pytest.fixture
37+
def conn_default():
38+
return psycopg2.connect("host=127.0.0.1 port=6432 user=pgdog password=pgdog")
39+
40+
41+
def test_conn_writes(conn_writes):
42+
admin().execute("SET query_parser TO 'off'")
43+
for query in queries:
44+
conn_writes.autocommit = True
45+
cursor = conn_writes.cursor()
46+
cursor.execute(query)
47+
admin().execute("SET query_parser TO 'auto'")
48+
49+
50+
def test_conn_reads(conn_reads, conn_writes):
51+
admin().execute("SET query_parser TO 'off'")
52+
53+
conn_writes.autocommit = True
54+
conn_reads.autocommit = True
55+
56+
conn_writes.cursor().execute(
57+
"CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)"
58+
)
59+
60+
read = False
61+
for query in queries:
62+
cursor = conn_reads.cursor()
63+
try:
64+
cursor.execute(query)
65+
except psycopg2.errors.ReadOnlySqlTransaction:
66+
# Some will succeed because we allow reads
67+
# on the primary.
68+
read = True
69+
admin().execute("SET query_parser TO 'auto'")
70+
71+
conn_writes.cursor().execute("DROP TABLE IF EXISTS test_conn_reads")
72+
assert read, "expected some queries to hit replicas and fail"
73+
74+
75+
def test_transactions_writes(conn_writes):
76+
admin().execute("SET query_parser TO 'off'")
77+
78+
for query in queries:
79+
conn_writes.cursor().execute(query)
80+
conn_writes.commit()
81+
82+
admin().execute("SET query_parser TO 'auto'")
83+
84+
85+
def test_transactions_reads(conn_reads):
86+
admin().execute("SET query_parser TO 'off'")
87+
read = False
88+
89+
for query in queries:
90+
try:
91+
conn_reads.cursor().execute(query)
92+
except psycopg2.errors.ReadOnlySqlTransaction:
93+
# Some will succeed because we allow reads
94+
# on the primary.
95+
read = True
96+
conn_reads.commit()
97+
98+
assert read, "expected some queries to hit replicas and fail"
99+
admin().execute("SET query_parser TO 'auto'")
100+
101+
102+
def test_transaction_reads_explicit(conn_reads, conn_writes):
103+
conn_reads.autocommit = True
104+
admin().execute("SET query_parser TO 'off'")
105+
106+
conn_writes.cursor().execute(
107+
"CREATE TABLE IF NOT EXISTS test_conn_reads(id BIGINT)"
108+
)
109+
conn_writes.commit()
110+
111+
cursor = conn_reads.cursor()
112+
113+
read = False
114+
115+
for _ in range(15):
116+
cursor.execute("BEGIN")
117+
try:
118+
cursor.execute("INSERT INTO test_conn_reads (id) VALUES (1)")
119+
cursor.execute("COMMIT")
120+
except psycopg2.errors.ReadOnlySqlTransaction:
121+
read = True
122+
cursor.execute("ROLLBACK")
123+
124+
assert read, "expected some queries to hit replicas and fail"
125+
126+
for _ in range(15):
127+
cursor.execute("BEGIN READ ONLY") # Won't be parsed, doesn't matter to PgDog
128+
cursor.execute("SELECT 1")
129+
cursor.execute("ROLLBACK")
130+
131+
admin().execute("SET query_parser TO 'on'")

pgdog-config/src/core.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tracing::{info, warn};
77
use crate::sharding::ShardedSchema;
88
use crate::{
99
EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements,
10-
ReadWriteSplit, RewriteMode, Role,
10+
QueryParserLevel, ReadWriteSplit, RewriteMode, Role,
1111
};
1212

1313
use super::database::Database;
@@ -293,7 +293,7 @@ impl Config {
293293
mappings
294294
}
295295

296-
pub fn check(&self) {
296+
pub fn check(&mut self) {
297297
// Check databases.
298298
let mut duplicate_dbs = HashSet::new();
299299
for database in self.databases.clone() {
@@ -317,7 +317,9 @@ impl Config {
317317
pooler_mode: Option<PoolerMode>,
318318
role: Role,
319319
role_warned: bool,
320+
parser_warned: bool,
320321
have_replicas: bool,
322+
sharded: bool,
321323
}
322324

323325
// Check identical configs.
@@ -341,14 +343,30 @@ impl Config {
341343
if !existing.have_replicas {
342344
existing.have_replicas = database.role == Role::Replica;
343345
}
346+
if !existing.sharded {
347+
existing.sharded = database.shard > 0;
348+
}
349+
350+
if (existing.sharded || existing.have_replicas)
351+
&& self.general.query_parser == QueryParserLevel::Off
352+
&& !existing.parser_warned
353+
{
354+
existing.parser_warned = true;
355+
warn!(
356+
r#"database "{}" may need the query parser for load balancing/sharding, but it's disabled"#,
357+
database.name
358+
);
359+
}
344360
} else {
345361
checks.insert(
346362
database.name.clone(),
347363
Check {
348364
pooler_mode: database.pooler_mode,
349365
role: database.role,
350366
role_warned: false,
367+
parser_warned: false,
351368
have_replicas: database.role == Role::Replica,
369+
sharded: database.shard > 0,
352370
},
353371
);
354372
}
@@ -381,13 +399,15 @@ impl Config {
381399

382400
if !self.general.two_phase_commit && self.rewrite.enabled {
383401
if self.rewrite.shard_key == RewriteMode::Rewrite {
384-
warn!("rewrite.shard_key=rewrite will apply non-atomic shard-key rewrites; enabling two_phase_commit is strongly recommended"
385-
);
402+
warn!(
403+
r#"rewrite.shard_key = "rewrite" may apply non-atomic sharding key rewrites; enabling "two_phase_commit" is strongly recommended"#
404+
);
386405
}
387406

388407
if self.rewrite.split_inserts == RewriteMode::Rewrite {
389-
warn!("rewrite.split_inserts=rewrite may commit partial multi-row INSERTs; enabling two_phase_commit is strongly recommended"
390-
);
408+
warn!(
409+
r#"rewrite.split_inserts = "rewrite" may commit partial multi-row inserts; enabling "two_phase_commit" is strongly recommended"#
410+
);
391411
}
392412
}
393413

@@ -396,11 +416,16 @@ impl Config {
396416
&& self.general.read_write_split == ReadWriteSplit::ExcludePrimary
397417
{
398418
warn!(
399-
r#"database "{}" has no replicas and read_write_split is set to "{}", read queries will not be served"#,
419+
r#"database "{}" has no replicas and "read_write_split" is set to "{}": read queries will be rejected"#,
400420
database, self.general.read_write_split
401421
);
402422
}
403423
}
424+
425+
if self.general.query_parser_enabled {
426+
warn!(r#""query_parser_enabled" is deprecated, use "query_parser" = "on" instead"#);
427+
self.general.query_parser = QueryParserLevel::On;
428+
}
404429
}
405430

406431
/// Multi-tenancy is enabled.

pgdog-config/src/general.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::path::PathBuf;
55
use std::time::Duration;
66

77
use crate::pooling::ConnectionRecovery;
8+
use crate::QueryParserLevel;
89

910
use super::auth::{AuthType, PassthoughAuth};
1011
use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy};
@@ -96,6 +97,9 @@ pub struct General {
9697
/// Parse Queries override.
9798
#[serde(default = "General::query_parser_enabled")]
9899
pub query_parser_enabled: bool,
100+
/// Query parser.
101+
#[serde(default)]
102+
pub query_parser: QueryParserLevel,
99103
/// Limit on the number of prepared statements in the server cache.
100104
#[serde(default = "General::prepared_statements_limit")]
101105
pub prepared_statements_limit: usize,
@@ -219,6 +223,7 @@ impl Default for General {
219223
openmetrics_namespace: Self::openmetrics_namespace(),
220224
prepared_statements: Self::prepared_statements(),
221225
query_parser_enabled: Self::query_parser_enabled(),
226+
query_parser: QueryParserLevel::default(),
222227
prepared_statements_limit: Self::prepared_statements_limit(),
223228
query_cache_limit: Self::query_cache_limit(),
224229
passthrough_auth: Self::default_passthrough_auth(),

pgdog-config/src/sharding.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,3 +297,12 @@ impl ListShards {
297297
}
298298
}
299299
}
300+
301+
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
302+
#[serde(rename_all = "snake_case", deny_unknown_fields)]
303+
pub enum QueryParserLevel {
304+
On,
305+
#[default]
306+
Auto,
307+
Off,
308+
}

pgdog/src/admin/set.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ impl Command for Set {
145145
config.config.general.tls_client_required = Self::from_json(&self.value)?;
146146
}
147147

148+
"query_parser" => {
149+
config.config.general.query_parser = Self::from_json(&self.value)?;
150+
}
151+
148152
_ => return Err(Error::Syntax),
149153
}
150154

0 commit comments

Comments
 (0)