Skip to content

Commit 3fea924

Browse files
authored
feat: client_idle_in_transaction_timeout (#696) (#697)
fix #696
1 parent 2a8e4d1 commit 3fea924

File tree

8 files changed

+133
-6
lines changed

8 files changed

+133
-6
lines changed

integration/rust/src/setup.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ pub async fn connections_sqlx() -> Vec<Pool<Postgres>> {
4444
pools
4545
}
4646

47+
pub async fn connection_sqlx_direct() -> Pool<Postgres> {
48+
PgPoolOptions::new()
49+
.max_connections(1)
50+
.connect("postgres://pgdog:pgdog@127.0.0.1:5432/pgdog?application_name=sqlx_direct")
51+
.await
52+
.unwrap()
53+
}
54+
4755
#[derive(Debug, PartialEq, Clone)]
4856
pub struct Backend {
4957
pub pid: i32,
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::time::Duration;
2+
3+
use rust::setup::{admin_sqlx, connection_sqlx_direct, connections_sqlx};
4+
use sqlx::{Executor, Row};
5+
use tokio::time::sleep;
6+
7+
#[tokio::test]
8+
async fn test_idle_in_transaction_timeout() {
9+
let admin = admin_sqlx().await;
10+
admin
11+
.execute("SET client_idle_in_transaction_timeout TO 500")
12+
.await
13+
.unwrap();
14+
15+
let conn_direct = connection_sqlx_direct().await;
16+
17+
for conn in connections_sqlx().await {
18+
let mut conn = conn.acquire().await.unwrap();
19+
20+
conn.execute("BEGIN").await.unwrap();
21+
let pid_before = conn
22+
.fetch_one("SELECT pg_backend_pid()")
23+
.await
24+
.unwrap()
25+
.get::<i32, _>(0);
26+
sleep(Duration::from_millis(750)).await;
27+
let err = conn.execute("SELECT 1").await.unwrap_err();
28+
assert!(err.to_string().contains("idle in transaction"));
29+
30+
sleep(Duration::from_millis(500)).await;
31+
32+
let (pid_after, query): (i32, String) =
33+
sqlx::query_as("SELECT pid, query FROM pg_stat_activity WHERE pid = $1")
34+
.bind(pid_before)
35+
.fetch_one(&conn_direct)
36+
.await
37+
.unwrap();
38+
39+
assert_eq!(
40+
pid_before, pid_after,
41+
"expexted pooler not to cycle connection"
42+
);
43+
assert_eq!(query, "ROLLBACK", "expected a rollback on the connection",);
44+
}
45+
46+
// Reset settings.
47+
admin.execute("RELOAD").await.unwrap();
48+
}

integration/rust/tests/integration/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod cross_shard_disabled;
66
pub mod distinct;
77
pub mod explain;
88
pub mod fake_transactions;
9+
pub mod idle_in_transaction;
910
pub mod maintenance_mode;
1011
pub mod notify;
1112
pub mod per_stmt_routing;

pgdog-config/src/general.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ pub struct General {
135135
/// Client idle timeout.
136136
#[serde(default = "General::default_client_idle_timeout")]
137137
pub client_idle_timeout: u64,
138+
/// Client idle in transaction timeout.
139+
#[serde(default = "General::default_client_idle_in_transaction_timeout")]
140+
pub client_idle_in_transaction_timeout: u64,
138141
/// Server lifetime.
139142
#[serde(default = "General::server_lifetime")]
140143
pub server_lifetime: u64,
@@ -236,6 +239,7 @@ impl Default for General {
236239
dry_run: Self::dry_run(),
237240
idle_timeout: Self::idle_timeout(),
238241
client_idle_timeout: Self::default_client_idle_timeout(),
242+
client_idle_in_transaction_timeout: Self::default_client_idle_in_transaction_timeout(),
239243
mirror_queue: Self::mirror_queue(),
240244
mirror_exposure: Self::mirror_exposure(),
241245
auth_type: Self::auth_type(),
@@ -364,6 +368,13 @@ impl General {
364368
)
365369
}
366370

371+
fn default_client_idle_in_transaction_timeout() -> u64 {
372+
Self::env_or_default(
373+
"PGDOG_CLIENT_IDLE_IN_TRANSACTION_TIMEOUT",
374+
Duration::MAX.as_millis() as u64,
375+
)
376+
}
377+
367378
fn default_query_timeout() -> u64 {
368379
Self::env_or_default("PGDOG_QUERY_TIMEOUT", Duration::MAX.as_millis() as u64)
369380
}
@@ -384,6 +395,10 @@ impl General {
384395
Duration::from_millis(self.connect_attempt_delay)
385396
}
386397

398+
pub fn client_idle_in_transaction_timeout(&self) -> Duration {
399+
Duration::from_millis(self.client_idle_in_transaction_timeout)
400+
}
401+
387402
fn load_balancing_strategy() -> LoadBalancingStrategy {
388403
Self::env_enum_or_default("PGDOG_LOAD_BALANCING_STRATEGY")
389404
}

pgdog/src/admin/set.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ impl Command for Set {
149149
config.config.general.query_parser = Self::from_json(&self.value)?;
150150
}
151151

152+
"client_idle_in_transaction_timeout" => {
153+
config.config.general.client_idle_in_transaction_timeout = self.value.parse()?;
154+
}
155+
152156
_ => return Err(Error::Syntax),
153157
}
154158

pgdog/src/frontend/client/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ impl Client {
521521
match timeout(idle_timeout, self.stream_buffer.read(&mut self.stream)).await {
522522
Err(_) => {
523523
self.stream
524-
.fatal(ErrorResponse::client_idle_timeout(idle_timeout))
524+
.fatal(ErrorResponse::client_idle_timeout(idle_timeout, &state))
525525
.await?;
526526
return Ok(BufferEvent::DisconnectAbrupt);
527527
}

pgdog/src/frontend/client/timeouts.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ use crate::{config::General, frontend::ClientRequest, state::State};
66
pub struct Timeouts {
77
pub(super) query_timeout: Duration,
88
pub(super) client_idle_timeout: Duration,
9+
pub(super) idle_in_transaction_timeout: Duration,
910
}
1011

1112
impl Default for Timeouts {
1213
fn default() -> Self {
1314
Self {
1415
query_timeout: Duration::MAX,
1516
client_idle_timeout: Duration::MAX,
17+
idle_in_transaction_timeout: Duration::MAX,
1618
}
1719
}
1820
}
@@ -22,6 +24,7 @@ impl Timeouts {
2224
Self {
2325
query_timeout: general.query_timeout(),
2426
client_idle_timeout: general.client_idle_timeout(),
27+
idle_in_transaction_timeout: general.client_idle_in_transaction_timeout(),
2528
}
2629
}
2730

@@ -48,7 +51,40 @@ impl Timeouts {
4851
Duration::MAX
4952
}
5053
}
54+
State::IdleInTransaction => {
55+
// Client is sending the request, don't fire.
56+
if !client_request.messages.is_empty() {
57+
Duration::MAX
58+
} else {
59+
self.idle_in_transaction_timeout
60+
}
61+
}
62+
5163
_ => Duration::MAX,
5264
}
5365
}
5466
}
67+
68+
#[cfg(test)]
69+
mod test {
70+
71+
use crate::{config::config, net::Query};
72+
73+
use super::*;
74+
75+
#[test]
76+
fn test_idle_in_transaction_timeout() {
77+
let config = config(); // Will be default.
78+
let timeout = Timeouts::from_config(&config.config.general);
79+
80+
let actual = timeout.client_idle_timeout(&State::IdleInTransaction, &ClientRequest::new());
81+
assert_eq!(actual, timeout.idle_in_transaction_timeout);
82+
assert_eq!(actual.as_millis(), u64::MAX.into());
83+
84+
let actual = timeout.client_idle_timeout(
85+
&State::IdleInTransaction,
86+
&ClientRequest::from(vec![Query::new("SELECT 1").into()]),
87+
);
88+
assert_eq!(actual, Duration::MAX);
89+
}
90+
}

