Skip to content

Commit cecfceb

Browse files
authored
Add query parameter support for Database URL configuration (#525)
* Add query parameter support for Database URL configuration Enables configuring Database fields via URL query parameters in databases_from_urls(), allowing flexible configuration through environment variables without separate config files. Supported parameters: database_name, role, shard, user, password, pool_size, min_pool_size, pooler_mode, statement_timeout, idle_timeout, server_lifetime, read_only Example: PGDOG_DATABASE_URL_1=postgres://user:pass@host:port/db?database_name=realdb&role=replica&shard=3 * Load mirroring configuration from PGDOG_MIRRORING_N ENV vars
1 parent 54e86cf commit cecfceb

File tree

5 files changed

+305
-7
lines changed

5 files changed

+305
-7
lines changed

pgdog/src/config/database.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,15 @@ impl std::fmt::Display for Role {
136136
}
137137
}
138138
}
139+
140+
impl FromStr for Role {
141+
type Err = String;
142+
143+
fn from_str(s: &str) -> Result<Self, Self::Err> {
144+
match s.to_lowercase().as_str() {
145+
"primary" => Ok(Self::Primary),
146+
"replica" => Ok(Self::Replica),
147+
_ => Err(format!("Invalid role: {}", s)),
148+
}
149+
}
150+
}

pgdog/src/config/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub enum Error {
2525

2626
#[error("no database urls in environment")]
2727
NoDbsInEnv,
28+
29+
#[error("parse error: {0}")]
30+
ParseError(String),
2831
}
2932

