Skip to content

Commit cda5dbb

Browse files
authored
perf fixes && fix disconnect server (#63)
1 parent c6c020a commit cda5dbb

24 files changed

+660
-253
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pg_doorman"
3-
version = "2.4.3"
3+
version = "2.5.0"
44
edition = "2021"
55
rust-version = "1.87.0"
66
license = "MIT"

documentation/docs/changelog.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@ title: Changelog
44

55
# Changelog
66

7+
### 2.5.0 <small>Nov 18, 2025</small> { id="2.5.0" }
8+
9+
**Improvements:**
10+
- Reworked the statistics collection system, yielding up to 20% performance gain on fast queries.
11+
- Improved detection of `SAVEPOINT` usage, allowing the auto-rollback feature to be applied in more situations.
12+
13+
**Bug Fixes / Behavior:**
14+
- Less aggressive behavior on write errors when sending a response to the client: the server connection is no longer immediately marked as "bad" and evicted from the pool. We now read the remaining server response and clean up its state, returning the connection to the pool in a clean state. This improves performance during client reconnections.
15+
16+
717
### 2.4.3 <small>Nov 15, 2025</small> { id="2.4.3" }
818

919
**Bug Fixes:**

pg_doorman.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,20 @@ syslog_prog_name = "pg_doorman"
175175
# For example, it can be the address mask 10.0.0.0/8. By default, access is allowed from any address.
176176
hba = []
177177

178+
[prometheus]
179+
# Prometheus metrics exporter settings. See documentation/docs/reference/prometheus.md
180+
# Enable or disable the Prometheus exporter.
181+
# Default: false
182+
enabled = false
183+
184+
# Host to listen on for the metrics HTTP endpoint.
185+
# Default: "0.0.0.0"
186+
host = "0.0.0.0"
187+
188+
# Port for the metrics HTTP endpoint.
189+
# Default: 9127
190+
port = 9127
191+
178192
[pools]
179193
# Each record in the pool is the name of the virtual database that the pg-doorman client can connect to.
180194
[pools.exampledb]

src/admin.rs

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -138,29 +138,15 @@ where
138138
"free_clients".to_string(),
139139
client_stats
140140
.keys()
141-
.filter(|client_id| {
142-
client_stats
143-
.get(client_id)
144-
.unwrap()
145-
.state
146-
.load(Ordering::Relaxed)
147-
== CLIENT_STATE_IDLE
148-
})
141+
.filter(|client_id| client_stats.get(client_id).unwrap().state() == CLIENT_STATE_IDLE)
149142
.count()
150143
.to_string(),
151144
]));
152145
res.put(data_row(&vec![
153146
"used_clients".to_string(),
154147
client_stats
155148
.keys()
156-
.filter(|client_id| {
157-
client_stats
158-
.get(client_id)
159-
.unwrap()
160-
.state
161-
.load(Ordering::Relaxed)
162-
== CLIENT_STATE_ACTIVE
163-
})
149+
.filter(|client_id| client_stats.get(client_id).unwrap().state() == CLIENT_STATE_ACTIVE)
164150
.count()
165151
.to_string(),
166152
]));
@@ -172,29 +158,15 @@ where
172158
"free_servers".to_string(),
173159
server_stats
174160
.keys()
175-
.filter(|server_id| {
176-
server_stats
177-
.get(server_id)
178-
.unwrap()
179-
.state
180-
.load(Ordering::Relaxed)
181-
== SERVER_STATE_IDLE
182-
})
161+
.filter(|server_id| server_stats.get(server_id).unwrap().state() == SERVER_STATE_IDLE)
183162
.count()
184163
.to_string(),
185164
]));
186165
res.put(data_row(&vec![
187166
"used_servers".to_string(),
188167
server_stats
189168
.keys()
190-
.filter(|server_id| {
191-
server_stats
192-
.get(server_id)
193-
.unwrap()
194-
.state
195-
.load(Ordering::Relaxed)
196-
== SERVER_STATE_ACTIVE
197-
})
169+
.filter(|server_id| server_stats.get(server_id).unwrap().state() == SERVER_STATE_ACTIVE)
198170
.count()
199171
.to_string(),
200172
]));

src/auth/hba_eval_tests.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,18 @@ fn local_trust_is_ignored_for_tcp() {
201201
// For SCRAM password
202202
let ci_scram = ci_from_hba(hba, false);
203203
let pw_scram = format!("{}secret", SCRAM_SHA_256);
204-
assert_eq!(eval_hba_for_pool_password(&pw_scram, &ci_scram), CheckResult::Deny);
204+
assert_eq!(
205+
eval_hba_for_pool_password(&pw_scram, &ci_scram),
206+
CheckResult::Deny
207+
);
205208

206209
// For MD5 password
207210
let ci_md5 = ci_from_hba(hba, false);
208211
let pw_md5 = format!("{}hash", MD5_PASSWORD_PREFIX);
209-
assert_eq!(eval_hba_for_pool_password(&pw_md5, &ci_md5), CheckResult::Deny);
212+
assert_eq!(
213+
eval_hba_for_pool_password(&pw_md5, &ci_md5),
214+
CheckResult::Deny
215+
);
210216
}
211217

