Skip to content

Commit e153e2c

Browse files
authored
Banning & connection pool fixes (#512)
1 parent 57d9a7c commit e153e2c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+3551
-1344
lines changed

integration/failover/dev-server.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/bin/bash
2+
set -e
3+
THIS_SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
4+
source ${THIS_SCRIPT_DIR}/../toxi/setup.sh
5+
pushd ${THIS_SCRIPT_DIR}
6+
cargo run
7+
popd

integration/failover/pgdog.toml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
[general]
2+
checkout_timeout = 1_000
3+
connect_timeout = 1_000
4+
ban_timeout = 30_000
5+
query_timeout = 1_000
6+
idle_healthcheck_interval = 1_000
7+
client_login_timeout = 1_000
8+
load_balancing_algorithm = "round_robin"
9+
10+
[[databases]]
11+
name = "failover"
12+
host = "127.0.0.1"
13+
port = 5435
14+
role = "primary"
15+
database_name = "pgdog"
16+
17+
[[databases]]
18+
name = "failover"
19+
host = "127.0.0.1"
20+
port = 5436
21+
role = "replica"
22+
database_name = "pgdog"
23+
read_only = true
24+
25+
[[databases]]
26+
name = "failover"
27+
host = "127.0.0.1"
28+
port = 5437
29+
role = "replica"
30+
database_name = "pgdog"
31+
read_only = true
32+
33+
[[databases]]
34+
name = "failover"
35+
host = "127.0.0.1"
36+
port = 5438
37+
role = "replica"
38+
database_name = "pgdog"
39+
read_only = true
40+
41+
[admin]
42+
password = "pgdog"
43+
user = "admin"

integration/failover/psql.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
export PGPASSWORD=pgdog
3+
psql -h 127.0.0.1 -p 6432 -U pgdog ${1}

integration/failover/users.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[[users]]
2+
database = "failover"
3+
name = "pgdog"
4+
password = "pgdog"

integration/rust/tests/integration/ban.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,30 +70,4 @@ async fn test_ban_unban() {
7070
}
7171

7272
ensure_client_state("idle").await;
73-
74-
for (pool, database) in conns
75-
.into_iter()
76-
.zip(["pgdog", "pgdog_sharded"].into_iter())
77-
{
78-
for _ in 0..25 {
79-
pool.execute("SELECT 1").await.unwrap();
80-
}
81-
82-
ban_unban(database, true, false).await;
83-
84-
for _ in 0..25 {
85-
let err = pool.execute("CREATE TABLE test (id BIGINT)").await;
86-
assert!(err.err().unwrap().to_string().contains("pool is banned"));
87-
}
88-
89-
ban_unban(database, false, false).await;
90-
91-
let mut t = pool.begin().await.unwrap();
92-
t.execute("CREATE TABLE test_ban_unban (id BIGINT)")
93-
.await
94-
.unwrap();
95-
t.rollback().await.unwrap();
96-
97-
pool.close().await;
98-
}
9973
}

integration/schema_sync/pgdog.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[[databases]]
2+
name = "source"
3+
host = "127.0.0.1"
4+
database_name = "pgdog"
5+
6+
[[databases]]
7+
name = "destination"
8+
host = "127.0.0.1"
9+
database_name = "pgdog1"

integration/schema_sync/users.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[[users]]
2+
name = "pgdog-4"
3+
database = "source"
4+
password = "pgdog"
5+
schema_admin = true
6+
7+
[[users]]
8+
name = "pgdog-4"
9+
database = "destination"
10+
password = "pgdog"
11+
schema_admin = true

integration/setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fi
2424
export PGPASSWORD='pgdog'
2525
export PGHOST=127.0.0.1
2626
export PGPORT=5432
27-
#export PGUSER='pgdog'
27+
export PGUSER='pgdog'
2828

2929
for db in pgdog shard_0 shard_1; do
3030
psql -c "DROP DATABASE $db" || true

integration/toxi/toxi_spec.rb

Lines changed: 147 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ def warm_up
4343

