Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 71 additions & 7 deletions src/client/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,29 @@
/// When the buffer reaches this size, it will be flushed to avoid excessive memory usage.
const BUFFER_FLUSH_THRESHOLD: usize = 8192;

/// RAII guard for the global CLIENTS_IN_TRANSACTIONS counter.
///
/// Increments the counter on construction and decrements it when dropped, so
/// the count stays correct on every exit path from the transaction block:
/// normal return, early `return`, `?` propagation, or panic unwinding.
//
// `#[must_use]` is the actual fix here: without it, a call written as
// `TransactionGuard::new();` (no binding) compiles silently but drops the
// guard immediately, decrementing the counter right after incrementing it.
// Binding as `let _tx_guard = TransactionGuard::new();` satisfies the lint.
#[must_use = "binding the guard keeps the counter incremented; an unbound guard is dropped immediately"]
struct TransactionGuard;

impl TransactionGuard {
    /// Increment CLIENTS_IN_TRANSACTIONS and return the guard.
    ///
    /// `Ordering::Relaxed` matches the decrement in `Drop`; no other memory
    /// is synchronized through this counter (it is only ever adjusted with
    /// fetch_add/fetch_sub here).
    fn new() -> Self {
        CLIENTS_IN_TRANSACTIONS.fetch_add(1, Ordering::Relaxed);
        Self
    }
}

