Skip to content

Commit 0421fe6

Browse files
authored
search_path schema-based sharding (#621)
* search_path sharding * Handle insert * Finish up * Fix explain * remove debug * move up
1 parent f25f092 commit 0421fe6

File tree

8 files changed

+158
-7
lines changed

8 files changed

+158
-7
lines changed

pgdog/src/backend/server.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -451,26 +451,41 @@ impl Server {
451451
) -> Result<usize, Error> {
452452
// Sync application_name parameter
453453
// and update it in the stats.
454-
let default_name = "PgDog";
455-
let server_name = self
456-
.client_params
457-
.get_default("application_name", default_name);
458-
let client_name = params.get_default("application_name", default_name);
454+
let server_name = self.client_params.get_default("application_name", "PgDog");
455+
let client_name = params.get_default("application_name", "PgDog");
456+
459457
self.stats.link_client(client_name, server_name, id);
460458

461459
// Clear any params previously tracked by SET.
462460
self.changed_params.clear();
463461

464462
// Compare client and server params.
465463
if !params.identical(&self.client_params) {
464+
// Construct client parameter SET queries.
466465
let tracked = params.tracked();
466+
// Construct RESET queries to reset any current params
467+
// to their default values.
467468
let mut queries = self.client_params.reset_queries();
469+
470+
// Combine both to create a new, fresh session state
471+
// on this connection.
468472
queries.extend(tracked.set_queries());
473+
474+
// Set state on the connection only if
475+
// there are any params to change.
469476
if !queries.is_empty() {
470477
debug!("syncing {} params", queries.len());
478+
471479
self.execute_batch(&queries).await?;
480+
481+
// We can receive ParameterStatus messages here,
482+
// but we should ignore them since we are managing the session state.
483+
self.changed_params.clear();
472484
}
485+
486+
// Update params on this connection.
473487
self.client_params = tracked;
488+
474489
Ok(queries.len())
475490
} else {
476491
Ok(0)

pgdog/src/frontend/router/parser/explain_trace.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ impl ExplainEntry {
7373
}
7474
}
7575

76+
/// EXPLAIN recorder.
7677
#[derive(Debug, Default)]
7778
pub struct ExplainRecorder {
7879
entries: Vec<ExplainEntry>,
@@ -89,6 +90,12 @@ impl ExplainRecorder {
8990
self.entries.push(ExplainEntry::new(shard, description));
9091
}
9192

93+
pub fn clear(&mut self) {
94+
self.entries.clear();
95+
self.comment = None;
96+
self.plugin = None;
97+
}
98+
9299
pub fn record_comment_override(&mut self, shard: Shard, role: Option<&str>) {
93100
let mut description = match shard {
94101
Shard::Direct(_) | Shard::Multi(_) | Shard::All => {

pgdog/src/frontend/router/parser/query/ddl.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,17 @@ impl QueryParser {
99
node: &Option<NodeEnum>,
1010
context: &mut QueryParserContext<'_>,
1111
) -> Result<Command, Error> {
12-
Self::shard_ddl(node, &context.sharding_schema)
12+
let mut command = Self::shard_ddl(node, &context.sharding_schema)?;
13+
14+
if let Command::Query(ref mut route) = command {
15+
if route.shard().all() {
16+
if let Some(shard) = self.check_search_path_for_shard(context)? {
17+
route.set_shard_mut(shard);
18+
}
19+
}
20+
}
21+
22+
Ok(command)
1323
}
1424

1525
pub(super) fn shard_ddl(

pgdog/src/frontend/router/parser/query/delete.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ impl QueryParser {
1111
) -> Result<Command, Error> {
1212
let table = stmt.relation.as_ref().map(Table::from);
1313

14+
if let Some(shard) = self.check_search_path_for_shard(context)? {
15+
return Ok(Command::Query(Route::write(shard)));
16+
}
17+
1418
if let Some(table) = table {
1519
// Schema-based sharding.
1620
if let Some(schema) = context.sharding_schema.schemas.get(table.schema()) {

pgdog/src/frontend/router/parser/query/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ mod ddl;
2828
mod delete;
2929
mod explain;
3030
mod plugins;
31-
pub mod schema_sharding;
31+
mod schema_sharding;
3232
mod select;
3333
mod set;
3434
mod shared;
@@ -382,6 +382,46 @@ impl QueryParser {
382382
// with the fingerprint.
383383
//
384384
if route.shard().all() {
385+
// Check search_path for schema.
386+
let search_path = context.router_context.params.get("search_path");
387+
388+
// Quick inline function to shard query
389+
// based on schema in search_path.
390+
fn shard_from_search_path(
391+
search_path: &str,
392+
context: &QueryParserContext<'_>,
393+
query_parser: &mut QueryParser,
394+
route: &mut Route,
395+
) {
396+
let schema = Schema::from(search_path);
397+
398+
if let Some(schema) = context.sharding_schema.schemas.get(Some(schema)) {
399+
let shard: Shard = schema.shard().into();
400+
401+
if let Some(recorder) = query_parser.recorder_mut() {
402+
recorder.record_entry(
403+
Some(shard.clone()),
404+
format!("matched schema {} in search_path", schema.name()),
405+
);
406+
}
407+
route.set_shard_mut(shard);
408+
}
409+
}
410+
411+
match search_path {
412+
Some(ParameterValue::String(search_path)) => {
413+
shard_from_search_path(search_path, context, self, route);
414+
}
415+
416+
Some(ParameterValue::Tuple(search_paths)) => {
417+
for schema in search_paths {
418+
shard_from_search_path(schema, context, self, route);
419+
}
420+
}
421+
422+
None => (),
423+
}
424+
385425
let databases = databases();
386426
// Only fingerprint the query if some manual queries are configured.
387427
// Otherwise, we're wasting time parsing SQL.
@@ -469,6 +509,10 @@ impl QueryParser {
469509
context.split_insert_mode(),
470510
)?;
471511

512+
if let Some(shard) = self.check_search_path_for_shard(context)? {
513+
return Ok(Command::Query(Route::write(shard)));
514+
}
515+
472516
match routing {
473517
InsertRouting::Routed(shard) => {
474518
if let Some(recorder) = self.recorder_mut() {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,64 @@
1+
use super::*;
12

3+
impl QueryParser {
4+
pub(super) fn check_search_path_for_shard(
5+
&mut self,
6+
context: &QueryParserContext<'_>,
7+
) -> Result<Option<Shard>, Error> {
8+
// Shortcut.
9+
if context.sharding_schema.schemas.is_empty() {
10+
return Ok(None);
11+
}
12+
13+
// Check search_path for schema.
14+
let search_path = context.router_context.params.get("search_path");
15+
16+
match search_path {
17+
Some(ParameterValue::String(search_path)) => {
18+
let schema = Schema::from(search_path.as_str());
19+
if let Some(schema) = context.sharding_schema.schemas.get(Some(schema)) {
20+
let shard: Shard = schema.shard().into();
21+
if let Some(recorder) = self.recorder_mut() {
22+
recorder.record_entry(
23+
Some(shard.clone()),
24+
format!("matched schema \"{}\" in search_path", schema.name()),
25+
);
26+
}
27+
return Ok(Some(shard));
28+
}
29+
}
30+
31+
Some(ParameterValue::Tuple(search_paths)) => {
32+
let mut candidates = vec![];
33+
34+
for (idx, schema) in search_paths.iter().enumerate() {
35+
let schema = Schema::from(schema.as_str());
36+
if let Some(schema) = context.sharding_schema.schemas.get(Some(schema)) {
37+
let shard: Shard = schema.shard().into();
38+
let catch_all = schema.is_default();
39+
candidates.push((shard, catch_all, idx));
40+
}
41+
}
42+
43+
// false < true
44+
// Catch-all schemas go first, more qualified ones go last.
45+
candidates.sort_by_key(|cand| !cand.1);
46+
if let Some(candidate) = candidates.pop() {
47+
if let Some(schema) = search_paths.get(candidate.2) {
48+
if let Some(recorder) = self.recorder_mut() {
49+
recorder.record_entry(
50+
Some(candidate.0.clone()),
51+
format!("matched schema mult \"{}\" in search_path", schema),
52+
);
53+
}
54+
}
55+
return Ok(Some(candidate.0));
56+
}
57+
}
58+
59+
None => (),
60+
}
61+
62+
Ok(None)
63+
}
64+
}

pgdog/src/frontend/router/parser/query/select.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ impl QueryParser {
6060
)?;
6161
}
6262

63+
if let Some(Shard::Direct(number)) = self.check_search_path_for_shard(context)? {
64+
return Ok(Command::Query(Route::read(number).set_write(writes)));
65+
}
66+
6367
// Schema-based sharding.
6468
for table in cached_ast.tables() {
6569
if let Some(schema) = context.sharding_schema.schemas.get(table.schema()) {

pgdog/src/frontend/router/parser/query/update.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ impl QueryParser {
2020
) -> Result<Command, Error> {
2121
let table = stmt.relation.as_ref().map(Table::from);
2222

23+
if let Some(shard) = self.check_search_path_for_shard(context)? {
24+
return Ok(Command::Query(Route::write(shard)));
25+
}
26+
2327
if let Some(table) = table {
2428
// Schema-based sharding.
2529
if let Some(schema) = context.sharding_schema.schemas.get(table.schema()) {

0 commit comments

Comments
 (0)