4444
it 'some connections survive' do
4545
threads = []
46-
errors = 0
46+
errors = Concurrent::AtomicFixnum.new(0)
4747
sem = Concurrent::Semaphore.new(0)
48-
(5.0 / 25 * 25.0).ceil
4948
25.times do
5049
t = Thread.new do
5150
c = 1
@@ -54,13 +53,13 @@ def warm_up
5453
c = conn
5554
break
5655
rescue StandardError
57-
errors += 1
56+
errors.increment
5857
end
5958
25.times do
6059
c.exec 'SELECT 1'
6160
rescue PG::SystemError
6261
c = conn # reconnect
63-
errors += 1
62+
errors.increment
6463
end
6564
end
6665
threads << t
@@ -69,7 +68,7 @@ def warm_up
6968
sem.release(25)
7069
threads.each(&:join)
7170
end
72-
expect(errors).to be < 25 # 5% error rate (instead of 100%)
71+
expect(errors.value).to be < 25 # 5% error rate (instead of 100%)
7372
end
7473

7574
it 'active record works' do
@@ -78,22 +77,93 @@ def warm_up
7877
# Connect (the pool is lazy)
7978
Sharded.where(id: 1).first
8079
errors = 0
80+
ok = 0
8181
# Can't ban primary because it issues SET queries
8282
# that we currently route to primary.
8383
Toxiproxy[role].toxic(toxic).apply do
8484
25.times do
8585
Sharded.where(id: 1).first
86+
ok += 1
8687
rescue StandardError
8788
errors += 1
8889
end
8990
end
90-
expect(errors).to eq(1)
91+
expect(errors).to be <= 1
92+
expect(25 - ok).to eq(errors)
93+
end
94+
end
95+
96+
describe 'healthcheck' do
97+
before :each do
98+
admin_conn = admin
99+
admin_conn.exec 'RECONNECT'
100+
admin_conn.exec "SET read_write_split TO 'exclude_primary'"
101+
admin_conn.exec 'SET ban_timeout TO 1'
102+
end
103+
104+
describe 'will heal itself' do
105+
def health(role, field = 'healthy')
106+
admin.exec('SHOW POOLS').select do |pool|
107+
pool['database'] == 'failover' && pool['role'] == role
108+
end.map { |pool| pool[field] }
109+
end
110+
111+
10.times do
112+
it 'replica' do
113+
# Cache connect params.
114+
conn.exec 'SELECT 1'
115+
116+
Toxiproxy[:replica].toxic(:reset_peer).apply do
117+
errors = 0
118+
4.times do
119+
conn.exec 'SELECT 1'
120+
rescue PG::Error
121+
errors += 1
122+
end
123+
expect(errors).to be >= 1
124+
expect(health('replica')).to include('f')
125+
sleep(0.4) # ban maintenance runs every 333ms
126+
expect(health('replica', 'banned')).to include('t')
127+
end
128+
129+
4.times do
130+
conn.exec 'SELECT 1'
131+
end
132+
133+
admin.exec 'HEALTHCHECK'
134+
sleep(0.4)
135+
136+
expect(health('replica')).to eq(%w[t t t])
137+
expect(health('replica', 'banned')).to eq(%w[f f f])
138+
end
139+
end
140+
141+
it 'primary' do
142+
# Cache connect params.
143+
conn.exec 'DELETE FROM sharded'
144+
145+
Toxiproxy[:primary].toxic(:reset_peer).apply do
146+
begin
147+
conn.exec 'DELETE FROM sharded'
148+
rescue PG::Error
149+
end
150+
expect(health('primary')).to eq(['f'])
151+
end
152+
153+
conn.exec 'DELETE FROM sharded'
154+
155+
expect(health('primary')).to eq(%w[t])
156+
end
157+
end
158+
159+
after do
160+
admin.exec 'RELOAD'
91161
end
92162
end
93163

94164
describe 'tcp' do
95165
around :each do |example|
96-
Timeout.timeout(10) do
166+
Timeout.timeout(30) do
97167
example.run
98168
end
99169
end
@@ -138,8 +208,23 @@ def warm_up
138208
end
139209