212218
#[test]
@@ -217,10 +223,16 @@ fn local_then_host_rule_behaves_like_host_only() {
217223
// MD5 password should be allowed by the host md5 rule
218224
let ci_md5 = ci_from_hba(hba, false);
219225
let pw_md5 = format!("{}hash", MD5_PASSWORD_PREFIX);
220-
assert_eq!(eval_hba_for_pool_password(&pw_md5, &ci_md5), CheckResult::Allow);
226+
assert_eq!(
227+
eval_hba_for_pool_password(&pw_md5, &ci_md5),
228+
CheckResult::Allow
229+
);
221230

222231
// SCRAM password should be allowed to proceed (scram NotMatched, md5 Allow → overall Allow)
223232
let ci_scram = ci_from_hba(hba, false);
224233
let pw_scram = format!("{}secret", SCRAM_SHA_256);
225-
assert_eq!(eval_hba_for_pool_password(&pw_scram, &ci_scram), CheckResult::Allow);
234+
assert_eq!(
235+
eval_hba_for_pool_password(&pw_scram, &ci_scram),
236+
CheckResult::Allow
237+
);
226238
}

src/auth/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::marker::Unpin;
1111

1212
// External crate imports
1313
use crate::auth::hba::CheckResult;
14-
use log::{error, warn, info};
14+
use log::{error, info, warn};
1515
use tokio::io::{AsyncReadExt, AsyncWriteExt};
1616
// Internal crate imports
1717
use crate::auth::jwt::get_user_name_from_jwt;
@@ -53,7 +53,10 @@ where
5353
if client_identifier.hba_md5 == CheckResult::Trust
5454
|| client_identifier.hba_scram == CheckResult::Trust
5555
{
56-
info!("HBA trust for admin user: {username_from_parameters} from: {:?}.", client_identifier.addr);
56+
info!(
57+
"HBA trust for admin user: {username_from_parameters} from: {:?}.",
58+
client_identifier.addr
59+
);
5760
return Ok((false, ServerParameters::admin(), false));
5861
}
5962
if client_identifier.hba_md5 == CheckResult::Deny