pgdog/src/net/messages/error_response.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ use std::fmt::Display;
44
use std::time::Duration;
55

66
use super::prelude::*;
7-
use crate::net::{c_string_buf, code};
7+
use crate::{
8+
net::{c_string_buf, code},
9+
state::State,
10+
};
811

912
/// ErrorResponse (B) message.
1013
#[derive(Debug, Clone)]
@@ -50,7 +53,7 @@ impl ErrorResponse {
5053
}
5154

5255
pub fn client_login_timeout(timeout: Duration) -> ErrorResponse {
53-
let mut error = Self::client_idle_timeout(timeout);
56+
let mut error = Self::client_idle_timeout(timeout, &State::Active);
5457
error.message = "client login timeout".into();
5558
error.detail = Some(format!(
5659
"client_login_timeout of {}ms expired",
@@ -89,13 +92,25 @@ impl ErrorResponse {
8992
}
9093
}
9194

92-
pub fn client_idle_timeout(duration: Duration) -> ErrorResponse {
95+
pub fn client_idle_timeout(duration: Duration, state: &State) -> ErrorResponse {
9396
ErrorResponse {
9497
severity: "FATAL".into(),
9598
code: "57P05".into(),
96-
message: "disconnecting idle client".into(),
99+
message: format!(
100+
"disconnecting {} client",
101+
if state == &State::IdleInTransaction {
102+
"idle in transaction"
103+
} else {
104+
"idle"
105+
}
106+
),
97107
detail: Some(format!(
98-
"client_idle_timeout of {}ms expired",
108+
"{} of {}ms expired",
109+
if state == &State::IdleInTransaction {
110+
"client_idle_in_transaction_timeout"
111+
} else {
112+
"client_idle_timeout"
113+
},
99114
duration.as_millis()
100115
)),
101116
context: None,

0 commit comments

Comments
 (0)