impl Drop for TransactionGuard {
    fn drop(&mut self) {
        // Mirror of the fetch_add in new(); runs even during unwinding.
        CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Result of waiting for the next client message while monitoring server liveness.
enum NextClientMessage {
    /// A complete message was read from the client connection.
    Message(BytesMut),
    /// The pooled server connection died while the client was idle: the
    /// server socket produced data or EOF outside of a query (see
    /// `check_server_alive`), so the transaction must be aborted.
    ServerDead,
}

/// Action to take after processing a message in the transaction loop
enum TransactionAction {
/// Continue processing messages in the transaction loop
Expand Down Expand Up @@ -176,6 +199,26 @@
Ok(())
}

/// Wait for the next client message while monitoring server connection liveness.
///
/// Races the client read against readability of the checked-out server socket
/// using a `biased` `select!`, so a pending client message always takes
/// priority over a server liveness event. This detects dead server
/// connections (e.g., `pg_terminate_backend`) while the client is idle in a
/// transaction.
///
/// Returns `NextClientMessage::Message` on client input, or
/// `NextClientMessage::ServerDead` when the probe in `check_server_alive`
/// sees real data/EOF on the server socket (anything but `WouldBlock`).
///
/// NOTE(review): when the server arm fires, the in-flight `read_message`
/// future is dropped and re-created on the next loop iteration — this assumes
/// `read_message` is cancel-safe (no partially read client bytes are lost);
/// confirm against its implementation.
async fn wait_for_next_message(&mut self, server: &Server) -> Result<NextClientMessage, Error> {
    loop {
        tokio::select! {
            // Poll the client read first so liveness probing can never
            // starve actual client traffic.
            biased;
            result = read_message(&mut self.read, self.max_memory_usage) => {
                return result.map(NextClientMessage::Message);
            }
            _ = server.server_readable() => {
                // Readiness can be spurious: a WouldBlock probe means the
                // connection is still alive, so just keep waiting.
                if server.check_server_alive() {
                    continue;
                }
                return Ok(NextClientMessage::ServerDead);
            }
        }
    }
}

/// Handle cancel mode - when client wants to cancel a previously issued query.
/// Opens a new separate connection to the server, sends the backend_id
/// and secret_key and then closes it for security reasons.
Expand Down Expand Up @@ -590,8 +633,9 @@
server.claim(self.process_id, self.secret_key);
self.connected_to_server = true;

// Signal that client is now in transaction (has server connection)
CLIENTS_IN_TRANSACTIONS.fetch_add(1, Ordering::Relaxed);
// RAII guard: increments CLIENTS_IN_TRANSACTIONS now,
// decrements automatically when this block exits (normal or early return).
let _tx_guard = TransactionGuard::new();

// Update statistics
self.stats.active_idle();
Expand Down Expand Up @@ -660,11 +704,32 @@
let message = match initial_message {
None => {
self.stats.active_read();
match read_message(&mut self.read, self.max_memory_usage).await {
Ok(message) => message,
match self.wait_for_next_message(server).await {
Ok(NextClientMessage::Message(msg)) => msg,
Ok(NextClientMessage::ServerDead) => {
warn!(

Check failure

Code scanning / CodeQL

Cleartext logging of sensitive information High

This operation writes
wrong_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
wrong_password(...)
to a log file.
This operation writes
plain_password_challenge(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
plain_password_challenge(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
wrong_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
wrong_password(...)
to a log file.
This operation writes
read_password(...)
to a log file.
This operation writes
wrong_password(...)
to a log file.
This operation writes
plain_password_challenge(...)
to a log file.
This operation writes
read_password(...)
to a log file.

Copilot Autofix

AI 5 days ago

Copilot could not generate an autofix suggestion

Copilot could not generate an autofix suggestion for this alert. Try pushing a new commit or if the problem persists contact support.

"Server {} connection died while client {} idle in transaction",
server, self.addr
);
server.mark_bad(
"server closed while client idle in transaction",
);
let _ = error_response(
&mut self.write,
"server closed the connection unexpectedly while client was idle in transaction",
"08006",
)
.await;
self.stats.disconnect();
self.connected_to_server = false;
self.release();
return Ok(());
}
Err(err) => {
self.stats.disconnect();
self.connected_to_server = false;
server.checkin_cleanup().await?;
self.release();
return self.process_error(err).await;
}
}
Expand Down Expand Up @@ -694,9 +759,9 @@

// Terminate
'X' => {
// Forcibly close the server connection so a long-running transaction cannot linger
server.checkin_cleanup().await?;
self.stats.disconnect();
self.connected_to_server = false;
self.release();
return Ok(());
}
Expand Down Expand Up @@ -790,8 +855,7 @@
self.client_last_messages_in_tx.clear();
}

// Signal that client finished transaction (released server connection)
CLIENTS_IN_TRANSACTIONS.fetch_sub(1, Ordering::Relaxed);
// TransactionGuard dropped at end of block above, counter already decremented.
self.connected_to_server = false;

// If shutdown is in progress, send error to client and exit
Expand Down
19 changes: 19 additions & 0 deletions src/server/server_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,25 @@ impl Server {
self.bad = true;
}

/// Returns a future that completes when the server socket becomes readable.
/// Between queries in a transaction, BufStream is empty (everything was read
/// up to ReadyForQuery), so readable on the underlying socket correctly
/// reflects new data from the server (e.g., FATAL after idle_in_transaction_session_timeout).
///
/// Any I/O error from `readable()` is deliberately discarded: completion only
/// means "something happened on the socket"; `check_server_alive()` is the
/// follow-up probe that classifies it. No bytes are consumed here.
pub async fn server_readable(&self) {
    let _ = self.stream.get_ref().readable().await;
}

/// Verify that server_readable() readiness is genuine, not spurious.
/// Returns true if the connection is alive (WouldBlock = no real data).
/// Returns false if the server sent data or closed the connection (dead).
pub fn check_server_alive(&self) -> bool {
let mut buf = [0u8; 1];
matches!(
self.stream.get_ref().try_read(&mut buf),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock
)
}

/// Server & client are out of sync, we must discard this connection.
/// This happens with clients that misbehave.
pub fn is_bad(&self) -> bool {
Expand Down
19 changes: 19 additions & 0 deletions src/server/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,25 @@ impl StreamInner {
StreamInner::UnixSocket { stream } => stream.try_write(buf),
}
}

/// Waits until the server socket becomes readable (data or EOF/error).
/// Cancel-safe: no data is consumed, only readiness notification.
pub async fn readable(&self) -> std::io::Result<()> {
match self {
StreamInner::TCPPlain { stream } => stream.readable().await,
StreamInner::UnixSocket { stream } => stream.readable().await,
}
}

/// Non-blocking read attempt on the raw socket (bypasses BufStream).
/// Used to verify that `readable()` readiness is genuine, not spurious
/// from BufStream buffering. Returns WouldBlock if no data available.
pub fn try_read(&self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
StreamInner::TCPPlain { stream } => stream.try_read(buf),
StreamInner::UnixSocket { stream } => stream.try_read(buf),
}
}
}

pub(crate) async fn create_unix_stream_inner(host: &str, port: u16) -> Result<StreamInner, Error> {
Expand Down
61 changes: 61 additions & 0 deletions tests/bdd/features/stale-server-detection.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
@rust @rust-4 @stale-server-detection
Feature: Detect stale server connections during client idle in transaction
When a client holds a server connection inside a transaction and the server
terminates (e.g., idle_in_transaction_session_timeout, pg_terminate_backend),
pg_doorman should detect this and release the pool slot.

Background:
Given PostgreSQL started with pg_hba.conf:
"""
local all all trust
host all all 127.0.0.1/32 trust
"""
And fixtures from "tests/fixture.sql" applied

@stale-server-pg-terminate-backend
Scenario: Detect server killed by pg_terminate_backend
# Client holds transaction, then backend is killed via pg_terminate_backend
# pg_doorman should detect via server_readable() and release the slot
Given pg_doorman started with config:
"""
[general]
host = "127.0.0.1"
port = ${DOORMAN_PORT}
admin_username = "admin"
admin_password = "admin"
pg_hba.content = "host all all 127.0.0.1/32 trust"
server_lifetime = 60000
server_idle_check_timeout = 0

[pools.example_db]
server_host = "127.0.0.1"
server_port = ${PG_PORT}

[[pools.example_db.users]]
username = "example_user_1"
password = ""
pool_size = 1

[[pools.example_db.users]]
username = "postgres"
password = ""
pool_size = 2
"""
# Client opens transaction and gets backend_pid
When we create session "victim" to pg_doorman as "example_user_1" with password "" and database "example_db"
When we send SimpleQuery "SELECT pg_backend_pid()" to session "victim" and store backend_pid as "victim_pid"
When we send SimpleQuery "BEGIN" to session "victim" without waiting
Then we read SimpleQuery response from session "victim" within 2000ms
When we send SimpleQuery "SELECT 1" to session "victim" without waiting
Then we read SimpleQuery response from session "victim" within 2000ms
# Kill the backend through a separate superuser connection
When we create session "killer" to pg_doorman as "postgres" with password "" and database "example_db"
When we terminate backend "victim_pid" from session "victim" via session "killer"
# Wait for pg_doorman to detect the dead server
When we sleep for 500 milliseconds
# Pool slot should be released — new client should succeed
When we create session "client2" to pg_doorman as "example_user_1" with password "" and database "example_db"
When we send SimpleQuery "SELECT 2" to session "client2" without waiting
Then we read SimpleQuery response from session "client2" within 5000ms
Then session "client2" should receive DataRow with "2"

Loading