Skip to content

Commit 74e51d9

Browse files
authored
Automatic database role detection (#616)
1 parent a272ce2 commit 74e51d9

File tree

26 files changed

+711
-120
lines changed

26 files changed

+711
-120
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
docker stop $(docker ps | grep 45000 | awk '{print $1}')
3+
PGPASSWORD=postgres psql -h 127.0.0.1 -p 45001 -U postgres postgres -c 'SELECT pg_promote()'
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[general]
2+
lsn_check_delay = 0
3+
lsn_check_interval = 5_000
4+
idle_timeout = 1_000
5+
6+
[[databases]]
7+
name = "postgres"
8+
role = "auto"
9+
host = "127.0.0.1"
10+
port = 45000
11+
12+
[[databases]]
13+
name = "postgres"
14+
role = "auto"
15+
host = "127.0.0.1"
16+
port = 45001
17+
18+
[[databases]]
19+
name = "postgres"
20+
role = "auto"
21+
host = "127.0.0.1"
22+
port = 45002
23+
24+
[admin]
25+
password = "pgdog"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[[users]]
2+
name = "postgres"
3+
database = "postgres"
4+
password = "postgres"

pgdog-config/src/core.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::path::PathBuf;
55
use tracing::{info, warn};
66

77
use crate::sharding::ShardedSchema;
8-
use crate::{Memory, PassthoughAuth, PreparedStatements, RewriteMode};
8+
use crate::{EnumeratedDatabase, Memory, PassthoughAuth, PreparedStatements, RewriteMode};
99

1010
use super::database::Database;
1111
use super::error::Error;
@@ -191,8 +191,9 @@ pub struct Config {
191191

192192
impl Config {
193193
/// Organize all databases by name for quicker retrieval.
194-
pub fn databases(&self) -> HashMap<String, Vec<Vec<Database>>> {
194+
pub fn databases(&self) -> HashMap<String, Vec<Vec<EnumeratedDatabase>>> {
195195
let mut databases = HashMap::new();
196+
let mut number = 0;
196197
for database in &self.databases {
197198
let entry = databases
198199
.entry(database.name.clone())
@@ -203,7 +204,11 @@ impl Config {
203204
entry
204205
.get_mut(database.shard)
205206
.unwrap()
206-
.push(database.clone());
207+
.push(EnumeratedDatabase {
208+
number,
209+
database: database.clone(),
210+
});
211+
number += 1;
207212
}
208213
databases
209214
}

pgdog-config/src/database.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use serde::{Deserialize, Serialize};
2-
use std::str::FromStr;
2+
use std::{
3+
ops::{Deref, DerefMut},
4+
str::FromStr,
5+
};
36

47
use super::pooling::PoolerMode;
58

@@ -153,3 +156,25 @@ impl FromStr for Role {
153156
}
154157
}
155158
}
159+
160+
/// Database with a unique number, identifying it
161+
/// in the config.
162+
#[derive(Debug, Clone)]
163+
pub struct EnumeratedDatabase {
164+
pub number: usize,
165+
pub database: Database,
166+
}
167+
168+
impl Deref for EnumeratedDatabase {
169+
type Target = Database;
170+
171+
fn deref(&self) -> &Self::Target {
172+
&self.database
173+
}
174+
}
175+
176+
impl DerefMut for EnumeratedDatabase {
177+
fn deref_mut(&mut self) -> &mut Self::Target {
178+
&mut self.database
179+
}
180+
}

pgdog-config/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ pub mod util;
1919
pub use auth::{AuthType, PassthoughAuth};
2020
pub use core::{Config, ConfigAndUsers};
2121
pub use data_types::*;
22-
pub use database::{Database, LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy, Role};
22+
pub use database::{
23+
Database, EnumeratedDatabase, LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy, Role,
24+
};
2325
pub use error::Error;
2426
pub use general::General;
2527
pub use memory::*;

pgdog/src/admin/show_replication.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl Command for ShowReplication {
3232
Field::numeric("port"),
3333
Field::numeric("shard"),
3434
Field::text("role"),
35-
Field::text("pg_replica_lag"),
35+
Field::text("replica_lag"),
3636
Field::text("pg_lsn"),
3737
Field::text("lsn_age"),
3838
Field::text("pg_is_in_recovery"),

pgdog/src/admin/show_stats.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ impl Command for ShowStats {
3636
Field::numeric(&format!("{}_received", prefix)),
3737
Field::numeric(&format!("{}_sent", prefix)),
3838
Field::numeric(&format!("{}_xact_time", prefix)),
39+
Field::numeric(&format!("{}_idle_xact_time", prefix)),
3940
Field::numeric(&format!("{}_query_time", prefix)),
4041
Field::numeric(&format!("{}_wait_time", prefix)),
4142
// Field::numeric(&format!("{}_client_parse_count", prefix)),
@@ -83,6 +84,7 @@ impl Command for ShowStats {
8384
.add(stat.received)
8485
.add(stat.sent)
8586
.add(stat.xact_time.as_millis() as u64)
87+
.add(stat.idle_xact_time.as_millis() as u64)
8688
.add(stat.query_time.as_millis() as u64)
8789
.add(stat.wait_time.as_millis() as u64)
8890
.add(stat.parse_count)

pgdog/src/backend/databases.rs

Lines changed: 122 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ pub fn reconnect() -> Result<(), Error> {
7171
Ok(())
7272
}
7373

74+
/// Re-create databases from existing config,
75+
/// preserving connections.
76+
pub fn reload_from_existing() {
77+
let _lock = lock();
78+
79+
let config = config();
80+
let databases = from_config(&config);
81+
82+
replace_databases(databases, true);
83+
}
84+
7485
/// Initialize the databases for the first time.
7586
pub fn init() {
7687
let config = config();
@@ -180,6 +191,15 @@ impl ToUser for (&str, Option<&str>) {
180191
}
181192
}
182193

194+
impl ToUser for &pgdog_config::User {
195+
fn to_user(&self) -> User {
196+
User {
197+
user: self.name.clone(),
198+
database: self.database.clone(),
199+
}
200+
}
201+
}
202+
183203
/// Databases.
184204
#[derive(Default, Clone)]
185205
pub struct Databases {
@@ -354,95 +374,123 @@ pub(crate) fn new_pool(
354374
user: &crate::config::User,
355375
config: &crate::config::Config,
356376
) -> Option<(User, Cluster)> {
377+
let existing_roles = databases()
378+
.cluster(user)
379+
.ok()
380+
.map(|cluster| cluster.redetect_roles());
381+
357382
let sharded_tables = config.sharded_tables();
358383
let omnisharded_tables = config.omnisharded_tables();
359384
let sharded_mappings = config.sharded_mappings();
360385
let sharded_schemas = config.sharded_schemas();
361386
let general = &config.general;
362387
let databases = config.databases();
363-
let shards = databases.get(&user.database);
364-
365-
if let Some(shards) = shards {
366-
let mut shard_configs = vec![];
367-
for user_databases in shards {
368-
let has_single_replica = user_databases.len() == 1;
369-
let primary = user_databases
370-
.iter()
371-
.find(|d| d.role == Role::Primary)
372-
.map(|primary| PoolConfig {
373-
address: Address::new(primary, user),
374-
config: Config::new(general, primary, user, has_single_replica),
375-
});
376-
let replicas = user_databases
377-
.iter()
378-
.filter(|d| d.role == Role::Replica)
379-
.map(|replica| PoolConfig {
380-
address: Address::new(replica, user),
381-
config: Config::new(general, replica, user, has_single_replica),
382-
})
383-
.collect::<Vec<_>>();
384-
385-
shard_configs.push(ClusterShardConfig { primary, replicas });
386-
}
387-
388-
let mut sharded_tables = sharded_tables
389-
.get(&user.database)
390-
.cloned()
391-
.unwrap_or_default();
392-
let sharded_schemas = sharded_schemas
393-
.get(&user.database)
394-
.cloned()
395-
.unwrap_or_default();
396-
397-
for sharded_table in &mut sharded_tables {
398-
let mappings = sharded_mappings.get(&(
399-
sharded_table.database.clone(),
400-
sharded_table.column.clone(),
401-
sharded_table.name.clone(),
402-
));
403388

404-
if let Some(mappings) = mappings {
405-
sharded_table.mapping = Mapping::new(mappings);
389+
let mut shards = if let Some(shards) = databases.get(&user.database).cloned() {
390+
shards
391+
} else {
392+
return None;
393+
};
406394

407-
if let Some(ref mapping) = sharded_table.mapping {
408-
if !mapping_valid(mapping) {
409-
warn!(
410-
"sharded table name=\"{}\", column=\"{}\" has overlapping ranges",
411-
sharded_table.name.as_ref().unwrap_or(&String::from("")),
412-
sharded_table.column
413-
);
395+
let mut shard_configs = vec![];
396+
for (shard_number, user_databases) in shards.iter_mut().enumerate() {
397+
let role_detector = user_databases.iter().any(|d| d.role == Role::Auto);
398+
399+
if let Some(ref shard_roles) = existing_roles
400+
.as_ref()
401+
.map(|existing_roles| existing_roles.get(shard_number).cloned())
402+
.flatten()
403+
.flatten()
404+
{
405+
for user_database in user_databases.iter_mut() {
406+
// Override role with automatically detected one.
407+
if let Some(role) = shard_roles.get(&user_database.number) {
408+
if user_database.role == Role::Auto {
409+
user_database.role = role.role;
414410
}
415411
}
416412
}
417413
}
418414

419-
let omnisharded_tables = omnisharded_tables
420-
.get(&user.database)
421-
.cloned()
422-
.unwrap_or(vec![]);
423-
let sharded_tables = ShardedTables::new(sharded_tables, omnisharded_tables);
424-
let sharded_schemas = ShardedSchemas::new(sharded_schemas);
425-
426-
let cluster_config = ClusterConfig::new(
427-
general,
428-
user,
429-
&shard_configs,
430-
sharded_tables,
431-
config.multi_tenant(),
432-
sharded_schemas,
433-
&config.rewrite,
434-
);
415+
let has_single_replica = user_databases.len() == 1;
416+
let primary = user_databases
417+
.iter()
418+
.find(|d| d.role == Role::Primary)
419+
.map(|primary| PoolConfig {
420+
address: Address::new(primary, user, primary.number),
421+
config: Config::new(general, primary, user, has_single_replica),
422+
});
423+
let replicas = user_databases
424+
.iter()
425+
.filter(|d| matches!(d.role, Role::Replica | Role::Auto))
426+
.map(|replica| PoolConfig {
427+
address: Address::new(replica, user, replica.number),
428+
config: Config::new(general, replica, user, has_single_replica),
429+
})
430+
.collect::<Vec<_>>();
431+
432+
shard_configs.push(ClusterShardConfig {
433+
primary,
434+
replicas,
435+
role_detector,
436+
});
437+
}
435438

436-
Some((
437-
User {
438-
user: user.name.clone(),
439-
database: user.database.clone(),
440-
},
441-
Cluster::new(cluster_config),
442-
))
443-
} else {
444-
None
439+
let mut sharded_tables = sharded_tables
440+
.get(&user.database)
441+
.cloned()
442+
.unwrap_or_default();
443+
let sharded_schemas = sharded_schemas
444+
.get(&user.database)
445+
.cloned()
446+
.unwrap_or_default();
447+
448+
for sharded_table in &mut sharded_tables {
449+
let mappings = sharded_mappings.get(&(
450+
sharded_table.database.clone(),
451+
sharded_table.column.clone(),
452+
sharded_table.name.clone(),
453+
));
454+
455+
if let Some(mappings) = mappings {
456+
sharded_table.mapping = Mapping::new(mappings);
457+
458+
if let Some(ref mapping) = sharded_table.mapping {
459+
if !mapping_valid(mapping) {
460+
warn!(
461+
"sharded table name=\"{}\", column=\"{}\" has overlapping ranges",
462+
sharded_table.name.as_ref().unwrap_or(&String::from("")),
463+
sharded_table.column
464+
);
465+
}
466+
}
467+
}
445468
}
469+
470+
let omnisharded_tables = omnisharded_tables
471+
.get(&user.database)
472+
.cloned()
473+
.unwrap_or(vec![]);
474+
let sharded_tables = ShardedTables::new(sharded_tables, omnisharded_tables);
475+
let sharded_schemas = ShardedSchemas::new(sharded_schemas);
476+
477+
let cluster_config = ClusterConfig::new(
478+
general,
479+
user,
480+
&shard_configs,
481+
sharded_tables,
482+
config.multi_tenant(),
483+
sharded_schemas,
484+
&config.rewrite,
485+
);
486+
487+
Some((
488+
User {
489+
user: user.name.clone(),
490+
database: user.database.clone(),
491+
},
492+
Cluster::new(cluster_config),
493+
))
446494
}
447495

448496
/// Load databases from config.

0 commit comments

Comments
 (0)