Skip to content

Commit a272ce2

Browse files
authored
Pool checkout tweak (#615)
1 parent a092d1d commit a272ce2

File tree

6 files changed

+122
-109
lines changed

6 files changed

+122
-109
lines changed

cli.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,18 @@ function bench_init() {
2222
PGPASSWORD=pgdog pgbench -h 127.0.0.1 -p 6432 -U pgdog pgdog -i
2323
}
2424

25+
function psql_cmd() {
26+
PGPASSWORD=pgdog psql -h 127.0.0.1 -p 6432 -U pgdog pgdog
27+
}
28+
2529
# Parse command
2630
case "$1" in
2731
admin)
2832
admin
2933
;;
34+
psql)
35+
psql_cmd
36+
;;
3037
binit)
3138
bench_init
3239
;;

pgdog/src/backend/pool/healthcheck.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,16 @@ impl<'a> Healtcheck<'a> {
4949

5050
/// Perform the healtcheck if it's required.
5151
pub async fn healthcheck(&mut self) -> Result<(), Error> {
52-
let healtcheck_age = self.conn.healthcheck_age(self.now);
52+
let health_check_age = self.conn.healthcheck_age(self.now);
5353

54-
if healtcheck_age < self.healthcheck_interval {
54+
if health_check_age < self.healthcheck_interval {
5555
return Ok(());
5656
}
5757

5858
match timeout(self.healthcheck_timeout, self.conn.healthcheck(";")).await {
5959
Ok(Ok(())) => Ok(()),
6060
Ok(Err(err)) => {
61-
error!("healthcheck server error: {} [{}]", err, self.pool.addr());
61+
error!("health check server error: {} [{}]", err, self.pool.addr());
6262
Err(Error::HealthcheckError)
6363
}
6464
Err(_) => Err(Error::HealthcheckError),

pgdog/src/backend/pool/pool_impl.rs

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::time::Duration;
66

77
use once_cell::sync::{Lazy, OnceCell};
88
use parking_lot::{lock_api::MutexGuard, Mutex, RawMutex};
9-
use tokio::time::Instant;
9+
use tokio::time::{timeout, Instant};
1010
use tracing::error;
1111

1212
use crate::backend::{Server, ServerOptions};
@@ -96,60 +96,78 @@ impl Pool {
9696
}
9797

9898
pub async fn get(&self, request: &Request) -> Result<Guard, Error> {
99-
self.get_internal(request).await
99+
match timeout(self.config().checkout_timeout, self.get_internal(request)).await {
100+
Ok(Ok(conn)) => Ok(conn),
101+
Err(_) => {
102+
self.inner.health.toggle(false);
103+
Err(Error::CheckoutTimeout)
104+
}
105+
Ok(Err(err)) => {
106+
self.inner.health.toggle(false);
107+
Err(err.into())
108+
}
109+
}
100110
}
101111

102112
/// Get a connection from the pool.
103113
async fn get_internal(&self, request: &Request) -> Result<Guard, Error> {
104-
let pool = self.clone();
105-
106-
// Fast path, idle connection probably available.
107-
let (server, granted_at, paused) = {
108-
// Ask for time before we acquire the lock
109-
// and only if we actually waited for a connection.
110-
let granted_at = request.created_at;
111-
let elapsed = granted_at.saturating_duration_since(request.created_at);
112-
let mut guard = self.lock();
114+
loop {
115+
let pool = self.clone();
113116

114-
if !guard.online {
115-
return Err(Error::Offline);
116-
}
117+
// Fast path, idle connection probably available.
118+
let (server, granted_at, paused) = {
119+
// Ask for time before we acquire the lock
120+
// and only if we actually waited for a connection.
121+
let granted_at = request.created_at;
122+
let elapsed = granted_at.saturating_duration_since(request.created_at);
123+
let mut guard = self.lock();
117124

118-
let conn = guard.take(request);
125+
if !guard.online {
126+
return Err(Error::Offline);
127+
}
119128

120-
if conn.is_some() {
121-
guard.stats.counts.wait_time += elapsed;
122-
guard.stats.counts.server_assignment_count += 1;
123-
}
129+
let conn = guard.take(request);
124130

125-
(conn, granted_at, guard.paused)
126-
};
131+
if conn.is_some() {
132+
guard.stats.counts.wait_time += elapsed;
133+
guard.stats.counts.server_assignment_count += 1;
134+
}
127135

128-
if paused {
129-
self.comms().ready.notified().await;
130-
}
136+
(conn, granted_at, guard.paused)
137+
};
131138

132-
let (server, granted_at) = if let Some(mut server) = server {
133-
server
134-
.prepared_statements_mut()
135-
.set_capacity(self.inner.config.prepared_statements_limit);
136-
server.set_pooler_mode(self.inner.config.pooler_mode);
137-
(Guard::new(pool, server, granted_at), granted_at)
138-
} else {
139-
// Slow path, pool is empty, will create new connection
140-
// or wait for one to be returned if the pool is maxed out.
141-
let mut waiting = Waiting::new(pool, request)?;
142-
waiting.wait().await?
143-
};
139+
if paused {
140+
self.comms().ready.notified().await;
141+
}
144142

145-
return self
146-
.maybe_healthcheck(
147-
server,
148-
self.inner.config.healthcheck_timeout,
149-
self.inner.config.healthcheck_interval,
150-
granted_at,
151-
)
152-
.await;
143+
let (server, granted_at) = if let Some(mut server) = server {
144+
server
145+
.prepared_statements_mut()
146+
.set_capacity(self.inner.config.prepared_statements_limit);
147+
server.set_pooler_mode(self.inner.config.pooler_mode);
148+
(Guard::new(pool, server, granted_at), granted_at)
149+
} else {
150+
// Slow path, pool is empty, will create new connection
151+
// or wait for one to be returned if the pool is maxed out.
152+
let mut waiting = Waiting::new(pool, request)?;
153+
waiting.wait().await?
154+
};
155+
156+
match self
157+
.maybe_healthcheck(
158+
server,
159+
self.inner.config.healthcheck_timeout,
160+
self.inner.config.healthcheck_interval,
161+
granted_at,
162+
)
163+
.await
164+
{
165+
Ok(conn) => return Ok(conn),
166+
// Try another connection.
167+
Err(Error::HealthcheckError) => continue,
168+
Err(err) => return Err(err),
169+
}
170+
}
153171
}
154172

155173
/// Get server parameters, fetch them if necessary.

pgdog/src/backend/pool/test/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,33 @@ async fn test_idle_healthcheck_loop() {
513513
);
514514
}
515515

516+
#[tokio::test]
517+
async fn test_checkout_timeout() {
518+
crate::logger();
519+
520+
let config = Config {
521+
max: 1,
522+
min: 1,
523+
checkout_timeout: Duration::from_millis(100),
524+
..Default::default()
525+
};
526+
527+
let pool = Pool::new(&PoolConfig {
528+
address: Address::new_test(),
529+
config,
530+
});
531+
pool.launch();
532+
533+
// Hold the only connection
534+
let _conn = pool.get(&Request::default()).await.unwrap();
535+
536+
// Try to get another connection - should timeout
537+
let result = pool.get(&Request::default()).await;
538+
assert!(result.is_err());
539+
assert_eq!(result.unwrap_err(), Error::CheckoutTimeout);
540+
assert!(pool.lock().waiting.is_empty());
541+
}
542+
516543
#[tokio::test]
517544
async fn test_move_conns_to() {
518545
crate::logger();

pgdog/src/backend/pool/waiting.rs

Lines changed: 22 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
1-
use std::ops::Deref;
2-
31
use crate::backend::Server;
42

53
use super::{Error, Guard, Pool, Request};
6-
use tokio::{
7-
sync::oneshot::*,
8-
time::{timeout, Instant},
9-
};
4+
use tokio::{sync::oneshot::*, time::Instant};
105

116
pub(super) struct Waiting {
127
pool: Pool,
@@ -24,6 +19,10 @@ impl Drop for Waiting {
2419
}
2520

2621
impl Waiting {
22+
/// Create new waiter.
23+
///
24+
/// N.B. You must call and await `Waiting::wait`, otherwise you'll leak waiters.
25+
///
2726
pub(super) fn new(pool: Pool, request: &Request) -> Result<Self, Error> {
2827
let request = *request;
2928
let (tx, rx) = channel();
@@ -49,68 +48,27 @@ impl Waiting {
4948

5049
/// Wait for connection from the pool.
5150
pub(super) async fn wait(&mut self) -> Result<(Guard, Instant), Error> {
52-
let checkout_timeout = self.pool.inner().config.checkout_timeout;
5351
let rx = self.rx.take().expect("waiter rx taken");
5452

55-
// Make this cancellation-safe.
56-
let mut wait_guard = WaitGuard::new(self);
57-
let server = timeout(checkout_timeout, rx).await;
58-
wait_guard.disarm();
59-
drop(wait_guard);
53+
// Can be cancelled. Drop will remove the waiter from the queue.
54+
let server = rx.await;
55+
56+
// Disarm the guard. We can't be cancelled beyond this point.
57+
self.waiting = false;
6058

6159
let now = Instant::now();
6260
match server {
63-
Ok(Ok(server)) => {
61+
Ok(server) => {
6462
let server = server?;
6563
Ok((Guard::new(self.pool.clone(), server, now), now))
6664
}
6765

68-
Err(_err) => {
69-
let mut guard = self.pool.lock();
70-
guard.remove_waiter(&self.request.id);
71-
self.pool.inner().health.toggle(false);
66+
Err(_) => {
67+
// Should not be possible.
68+
// This means someone else removed my waiter from the wait queue,
69+
// indicating a bug in the pool.
7270
Err(Error::CheckoutTimeout)
7371
}
74-
75-
// Should not be possible.
76-
// This means someone else removed my waiter from the wait queue,
77-
// indicating a bug in the pool.
78-
Ok(Err(_)) => Err(Error::CheckoutTimeout),
79-
}
80-
}
81-
}
82-
83-
struct WaitGuard<'a> {
84-
waiting: &'a Waiting,
85-
armed: bool,
86-
}
87-
88-
impl<'a> Deref for WaitGuard<'a> {
89-
type Target = &'a Waiting;
90-
91-
fn deref(&self) -> &Self::Target {
92-
&self.waiting
93-
}
94-
}
95-
96-
impl<'a> WaitGuard<'a> {
97-
fn new(waiting: &'a Waiting) -> Self {
98-
Self {
99-
waiting,
100-
armed: true,
101-
}
102-
}
103-
104-
fn disarm(&mut self) {
105-
self.armed = false;
106-
}
107-
}
108-
109-
impl Drop for WaitGuard<'_> {
110-
fn drop(&mut self) {
111-
if self.armed {
112-
let id = self.waiting.request.id;
113-
self.waiting.pool.lock().remove_waiter(&id);
11472
}
11573
}
11674
}
@@ -126,7 +84,7 @@ mod tests {
12684
use super::*;
12785
use crate::backend::pool::Pool;
12886
use crate::net::messages::BackendKeyData;
129-
use tokio::time::{sleep, Duration};
87+
use tokio::time::{sleep, timeout, Duration};
13088

13189
#[tokio::test]
13290
async fn test_cancellation_safety() {
@@ -200,12 +158,14 @@ mod tests {
200158
let _conn = pool.get(&Request::default()).await.unwrap();
201159

202160
let request = Request::new(BackendKeyData::new());
203-
let mut waiting = Waiting::new(pool.clone(), &request).unwrap();
204-
205-
let result = waiting.wait().await;
161+
let waiter_pool = pool.clone();
162+
let get_conn = async move {
163+
let mut waiting = Waiting::new(waiter_pool.clone(), &request).unwrap();
164+
waiting.wait().await
165+
};
166+
let result = timeout(Duration::from_millis(100), get_conn).await;
206167

207168
assert!(result.is_err());
208-
assert!(matches!(result.unwrap_err(), Error::CheckoutTimeout));
209169

210170
let pool_guard = pool.lock();
211171
assert!(

pgdog/src/backend/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,7 @@ impl Server {
653653
debug!("running healthcheck \"{}\" [{}]", query, self.addr);
654654

655655
self.execute(query).await?;
656+
656657
self.stats.healthcheck();
657658

658659
Ok(())

0 commit comments

Comments
 (0)