140210
describe 'both down' do
141-
it 'unbans all pools' do
142-
25.times do
211+
10.times do
212+
it 'unbans all pools' do
213+
rw_config = admin.exec('SHOW CONFIG').select do |config|
214+
config['name'] == 'read_write_split'
215+
end[0]['value']
216+
expect(rw_config).to eq('include_primary')
217+
218+
def pool_stat(field, value)
219+
failover = admin.exec('SHOW POOLS').select do |pool|
220+
pool['database'] == 'failover'
221+
end
222+
entries = failover.select { |item| item[field] == value }
223+
entries.size
224+
end
225+
226+
admin.exec 'SET checkout_timeout TO 100'
227+
143228
Toxiproxy[:primary].toxic(:reset_peer).apply do
144229
Toxiproxy[:replica].toxic(:reset_peer).apply do
145230
Toxiproxy[:replica2].toxic(:reset_peer).apply do
@@ -148,19 +233,16 @@ def warm_up
148233
conn.exec_params 'SELECT $1::bigint', [1]
149234
rescue StandardError
150235
end
151-
banned = admin.exec('SHOW POOLS').select do |pool|
152-
pool['database'] == 'failover'
153-
end.select { |item| item['banned'] == 't' }
154-
expect(banned.size).to eq(4)
236+
237+
expect(pool_stat('healthy', 'f')).to eq(4)
155238
end
156239
end
157240
end
158241
end
159-
conn.exec 'SELECT $1::bigint', [25]
160-
banned = admin.exec('SHOW POOLS').select do |pool|
161-
pool['database'] == 'failover'
162-
end.select { |item| item['banned'] == 't' }
163-
expect(banned.size).to eq(0)
242+
243+
4.times do
244+
conn.exec 'SELECT $1::bigint', [25]
245+
end
164246
end
165247
end
166248
end
@@ -172,25 +254,21 @@ def warm_up
172254
Toxiproxy[:primary].toxic(:reset_peer).apply do
173255
c = conn
174256
c.exec 'BEGIN'
175-
c.exec 'CREATE TABLE test(id BIGINT)'
257+
c.exec 'CREATE TABLE IF NOT EXISTS test(id BIGINT)'
176258
c.exec 'ROLLBACK'
177259
rescue StandardError
178260
end
261+
179262
banned = admin.exec('SHOW POOLS').select do |pool|
180263
pool['database'] == 'failover' && pool['role'] == 'primary'
181264
end
182-
expect(banned[0]['banned']).to eq('t')
265+
expect(banned[0]['healthy']).to eq('f')
183266

184267
c = conn
185268
c.exec 'BEGIN'
186-
c.exec 'CREATE TABLE test(id BIGINT)'
269+
c.exec 'CREATE TABLE IF NOT EXISTS test(id BIGINT)'
187270
c.exec 'SELECT * FROM test'
188271
c.exec 'ROLLBACK'
189-
190-
banned = admin.exec('SHOW POOLS').select do |pool|
191-
pool['database'] == 'failover' && pool['role'] == 'primary'
192-
end
193-
expect(banned[0]['banned']).to eq('f')
194272
end
195273

196274
it 'active record works' do
@@ -199,16 +277,58 @@ def warm_up
199277
# Connect (the pool is lazy)
200278
Sharded.where(id: 1).first
201279
errors = 0
280+
ok = 0
202281
# Can't ban primary because it issues SET queries
203282
# that we currently route to primary.
204283
Toxiproxy[:primary].toxic(:reset_peer).apply do
205284
25.times do
206285
Sharded.where(id: 1).first
286+
ok += 1
207287
rescue StandardError
208288
errors += 1
209289
end
210290
end
211-
expect(errors).to eq(1)
291+
expect(errors).to be <= 1
292+
expect(25 - ok).to eq(errors)
293+
end
294+
295+
it 'clients can connect when all servers are down after caching connection params' do
296+
# First, establish a connection to cache connection parameters
297+
c = conn
298+
c.exec 'SELECT 1'
299+
c.close
300+
301+
# Verify initial state - all pools should be healthy before toxics
302+
pools = admin.exec('SHOW POOLS').select do |pool|
303+
pool['database'] == 'failover'
304+
end
305+
expect(pools.all? { |p| p['healthy'] == 't' }).to be true
306+
307+
# Now bring down all servers
308+
Toxiproxy[:primary].toxic(:reset_peer).apply do
309+
Toxiproxy[:replica].toxic(:reset_peer).apply do
310+
Toxiproxy[:replica2].toxic(:reset_peer).apply do
311+
Toxiproxy[:replica3].toxic(:reset_peer).apply do
312+
# Try to establish many connections
313+
connections = []
314+
50.times do
315+
c = conn
316+
expect(c).not_to be_nil
317+
connections << c
318+
end
319+
320+
# Check internal state - verify we have active client connections
321+
clients = admin.exec('SHOW CLIENTS').select do |client|
322+
client['database'] == 'failover'
323+
end
324+
expect(clients.size).to be >= 50
325+
326+
# Clean up connections without executing queries to avoid timeouts
327+
connections.each { |c| c.close rescue nil }
328+
end
329+
end
330+
end
331+
end
212332
end
213333
end
214334
end

0 commit comments

Comments
 (0)