src/client.rs

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ where
902902
Some((process_id, secret_key, address, port)) => {
903903
{
904904
let mut cancel_guard = CANCELED_PIDS.lock();
905-
cancel_guard.push(*process_id);
905+
cancel_guard.insert(*process_id);
906906
}
907907
(*process_id, *secret_key, address.clone(), *port)
908908
}
@@ -945,20 +945,15 @@ where
945945
self.stats.disconnect();
946946
return Ok(());
947947
}
948-
tokio::select! {
949-
_ = self.shutdown.recv() => {
950-
if !self.admin {
951-
warn!("Dropping client {:?} because connection pooler is shutting down", self.addr);
952-
error_response_terminal(
953-
&mut self.write,
954-
"pooler is shut down now",
955-
"58006"
956-
).await?;
957-
self.stats.disconnect();
958-
return Ok(());
959-
}
960-
},
961-
_ = tokio::task::yield_now() => {}
948+
if self.shutdown.try_recv().is_ok() && !self.admin {
949+
warn!(
950+
"Dropping client {:?} because connection pooler is shutting down",
951+
self.addr
952+
);
953+
error_response_terminal(&mut self.write, "pooler is shut down now", "58006")
954+
.await?;
955+
self.stats.disconnect();
956+
return Ok(());
962957
}
963958
// Handle admin database queries.
964959
if self.admin {
@@ -977,16 +972,16 @@ where
977972

978973
match message[0] as char {
979974
'Q' => {
980-
if self.pooler_check_query_request_vec.eq(&message.to_vec()) {
975+
if self.pooler_check_query_request_vec.as_slice() == &message[..] {
981976
// This is the first message in the transaction, since we are responding with 'IZ',
982977
// then we can not expect a server connection and immediately send answer and exit transaction loop.
983978
write_all_flush(&mut self.write, &check_query_response()).await?;
984979
continue;
985980
}
986981
if message.len() < 40 && message.len() > QUERY_DEALLOCATE.len() + 5 {
987982
// Do not pass simple query with deallocate, as it will run on an unknown server.
988-
let query = message[5..QUERY_DEALLOCATE.len() + 5].to_vec();
989-
if QUERY_DEALLOCATE.eq(&query) {
983+
let query = &message[5..QUERY_DEALLOCATE.len() + 5];
984+
if QUERY_DEALLOCATE == query {
990985
write_all_flush(&mut self.write, &deallocate_response()).await?;
991986
continue;
992987
}
@@ -1046,7 +1041,7 @@ where
10461041
{
10471042
let mut guard = CANCELED_PIDS.lock();
10481043
if guard.contains(&conn.get_process_id()) {
1049-
guard.retain(|&id| id != conn.get_process_id());
1044+
guard.remove(&conn.get_process_id());
10501045
conn.mark_bad("because was canceled");
10511046
continue; // try to find another server.
10521047
}
@@ -1855,12 +1850,12 @@ where
18551850
self.stats.active_write();
18561851
if let Err(err_write) = write_all_flush(&mut self.write, &response).await {
18571852
server.wait_available().await;
1858-
// Lazy error formatting
1859-
let mut msg = String::with_capacity(64);
1860-
use std::fmt::Write;
1861-
let _ = write!(msg, "flush to client {} {:?}", self.addr, err_write);
1862-
server.mark_bad(&msg);
1863-
return Err(err_write);
1853+
if server.is_async() || server.in_copy_mode() {
1854+
server.mark_bad(
1855+
format!("flush to client {} {:?}", self.addr, err_write).as_str(),
1856+
);
1857+
return Err(err_write);
1858+
}
18641859
}
18651860

18661861
self.stats.active_idle();

src/config.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use std::hash::{Hash, Hasher};
1313
use std::mem;
1414
use std::net::IpAddr;
1515
use std::path::Path;
16-
use std::sync::atomic::{AtomicU64, Ordering};
1716
use std::sync::Arc;
1817
use tokio::fs::File;
1918
use tokio::io::AsyncReadExt;
@@ -51,8 +50,6 @@ pub struct Address {
5150
pub pool_name: String,
5251
/// Address stats
5352
pub stats: Arc<AddressStats>,
54-
/// Number of errors encountered since last successful checkout
55-
pub error_count: Arc<AtomicU64>,
5653
}
5754

5855
impl Default for Address {
@@ -66,7 +63,6 @@ impl Default for Address {
6663
password: String::from("password"),
6764
pool_name: String::from("pool_name"),
6865
stats: Arc::new(AddressStats::default()),
69-
error_count: Arc::new(AtomicU64::new(0)),
7066
}
7167
}
7268
}
@@ -111,17 +107,6 @@ impl Address {
111107
pub fn name(&self) -> String {
112108
self.pool_name.clone() + "-" + &*self.virtual_pool_id.to_string()
113109
}
114-
pub fn error_count(&self) -> u64 {
115-
self.error_count.load(Ordering::Relaxed)
116-
}
117-
118-
pub fn increment_error_count(&self) {
119-
self.error_count.fetch_add(1, Ordering::Relaxed);
120-
}
121-
122-
pub fn reset_error_count(&self) {
123-
self.error_count.store(0, Ordering::Relaxed);
124-
}
125110
}
126111

127112
/// Pool mode:

src/pool.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use log::{error, info, warn};
44
use lru::LruCache;
55
use once_cell::sync::Lazy;
66
use parking_lot::Mutex;
7-
use std::collections::HashMap;
7+
use std::collections::{HashMap, HashSet};
88
use std::fmt::{Display, Formatter};
99
use std::num::NonZeroUsize;
10-
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
10+
use std::sync::atomic::{AtomicUsize, Ordering};
1111
use std::sync::Arc;
1212
use std::time::Duration;
1313

@@ -31,8 +31,8 @@ pub type PoolMap = HashMap<PoolIdentifierVirtual, ConnectionPool>;
3131
/// This is atomic and safe and read-optimized.
3232
/// The pool is recreated dynamically when the config is reloaded.
3333
pub static POOLS: Lazy<ArcSwap<PoolMap>> = Lazy::new(|| ArcSwap::from_pointee(HashMap::default()));
34-
pub static CANCELED_PIDS: Lazy<Arc<Mutex<Vec<ProcessId>>>> =
35-
Lazy::new(|| Arc::new(Mutex::new(Vec::new())));
34+
pub static CANCELED_PIDS: Lazy<Arc<Mutex<HashSet<ProcessId>>>> =
35+
Lazy::new(|| Arc::new(Mutex::new(HashSet::new())));
3636

3737
pub type PreparedStatementCacheType = Arc<Mutex<PreparedStatementCache>>;
3838
pub type ServerParametersType = Arc<tokio::sync::Mutex<ServerParameters>>;
@@ -247,7 +247,6 @@ impl ConnectionPool {
247247
password: user.password.clone(),
248248
pool_name: pool_name.clone(),
249249
stats: Arc::new(AddressStats::default()),
250-
error_count: Arc::new(AtomicU64::new(0)),
251250
};
252251

253252
let prepared_statements_cache_size = match config.general.prepared_statements {

0 commit comments

Comments
 (0)