Skip to content

Commit f1265a5

Browse files
authored
Introduce tcp_keepalives to PgCat (#315)
We have encountered a case where PgCat pools were stuck following a database incident. Our best understanding at this point is that the PgCat -> Postgres connections died silently and because Tokio defaults to disabling keepalives, connections in the pool were marked as busy forever. Only when we deployed PgCat did we see recovery. This PR introduces tcp_keepalives to PgCat. This sets the defaults to be keepalives_idle: 5 # seconds keepalives_interval: 5 # seconds keepalives_count: 5 # a count These settings can detect the death of an idle connection within 30 seconds of its death. Please note that the connection can remain idle forever (from an application perspective) as long as the keepalive packets are flowing so disconnection will only occur if the other end is not acknowledging keepalive packets (keepalive packet acks are handled by the OS, the application does not need to do anything). I plan to add tcp_user_timeout in a follow-up PR.
1 parent d81a744 commit f1265a5

File tree

11 files changed

+114
-13
lines changed

11 files changed

+114
-13
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ jobs:
5454
command: "cargo fmt --check"
5555
- run:
5656
name: "Install dependencies"
57-
command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12 ruby ruby-dev libpq-dev python3 python3-pip lcov llvm-11 && sudo apt-get upgrade curl"
57+
command: "sudo apt-get update && sudo apt-get install -y psmisc postgresql-contrib-12 postgresql-client-12 ruby ruby-dev libpq-dev python3 python3-pip lcov llvm-11 iproute2 && sudo apt-get upgrade curl"
5858
- run:
5959
name: "Install rust tools"
6060
command: "cargo install cargo-binutils rustfilt && rustup component add llvm-tools-preview"

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ hyper = { version = "0.14", features = ["full"] }
3535
phf = { version = "0.11.1", features = ["macros"] }
3636
exitcode = "1.1.2"
3737
futures = "0.3"
38+
socket2 = { version = "0.4.7", features = ["all"] }
3839

3940
[target.'cfg(not(target_env = "msvc"))'.dependencies]
4041
jemallocator = "0.5.0"

src/config.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,13 @@ pub struct General {
160160
#[serde(default = "General::default_idle_timeout")]
161161
pub idle_timeout: u64,
162162

163+
#[serde(default = "General::default_tcp_keepalives_idle")]
164+
pub tcp_keepalives_idle: u64,
165+
#[serde(default = "General::default_tcp_keepalives_count")]
166+
pub tcp_keepalives_count: u32,
167+
#[serde(default = "General::default_tcp_keepalives_interval")]
168+
pub tcp_keepalives_interval: u64,
169+
163170
#[serde(default)] // False
164171
pub log_client_connections: bool,
165172

@@ -203,6 +210,21 @@ impl General {
203210
1000
204211
}
205212

213+
// These keepalive defaults should detect a dead connection within 30 seconds.
214+
// Tokio defaults to disabling keepalives which keeps dead connections around indefinitely.
215+
// This can lead to permenant server pool exhaustion
216+
pub fn default_tcp_keepalives_idle() -> u64 {
217+
5 // 5 seconds
218+
}
219+
220+
pub fn default_tcp_keepalives_count() -> u32 {
221+
5 // 5 time
222+
}
223+
224+
pub fn default_tcp_keepalives_interval() -> u64 {
225+
5 // 5 seconds
226+
}
227+
206228
pub fn default_idle_timeout() -> u64 {
207229
60000 // 10 minutes
208230
}
@@ -242,6 +264,9 @@ impl Default for General {
242264
healthcheck_delay: Self::default_healthcheck_delay(),
243265
ban_time: Self::default_ban_time(),
244266
worker_threads: Self::default_worker_threads(),
267+
tcp_keepalives_idle: Self::default_tcp_keepalives_idle(),
268+
tcp_keepalives_count: Self::default_tcp_keepalives_count(),
269+
tcp_keepalives_interval: Self::default_tcp_keepalives_interval(),
245270
log_client_connections: false,
246271
log_client_disconnections: false,
247272
autoreload: false,

src/messages.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
/// Helper functions to send one-off protocol messages
22
/// and handle TcpStream (TCP socket).
33
use bytes::{Buf, BufMut, BytesMut};
4+
use log::error;
45
use md5::{Digest, Md5};
6+
use socket2::{SockRef, TcpKeepalive};
57
use tokio::io::{AsyncReadExt, AsyncWriteExt};
68
use tokio::net::TcpStream;
79

10+
use crate::config::get_config;
811
use crate::errors::Error;
912
use std::collections::HashMap;
1013
use std::io::{BufRead, Cursor};
1114
use std::mem;
15+
use std::time::Duration;
1216

1317
/// Postgres data type mappings
1418
/// used in RowDescription ('T') message.
@@ -550,6 +554,26 @@ pub fn server_parameter_message(key: &str, value: &str) -> BytesMut {
550554
server_info
551555
}
552556

557+
pub fn configure_socket(stream: &TcpStream) {
558+
let sock_ref = SockRef::from(stream);
559+
let conf = get_config();
560+
561+
match sock_ref.set_keepalive(true) {
562+
Ok(_) => {
563+
match sock_ref.set_tcp_keepalive(
564+
&TcpKeepalive::new()
565+
.with_interval(Duration::from_secs(conf.general.tcp_keepalives_interval))
566+
.with_retries(conf.general.tcp_keepalives_count)
567+
.with_time(Duration::from_secs(conf.general.tcp_keepalives_idle)),
568+
) {
569+
Ok(_) => (),
570+
Err(err) => error!("Could not configure socket: {}", err),
571+
}
572+
}
573+
Err(err) => error!("Could not configure socket: {}", err),
574+
}
575+
}
576+
553577
pub trait BytesMutReader {
554578
fn read_string(&mut self) -> Result<String, Error>;
555579
}

src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl Server {
9292
)));
9393
}
9494
};
95+
configure_socket(&stream);
9596

