Skip to content

Commit 8ad8b75

Browse files
pfreixesdjc
authored andcommitted
Adds new statistics attributes for reaped connections
The two new statistics, `connections_closed_max_lifetime` and `connections_closed_idle_timeout` the total number of connections that were reaped due to reching the max lifetime or the idle timeout.
1 parent fc9a823 commit 8ad8b75

File tree

3 files changed

+108
-5
lines changed

3 files changed

+108
-5
lines changed

bb8/src/api.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ pub struct Statistics {
105105
pub connections_closed_broken: u64,
106106
/// Total connections that were closed due to be considered invalid.
107107
pub connections_closed_invalid: u64,
108+
/// Total connections that were closed because they reached the max
109+
/// lifetime.
110+
pub connections_closed_max_lifetime: u64,
111+
/// Total connections that were closed because they reached the max
112+
/// idle timeout.
113+
pub connections_closed_idle_timeout: u64,
108114
}
109115

110116
/// A builder for a connection pool.

bb8/src/internals.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ where
4949

5050
pub(crate) fn reap(&self) -> ApprovalIter {
5151
let mut locked = self.internals.lock();
52-
locked.reap(&self.statics)
52+
let (iter, closed_idle_timeout, closed_max_lifetime) = locked.reap(&self.statics);
53+
drop(locked);
54+
self.statistics
55+
.record_connections_reaped(closed_idle_timeout, closed_max_lifetime);
56+
iter
5357
}
5458

5559
pub(crate) fn forward_error(&self, err: M::Error) {
@@ -139,22 +143,34 @@ where
139143
ApprovalIter { num: num as usize }
140144
}
141145

142-
pub(crate) fn reap(&mut self, config: &Builder<M>) -> ApprovalIter {
146+
pub(crate) fn reap(&mut self, config: &Builder<M>) -> (ApprovalIter, u64, u64) {
147+
let mut closed_max_lifetime = 0;
148+
let mut closed_idle_timeout = 0;
143149
let now = Instant::now();
144150
let before = self.conns.len();
145151

146152
self.conns.retain(|conn| {
147153
let mut keep = true;
148154
if let Some(timeout) = config.idle_timeout {
149-
keep &= now - conn.idle_start < timeout;
155+
if now - conn.idle_start >= timeout {
156+
closed_idle_timeout += 1;
157+
keep &= false;
158+
}
150159
}
151160
if let Some(lifetime) = config.max_lifetime {
152-
keep &= now - conn.conn.birth < lifetime;
161+
if now - conn.conn.birth >= lifetime {
162+
closed_max_lifetime += 1;
163+
keep &= false;
164+
}
153165
}
154166
keep
155167
});
156168

157-
self.dropped((before - self.conns.len()) as u32, config)
169+
(
170+
self.dropped((before - self.conns.len()) as u32, config),
171+
closed_idle_timeout,
172+
closed_max_lifetime,
173+
)
158174
}
159175

160176
pub(crate) fn state(&self, statistics: Statistics) -> State {
@@ -258,6 +274,8 @@ pub(crate) struct AtomicStatistics {
258274
pub(crate) get_wait_time_micros: AtomicU64,
259275
pub(crate) connections_closed_broken: AtomicU64,
260276
pub(crate) connections_closed_invalid: AtomicU64,
277+
pub(crate) connections_closed_max_lifetime: AtomicU64,
278+
pub(crate) connections_closed_idle_timeout: AtomicU64,
261279
}
262280

263281
impl AtomicStatistics {
@@ -282,6 +300,17 @@ impl AtomicStatistics {
282300
}
283301
.fetch_add(1, Ordering::SeqCst);
284302
}
303+
304+
pub(crate) fn record_connections_reaped(
305+
&self,
306+
closed_idle_timeout: u64,
307+
closed_max_lifetime: u64,
308+
) {
309+
self.connections_closed_idle_timeout
310+
.fetch_add(closed_idle_timeout, Ordering::SeqCst);
311+
self.connections_closed_max_lifetime
312+
.fetch_add(closed_max_lifetime, Ordering::SeqCst);
313+
}
285314
}
286315

287316
impl From<&AtomicStatistics> for Statistics {
@@ -293,6 +322,12 @@ impl From<&AtomicStatistics> for Statistics {
293322
get_wait_time: Duration::from_micros(item.get_wait_time_micros.load(Ordering::SeqCst)),
294323
connections_closed_broken: item.connections_closed_broken.load(Ordering::SeqCst),
295324
connections_closed_invalid: item.connections_closed_invalid.load(Ordering::SeqCst),
325+
connections_closed_max_lifetime: item
326+
.connections_closed_max_lifetime
327+
.load(Ordering::SeqCst),
328+
connections_closed_idle_timeout: item
329+
.connections_closed_idle_timeout
330+
.load(Ordering::SeqCst),
296331
}
297332
}
298333
}

bb8/tests/test.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,65 @@ async fn test_now_invalid() {
459459
assert_eq!(pool.state().statistics.connections_closed_invalid, 2);
460460
}
461461

462+
#[tokio::test]
463+
async fn test_idle_timeout() {
464+
static DROPPED: AtomicUsize = AtomicUsize::new(0);
465+
466+
#[derive(Default)]
467+
struct Connection;
468+
impl Drop for Connection {
469+
fn drop(&mut self) {
470+
DROPPED.fetch_add(1, Ordering::SeqCst);
471+
}
472+
}
473+
474+
let manager = NthConnectionFailManager::<Connection>::new(5);
475+
let pool = Pool::builder()
476+
.idle_timeout(Some(Duration::from_secs(1)))
477+
.connection_timeout(Duration::from_secs(1))
478+
.reaper_rate(Duration::from_secs(1))
479+
.max_size(5)
480+
.min_idle(Some(5))
481+
.build(manager)
482+
.await
483+
.unwrap();
484+
485+
let (tx1, rx1) = oneshot::channel();
486+
let (tx2, rx2) = oneshot::channel();
487+
let clone = pool.clone();
488+
tokio::spawn(async move {
489+
let conn = clone.get().await.unwrap();
490+
tx1.send(()).unwrap();
491+
// NB: If we sleep here we'll block this thread's event loop, and the
492+
// reaper can't run.
493+
let _ = rx2
494+
.map(|r| match r {
495+
Ok(v) => Ok((v, conn)),
496+
Err(_) => Err((Error, conn)),
497+
})
498+
.await;
499+
});
500+
501+
rx1.await.unwrap();
502+
503+
// And wait.
504+
assert!(timeout(Duration::from_secs(2), pending::<()>())
505+
.await
506+
.is_err());
507+
assert_eq!(DROPPED.load(Ordering::SeqCst), 4);
508+
509+
tx2.send(()).unwrap();
510+
511+
// And wait some more.
512+
assert!(timeout(Duration::from_secs(3), pending::<()>())
513+
.await
514+
.is_err());
515+
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);
516+
517+
// all 5 idle connections were closed due to max idle time
518+
assert_eq!(pool.state().statistics.connections_closed_idle_timeout, 5);
519+
}
520+
462521
#[tokio::test]
463522
async fn test_max_lifetime() {
464523
static DROPPED: AtomicUsize = AtomicUsize::new(0);
@@ -513,6 +572,9 @@ async fn test_max_lifetime() {
513572
.await
514573
.is_err());
515574
assert_eq!(DROPPED.load(Ordering::SeqCst), 5);
575+
576+
// all 5 connections were closed due to max lifetime
577+
assert_eq!(pool.state().statistics.connections_closed_max_lifetime, 5);
516578
}
517579

518580
#[tokio::test]

0 commit comments

Comments
 (0)