3033
impl Error {

pgdog/src/config/mod.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ pub fn from_urls(urls: &[String]) -> Result<ConfigAndUsers, Error> {
9595
/// Extract all database URLs from the environment and
9696
/// create the config.
9797
pub fn from_env() -> Result<ConfigAndUsers, Error> {
98+
let _lock = LOCK.lock();
99+
98100
let mut urls = vec![];
99101
let mut index = 1;
100102
while let Ok(url) = env::var(format!("PGDOG_DATABASE_URL_{}", index)) {
@@ -103,10 +105,26 @@ pub fn from_env() -> Result<ConfigAndUsers, Error> {
103105
}
104106

105107
if urls.is_empty() {
106-
Err(Error::NoDbsInEnv)
107-
} else {
108-
from_urls(&urls)
108+
return Err(Error::NoDbsInEnv);
109109
}
110+
111+
let mut config = (*config()).clone();
112+
config = config.databases_from_urls(&urls)?;
113+
114+
// Extract mirroring configuration
115+
let mut mirror_strs = vec![];
116+
let mut index = 1;
117+
while let Ok(mirror_str) = env::var(format!("PGDOG_MIRRORING_{}", index)) {
118+
mirror_strs.push(mirror_str);
119+
index += 1;
120+
}
121+
122+
if !mirror_strs.is_empty() {
123+
config = config.mirroring_from_strings(&mirror_strs)?;
124+
}
125+
126+
CONFIG.store(Arc::new(config.clone()));
127+
Ok(config)
110128
}
111129

112130
/// Override some settings.

pgdog/src/config/replication.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use serde::{Deserialize, Serialize};
22
use std::path::PathBuf;
3+
use std::str::FromStr;
34
use std::time::Duration;
45

56
#[derive(Deserialize)]
@@ -131,6 +132,54 @@ pub struct Mirroring {
131132
pub exposure: Option<f32>,
132133
}
133134

135+
impl FromStr for Mirroring {
136+
type Err = String;
137+
138+
fn from_str(s: &str) -> Result<Self, Self::Err> {
139+
let mut source_db = None;
140+
let mut destination_db = None;
141+
let mut queue_length = None;
142+
let mut exposure = None;
143+
144+
for pair in s.split('&') {
145+
let parts: Vec<&str> = pair.split('=').collect();
146+
if parts.len() != 2 {
147+
return Err(format!("Invalid key=value pair: {}", pair));
148+
}
149+
150+
match parts[0] {
151+
"source_db" => source_db = Some(parts[1].to_string()),
152+
"destination_db" => destination_db = Some(parts[1].to_string()),
153+
"queue_length" => {
154+
queue_length = Some(
155+
parts[1]
156+
.parse::<usize>()
157+
.map_err(|_| format!("Invalid queue_length: {}", parts[1]))?,
158+
);
159+
}
160+
"exposure" => {
161+
exposure = Some(
162+
parts[1]
163+
.parse::<f32>()
164+
.map_err(|_| format!("Invalid exposure: {}", parts[1]))?,
165+
);
166+
}
167+
_ => return Err(format!("Unknown parameter: {}", parts[0])),
168+
}
169+
}
170+
171+
let source_db = source_db.ok_or("Missing required parameter: source_db")?;
172+
let destination_db = destination_db.ok_or("Missing required parameter: destination_db")?;
173+
174+
Ok(Mirroring {
175+
source_db,
176+
destination_db,
177+
queue_length,
178+
exposure,
179+
})
180+
}
181+
}
182+
134183
/// Runtime mirror configuration with resolved values.
135184
#[derive(Debug, Clone)]
136185
pub struct MirrorConfig {

pgdog/src/config/url.rs

Lines changed: 220 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
//! Parse URL and convert to config struct.
2-
use std::{collections::BTreeSet, env::var};
2+
use std::{collections::BTreeSet, env::var, str::FromStr};
33
use url::Url;
44

5-
use super::{ConfigAndUsers, Database, Error, User, Users};
5+
use super::{ConfigAndUsers, Database, Error, PoolerMode, Role, User, Users};
66

77
fn database_name(url: &Url) -> String {
88
let database = url.path().chars().skip(1).collect::<String>();
@@ -21,12 +21,68 @@ impl From<&Url> for Database {
2121
.unwrap_or("127.0.0.1".into());
2222
let port = value.port().unwrap_or(5432);
2323

24-
Database {
24+
let mut database = Database {
2525
name: database_name(value),
2626
host,
2727
port,
2828
..Default::default()
29+
};
30+
31+
for (key, val) in value.query_pairs() {
32+
match key.as_ref() {
33+
"database_name" => database.database_name = Some(val.to_string()),
34+
"role" => {
35+
if let Ok(role) = Role::from_str(&val) {
36+
database.role = role;
37+
}
38+
}
39+
"shard" => {
40+
if let Ok(shard) = val.parse::<usize>() {
41+
database.shard = shard;
42+
}
43+
}
44+
"user" => database.user = Some(val.to_string()),
45+
"password" => database.password = Some(val.to_string()),
46+
"pool_size" => {
47+
if let Ok(size) = val.parse::<usize>() {
48+
database.pool_size = Some(size);
49+
}
50+
}
51+
"min_pool_size" => {
52+
if let Ok(size) = val.parse::<usize>() {
53+
database.min_pool_size = Some(size);
54+
}
55+
}
56+
"pooler_mode" => {
57+
if let Ok(mode) = PoolerMode::from_str(&val) {
58+
database.pooler_mode = Some(mode);
59+
}
60+
}
61+
"statement_timeout" => {
62+
if let Ok(timeout) = val.parse::<u64>() {
63+
database.statement_timeout = Some(timeout);
64+
}
65+
}
66+
"idle_timeout" => {
67+
if let Ok(timeout) = val.parse::<u64>() {
68+
database.idle_timeout = Some(timeout);
69+
}
70+
}
71+
"read_only" => {
72+
if let Ok(read_only) = val.parse::<bool>() {
73+
database.read_only = Some(read_only);
74+
}
75+
}
76+
"server_lifetime" => {
77+
if let Ok(lifetime) = val.parse::<u64>() {
78+
database.server_lifetime = Some(lifetime);
79+
}
80+
}
81+
_ => {}
82+
}
2983
}
84+
85+
database
3086
}
3187
}
3288

@@ -39,7 +95,6 @@ impl From<&Url> for User {
3995
user.to_string()
4096
};
4197
let password = value.password().unwrap_or("postgres").to_owned();
42-
4398
User {
4499
name: user,
45100
password: Some(password),
@@ -74,6 +129,20 @@ impl ConfigAndUsers {
74129

75130
Ok(self)
76131
}
132+
133+
/// Load from mirroring strings.
134+
pub fn mirroring_from_strings(mut self, mirror_strs: &[String]) -> Result<Self, Error> {
135+
use super::Mirroring;
136+
137+
let mirroring = mirror_strs
138+
.iter()
139+
.map(|s| Mirroring::from_str(s).map_err(|e| Error::ParseError(e)))
140+
.collect::<Result<Vec<_>, _>>()?;
141+
142+
self.config.mirroring = mirroring;
143+
144+
Ok(self)
145+
}
77146
}
78147

79148
#[cfg(test)]
@@ -85,4 +154,151 @@ mod test {
85154
let url = Url::parse("postgres://user:password@host:5432/name").unwrap();
86155
println!("{:#?}", url);
87156
}
157+
158+
#[test]
159+
fn test_database_name_from_query_param() {
160+
let url =
161+
Url::parse("postgres://user:password@host:5432/name?database_name=dbname").unwrap();
162+
let database = Database::from(&url);
163+
164+
assert_eq!(database.name, "name");
165+
assert_eq!(database.database_name, Some("dbname".to_string()));
166+
}
167+
168+
#[test]
169+
fn test_role_from_query_param() {
170+
let url = Url::parse("postgres://user:password@host:5432/name?role=replica").unwrap();
171+
let database = Database::from(&url);
172+
173+
assert_eq!(database.role, super::super::Role::Replica);
174+
}
175+
176+
#[test]
177+
fn test_shard_from_query_param() {
178+
let url = Url::parse("postgres://user:password@host:5432/name?shard=5").unwrap();
179+
let database = Database::from(&url);
180+
181+
assert_eq!(database.shard, 5);
182+
}
183+
184+
#[test]
185+
fn test_numeric_fields_from_query_params() {
186+
let url = Url::parse("postgres://user:password@host:5432/name?pool_size=10&min_pool_size=2&statement_timeout=5000&idle_timeout=300&server_lifetime=3600").unwrap();
187+
let database = Database::from(&url);
188+
189+
assert_eq!(database.pool_size, Some(10));
190+
assert_eq!(database.min_pool_size, Some(2));
191+
assert_eq!(database.statement_timeout, Some(5000));
192+
assert_eq!(database.idle_timeout, Some(300));
193+
assert_eq!(database.server_lifetime, Some(3600));
194+
}
195+
196+
#[test]
197+
fn test_bool_field_from_query_param() {
198+
let url = Url::parse("postgres://user:password@host:5432/name?read_only=true").unwrap();
199+
let database = Database::from(&url);
200+
201+
assert_eq!(database.read_only, Some(true));
202+
}
203+
204+
#[test]
205+
fn test_pooler_mode_from_query_param() {
206+
let url =
207+
Url::parse("postgres://user:password@host:5432/name?pooler_mode=session").unwrap();
208+
let database = Database::from(&url);
209+
210+
assert_eq!(
211+
database.pooler_mode,
212+
Some(super::super::PoolerMode::Session)
213+
);
214+
}
215+
216+
#[test]
217+
fn test_string_fields_from_query_params() {
218+
let url = Url::parse("postgres://user:password@host:5432/name?user=admin&password=secret")
219+
.unwrap();
220+
let database = Database::from(&url);
221+
222+
assert_eq!(database.user, Some("admin".to_string()));
223+
assert_eq!(database.password, Some("secret".to_string()));
224+
}
225+
226+
#[test]
227+
fn test_multiple_query_params() {
228+
let url = Url::parse("postgres://user:password@host:5432/name?database_name=realdb&role=replica&shard=3&pool_size=20&read_only=true").unwrap();
229+
let database = Database::from(&url);
230+
231+
assert_eq!(database.name, "name");
232+
assert_eq!(database.database_name, Some("realdb".to_string()));
233+
assert_eq!(database.role, super::super::Role::Replica);
234+
assert_eq!(database.shard, 3);
235+
assert_eq!(database.pool_size, Some(20));
236+
assert_eq!(database.read_only, Some(true));
237+
}
238+
239+
#[test]
240+
fn test_basic_mirroring_string() {
241+
let mirror_str = "source_db=primary&destination_db=backup";
242+
let mirroring = super::super::Mirroring::from_str(mirror_str).unwrap();
243+
244+
assert_eq!(mirroring.source_db, "primary");
245+
assert_eq!(mirroring.destination_db, "backup");
246+
assert_eq!(mirroring.queue_length, None);
247+
assert_eq!(mirroring.exposure, None);
248+
}
249+
250+
#[test]
251+
fn test_mirroring_with_queue_length() {
252+
let mirror_str = "source_db=db1&destination_db=db2&queue_length=256";
253+
let mirroring = super::super::Mirroring::from_str(mirror_str).unwrap();
254+
255+
assert_eq!(mirroring.source_db, "db1");
256+
assert_eq!(mirroring.destination_db, "db2");
257+
assert_eq!(mirroring.queue_length, Some(256));
258+
assert_eq!(mirroring.exposure, None);
259+
}
260+
261+
#[test]
262+
fn test_mirroring_with_exposure() {
263+
let mirror_str = "source_db=prod&destination_db=staging&exposure=0.5";
264+
let mirroring = super::super::Mirroring::from_str(mirror_str).unwrap();
265+
266+
assert_eq!(mirroring.source_db, "prod");
267+
assert_eq!(mirroring.destination_db, "staging");
268+
assert_eq!(mirroring.queue_length, None);
269+
assert_eq!(mirroring.exposure, Some(0.5));
270+
}
271+
272+
#[test]
273+
fn test_mirroring_with_both_overrides() {
274+
let mirror_str = "source_db=main&destination_db=backup&queue_length=512&exposure=0.75";
275+
let mirroring = super::super::Mirroring::from_str(mirror_str).unwrap();
276+
277+
assert_eq!(mirroring.source_db, "main");
278+
assert_eq!(mirroring.destination_db, "backup");
279+
assert_eq!(mirroring.queue_length, Some(512));
280+
assert_eq!(mirroring.exposure, Some(0.75));
281+
}
282+
283+
#[test]
284+
fn test_config_mirroring_from_strings() {
285+
let config = ConfigAndUsers::default();
286+
let mirror_strs = vec![
287+
"source_db=db1&destination_db=db1_mirror".to_string(),
288+
"source_db=db2&destination_db=db2_mirror&queue_length=256&exposure=0.5".to_string(),
289+
];
290+
291+
let config = config.mirroring_from_strings(&mirror_strs).unwrap();
292+
293+
assert_eq!(config.config.mirroring.len(), 2);
294+
assert_eq!(config.config.mirroring[0].source_db, "db1");
295+
assert_eq!(config.config.mirroring[0].destination_db, "db1_mirror");
296+
assert_eq!(config.config.mirroring[0].queue_length, None);
297+
assert_eq!(config.config.mirroring[0].exposure, None);
298+
299+
assert_eq!(config.config.mirroring[1].source_db, "db2");
300+
assert_eq!(config.config.mirroring[1].destination_db, "db2_mirror");
301+
assert_eq!(config.config.mirroring[1].queue_length, Some(256));
302+
assert_eq!(config.config.mirroring[1].exposure, Some(0.5));
303+
}
88304
}

0 commit comments

Comments
 (0)