9697
trace!("Sending StartupMessage");
9798

@@ -368,6 +369,7 @@ impl Server {
368369
return Err(Error::SocketError(format!("Error reading cancel message")));
369370
}
370371
};
372+
configure_socket(&stream);
371373

372374
debug!("Sending CancelRequest");
373375

tests/docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
FROM rust:bullseye
22

3-
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov sudo curl -y
3+
RUN apt-get update && apt-get install llvm-11 psmisc postgresql-contrib postgresql-client ruby ruby-dev libpq-dev python3 python3-pip lcov curl sudo iproute2 -y
44
RUN cargo install cargo-binutils rustfilt
55
RUN rustup component add llvm-tools-preview

tests/ruby/helpers/pgcat_helper.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55

66
module Helpers
77
module Pgcat
8-
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
8+
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
99
user = {
1010
"password" => "sharding_user",
1111
"pool_size" => pool_size,
1212
"statement_timeout" => 0,
1313
"username" => "sharding_user"
1414
}
1515

16-
pgcat = PgcatProcess.new("info")
16+
pgcat = PgcatProcess.new(log_level)
1717
primary0 = PgInstance.new(5432, user["username"], user["password"], "shard0")
1818
primary1 = PgInstance.new(7432, user["username"], user["password"], "shard1")
1919
primary2 = PgInstance.new(8432, user["username"], user["password"], "shard2")
@@ -47,15 +47,15 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mod
4747
end
4848
end
4949

50-
def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
50+
def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="trace")
5151
user = {
5252
"password" => "sharding_user",
5353
"pool_size" => pool_size,
5454
"statement_timeout" => 0,
5555
"username" => "sharding_user"
5656
}
5757

58-
pgcat = PgcatProcess.new("trace")
58+
pgcat = PgcatProcess.new(log_level)
5959
pgcat_cfg = pgcat.current_config
6060

6161
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
@@ -92,15 +92,15 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb
9292
end
9393
end
9494

95-
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random")
95+
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
9696
user = {
9797
"password" => "sharding_user",
9898
"pool_size" => pool_size,
9999
"statement_timeout" => 0,
100100
"username" => "sharding_user"
101101
}
102102

103-
pgcat = PgcatProcess.new("info")
103+
pgcat = PgcatProcess.new(log_level)
104104
pgcat_cfg = pgcat.current_config
105105

106106
primary = PgInstance.new(5432, user["username"], user["password"], "shard0")

tests/ruby/helpers/pgcat_process.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class PgcatProcess
88
attr_reader :pid
99

1010
def self.finalize(pid, log_filename, config_filename)
11-
`kill #{pid}`
11+
`kill #{pid}` if pid
1212
File.delete(config_filename) if File.exist?(config_filename)
1313
File.delete(log_filename) if File.exist?(log_filename)
1414
end
@@ -75,8 +75,11 @@ def wait_until_ready
7575
end
7676

7777
def stop
78+
return unless @pid
79+
7880
`kill #{@pid}`
7981
sleep 0.1
82+
@pid = nil
8083
end
8184

8285
def shutdown

tests/ruby/load_balancing_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
end
8989

9090
context "under heterogeneous load" do
91-
it "balances query volume between all instances based on how busy they are" do
91+
xit "balances query volume between all instances based on how busy they are" do
9292
slow_query_count = 2
9393
threads = Array.new(slow_query_count) do
9494
Thread.new do

0 commit comments

Comments
 (0)