Skip to content

Commit c8f474e

Browse files
authored
feat: track server last sent/received (#728)
### Description

- Track `last_sent` / `last_received` for debugging stuck connections.
- Remove contention from the server stats mutex.
- Use one mutex per connection for stats instead.
- Time stats now have sub-millisecond precision (e.g. 32.342ms), including in the admin DB and Prometheus.
1 parent 1a7df2e commit c8f474e

File tree

15 files changed

+351
-245
lines changed

15 files changed

+351
-245
lines changed

cli.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# Dev CLI.
44
#
55

6+
set -ex
7+
68
# Connect to the admin database.
79
function admin() {
810
PGPASSWORD=pgdog psql -h 127.0.0.1 -p 6432 -U admin admin
@@ -15,7 +17,7 @@ function admin() {
1517
# - protocol: simple|extended|prepared
1618
#
1719
function bench() {
18-
PGPASSWORD=pgdog pgbench -h 127.0.0.1 -p 6432 -U pgdog pgdog --protocol ${2:-simple} -t 100000 -c 10 -P 1 -S
20+
PGPASSWORD=pgdog pgbench -h 127.0.0.1 -p 6432 -U pgdog pgdog --protocol ${1:-simple} -t 100000000 -c 10 -P 1 -S
1921
}
2022

2123
function bench_init() {

pgdog-stats/src/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ pub struct Stats {
9595
pub last_checkout: Counts,
9696
pub pool_id: u64,
9797
pub memory: MemoryStats,
98+
pub last_sent: u8,
99+
pub last_received: u8,
98100
}
99101

100102
impl Default for Stats {
@@ -106,6 +108,8 @@ impl Default for Stats {
106108
last_checkout: Counts::default(),
107109
pool_id: 0,
108110
memory: MemoryStats::default(),
111+
last_sent: 0,
112+
last_received: 0,
109113
}
110114
}
111115
}

pgdog/src/admin/show_server_memory.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@ impl Command for ShowServerMemory {
3838
let stats = stats();
3939
for (_, server) in stats {
4040
let mut row = DataRow::new();
41-
let stats = server.stats;
42-
let memory = &stats.memory;
41+
let memory = &server.stats.memory;
4342

44-
row.add(stats.pool_id as i64)
43+
row.add(server.stats.pool_id as i64)
4544
.add(server.addr.database_name.as_str())
4645
.add(server.addr.user.as_str())
4746
.add(server.addr.host.as_str())
4847
.add(server.addr.port as i64)
49-
.add(stats.id.pid as i64)
48+
.add(server.stats.id.pid as i64)
5049
.add(memory.buffer.reallocs as i64)
5150
.add(memory.buffer.reclaims as i64)
5251
.add(memory.buffer.bytes_used as i64)

pgdog/src/admin/show_servers.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl Command for ShowServers {
5353
Field::text("connect_time"),
5454
Field::text("request_time"),
5555
Field::numeric("remote_pid"),
56-
// Field::bigint("client_id"),
56+
Field::bigint("client_id"),
5757
Field::numeric("transactions"),
5858
Field::numeric("queries"),
5959
Field::numeric("rollbacks"),
@@ -62,6 +62,8 @@ impl Command for ShowServers {
6262
Field::numeric("errors"),
6363
Field::numeric("bytes_received"),
6464
Field::numeric("bytes_sent"),
65+
Field::text("last_sent"),
66+
Field::text("last_received"),
6567
Field::numeric("age"),
6668
Field::text("application_name"),
6769
],
@@ -78,32 +80,45 @@ impl Command for ShowServers {
7880
let now_time = SystemTime::now();
7981

8082
for (_, server) in stats {
81-
let stats = server.stats;
82-
let age = now.duration_since(stats.created_at);
83-
let request_age = now.duration_since(stats.last_used);
83+
let age = now.duration_since(server.stats.created_at);
84+
let request_age = now.duration_since(server.stats.last_used);
8485
let request_time = now_time - request_age;
8586

8687
let dr = self
8788
.row
8889
.clone()
89-
.add("pool_id", stats.pool_id)
90+
.add("pool_id", server.stats.pool_id)
9091
.add("database", server.addr.database_name)
9192
.add("user", server.addr.user)
9293
.add("addr", server.addr.host.as_str())
9394
.add("port", server.addr.port.to_string())
94-
.add("state", stats.state.to_string())
95-
.add("connect_time", format_time(stats.created_at_time.into()))
95+
.add("state", server.stats.state.to_string())
96+
.add(
97+
"connect_time",
98+
format_time(server.stats.created_at_time.into()),
99+
)
96100
.add("request_time", format_time(request_time.into()))
97-
.add("remote_pid", stats.id.pid as i64)
98-
// .add("client_id", stats.client_id.map(|client| client.pid as i64))
99-
.add("transactions", stats.total.transactions)
100-
.add("queries", stats.total.queries)
101-
.add("rollbacks", stats.total.rollbacks)
102-
.add("prepared_statements", stats.total.prepared_statements)
103-
.add("healthchecks", stats.total.healthchecks)
104-
.add("errors", stats.total.errors)
105-
.add("bytes_received", stats.total.bytes_received)
106-
.add("bytes_sent", stats.total.bytes_sent)
101+
.add("remote_pid", server.stats.id.pid as i64)
102+
.add(
103+
"client_id",
104+
server.stats.client_id.map(|client| client.pid as i64),
105+
)
106+
.add("transactions", server.stats.total.transactions)
107+
.add("queries", server.stats.total.queries)
108+
.add("rollbacks", server.stats.total.rollbacks)
109+
.add(
110+
"prepared_statements",
111+
server.stats.total.prepared_statements,
112+
)
113+
.add("healthchecks", server.stats.total.healthchecks)
114+
.add("errors", server.stats.total.errors)
115+
.add("bytes_received", server.stats.total.bytes_received)
116+
.add("bytes_sent", server.stats.total.bytes_sent)
117+
.add("last_sent", (server.stats.last_sent as char).to_string())
118+
.add(
119+
"last_received",
120+
(server.stats.last_received as char).to_string(),
121+
)
107122
.add("age", age.as_secs() as i64)
108123
.add("application_name", server.application_name.as_str())
109124
.data_row();

pgdog/src/admin/show_stats.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! SHOW STATS.
22
use crate::backend::databases::databases;
3+
use crate::util::millis;
34

45
use super::prelude::*;
56

@@ -83,17 +84,17 @@ impl Command for ShowStats {
8384
.add(stat.server_assignment_count)
8485
.add(stat.received)
8586
.add(stat.sent)
86-
.add(stat.xact_time.as_millis() as u64)
87-
.add(stat.idle_xact_time.as_millis() as u64)
88-
.add(stat.query_time.as_millis() as u64)
89-
.add(stat.wait_time.as_millis() as u64)
87+
.add(millis(stat.xact_time))
88+
.add(millis(stat.idle_xact_time))
89+
.add(millis(stat.query_time))
90+
.add(millis(stat.wait_time))
9091
.add(stat.parse_count)
9192
.add(stat.bind_count)
9293
.add(stat.close)
9394
.add(stat.errors)
9495
.add(stat.cleaned)
9596
.add(stat.rollbacks)
96-
.add(stat.connect_time.as_millis() as u64)
97+
.add(millis(stat.connect_time))
9798
.add(stat.connect_count);
9899
}
99100

pgdog/src/backend/pool/connection/binding.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,14 +238,18 @@ impl Binding {
238238
Binding::Direct(Some(server)) => {
239239
debug!(
240240
"server is in \"{}\" state [{}]",
241-
server.stats().state,
241+
server.stats().get_state(),
242242
server.addr()
243243
);
244-
server.stats().state == state
244+
server.stats().get_state() == state
245245
}
246246
Binding::MultiShard(servers, _) => servers.iter().all(|s| {
247-
debug!("server is in \"{}\" state [{}]", s.stats().state, s.addr());
248-
s.stats().state == state
247+
debug!(
248+
"server is in \"{}\" state [{}]",
249+
s.stats().get_state(),
250+
s.addr()
251+
);
252+
s.stats().get_state() == state
249253
}),
250254
_ => true,
251255
}

pgdog/src/backend/pool/guard.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl Guard {
9696
} else {
9797
debug!(
9898
"[cleanup] no cleanup needed, server in \"{}\" state [{}]",
99-
server.stats().state,
99+
server.stats().get_state(),
100100
server.addr(),
101101
);
102102
if let Err(err) = pool.checkin(server) {
@@ -120,7 +120,7 @@ impl Guard {
120120
// Receive whatever data the client left before disconnecting.
121121
debug!(
122122
"[cleanup] draining data from \"{}\" server [{}]",
123-
server.stats().state,
123+
server.stats().get_state(),
124124
server.addr()
125125
);
126126
server.drain().await?;
@@ -138,7 +138,7 @@ impl Guard {
138138
if conn_recovery.can_rollback() {
139139
debug!(
140140
"[cleanup] rolling back server transaction, in \"{}\" state [{}]",
141-
server.stats().state,
141+
server.stats().get_state(),
142142
server.addr(),
143143
);
144144
server.rollback().await?;
@@ -152,7 +152,7 @@ impl Guard {
152152
debug!(
153153
"[cleanup] running {} cleanup queries, server in \"{}\" state [{}]",
154154
cleanup.len(),
155-
server.stats().state,
155+
server.stats().get_state(),
156156
server.addr()
157157
);
158158
server.execute_batch(cleanup.queries()).await?;
@@ -180,7 +180,7 @@ impl Guard {
180180
if sync_prepared {
181181
debug!(
182182
"[cleanup] syncing prepared statements, server in \"{}\" state [{}]",
183-
server.stats().state,
183+
server.stats().get_state(),
184184
server.addr()
185185
);
186186
server.sync_prepared_statements().await?;
@@ -510,7 +510,7 @@ mod test {
510510
.unwrap();
511511

512512
use crate::state::State;
513-
assert_eq!(server.stats().state, State::ForceClose);
513+
assert_eq!(server.stats().get_state(), State::ForceClose);
514514
assert!(server.needs_drain());
515515
}
516516

@@ -556,7 +556,7 @@ mod test {
556556
.unwrap();
557557

558558
use crate::state::State;
559-
assert_eq!(server.stats().state, State::ForceClose);
559+
assert_eq!(server.stats().get_state(), State::ForceClose);
560560
assert!(server.needs_drain());
561561
}
562562

@@ -669,7 +669,7 @@ mod test {
669669
.unwrap();
670670

671671
use crate::state::State;
672-
assert_eq!(server.stats().state, State::ForceClose);
672+
assert_eq!(server.stats().get_state(), State::ForceClose);
673673
assert!(server.in_transaction());
674674
}
675675

@@ -730,7 +730,7 @@ mod test {
730730
.unwrap();
731731

732732
use crate::state::State;
733-
assert_eq!(server.stats().state, State::ForceClose);
733+
assert_eq!(server.stats().get_state(), State::ForceClose);
734734
assert!(server.needs_drain());
735735
assert!(server.in_transaction());
736736
}

pgdog/src/backend/pool/inner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ impl Inner {
293293
let taken = std::mem::take(&mut self.taken);
294294

295295
for conn in idle.iter_mut() {
296-
conn.stats_mut().pool_id = destination.id();
296+
conn.stats_mut().set_pool_id(destination.id());
297297
}
298298

299299
(idle, taken)
@@ -319,8 +319,8 @@ impl Inner {
319319
result.replenish = false;
320320
// Prevents deadlocks.
321321
if moved.id() != self.id {
322-
server.stats_mut().pool_id = moved.id();
323-
server.stats_mut().update();
322+
server.stats_mut().set_pool_id(moved.id());
323+
server.stats().update();
324324
moved.lock().maybe_check_in(server, now, stats)?;
325325
return Ok(result);
326326
}

pgdog/src/backend/pool/pool_impl.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,15 @@ impl Pool {
220220
let now = if server.pooler_mode() == &PoolerMode::Session {
221221
Instant::now()
222222
} else {
223-
server.stats().last_used
223+
server.stats().last_used()
224224
};
225225

226226
let counts = {
227227
let stats = server.stats_mut();
228-
stats.client_id = None;
229-
stats.reset_last_checkout()
228+
stats.clear_client_id();
229+
let counts = stats.reset_last_checkout();
230+
stats.update();
231+
counts
230232
};
231233

232234
// Check everything and maybe check the connection

pgdog/src/backend/pool/test/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ async fn test_prepared_statements_limit() {
440440
|| guard.prepared_statements_mut().contains("__pgdog_98")
441441
);
442442
assert_eq!(guard.prepared_statements_mut().len(), 2);
443-
assert_eq!(guard.stats().total.prepared_statements, 2); // stats are accurate.
443+
assert_eq!(guard.stats().total().prepared_statements, 2); // stats are accurate.
444444

445445
let pool = pool_with_prepared_capacity(100);
446446

@@ -468,14 +468,14 @@ async fn test_prepared_statements_limit() {
468468
let mut guard = pool.get(&Request::default()).await.unwrap();
469469
assert!(guard.prepared_statements_mut().contains("__pgdog_99"));
470470
assert_eq!(guard.prepared_statements_mut().len(), 100);
471-
assert_eq!(guard.stats().total.prepared_statements, 100); // stats are accurate.
471+
assert_eq!(guard.stats().total().prepared_statements, 100); // stats are accurate.
472472

473473
// Let's make sure Postgres agrees.
474474
guard.sync_prepared_statements().await.unwrap();
475475

476476
assert!(guard.prepared_statements_mut().contains("__pgdog_99"));
477477
assert_eq!(guard.prepared_statements_mut().len(), 100);
478-
assert_eq!(guard.stats().total.prepared_statements, 100); // stats are accurate.
478+
assert_eq!(guard.stats().total().prepared_statements, 100); // stats are accurate.
479479
}
480480

481481
#[tokio::test]
@@ -612,7 +612,7 @@ async fn test_move_conns_to() {
612612
assert_eq!(source.lock().total(), 0);
613613
let new_pool_id = destination.id();
614614
for conn in destination.lock().idle_conns() {
615-
assert_eq!(conn.stats().pool_id, new_pool_id);
615+
assert_eq!(conn.stats().pool_id(), new_pool_id);
616616
}
617617

618618
drop(conn2);

0 commit comments

Comments
 (0)