Skip to content

Commit 23a642f

Browse files
drdrsh and zainkabani authored
Send DISCARD ALL even if client is not in transaction (#152)
* Send DISCARD ALL even if client is not in transaction * fmt * Added tests + avoided sending extra discard all * Adds set name logic to beginning of handle client * fmt * refactor dead code handling * Refactor reading command tag * remove unnecessary trim * Removing debugging statement * typo * typo{ * documentation * edit text * un-unwrap * run ci * run ci Co-authored-by: Zain Kabani <[email protected]>
1 parent 7f20dc3 commit 23a642f

File tree

5 files changed

+164
-34
lines changed

5 files changed

+164
-34
lines changed

.circleci/run_tests.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,16 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again
9090
cd tests/ruby
9191
sudo gem install bundler
9292
bundle install
93-
bundle exec ruby tests.rb
94-
bundle exec rspec *_spec.rb
93+
bundle exec ruby tests.rb || exit 1
94+
bundle exec rspec *_spec.rb || exit 1
9595
cd ../..
9696

9797
#
9898
# Python tests
9999
# These tests will start and stop the pgcat server so it will need to be restarted after the tests
100100
#
101101
pip3 install -r tests/python/requirements.txt
102-
python3 tests/python/tests.py
102+
python3 tests/python/tests.py || exit 1
103103

104104
start_pgcat "info"
105105

src/client.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub struct Client<S, T> {
5959
client_server_map: ClientServerMap,
6060

6161
/// Client parameters, e.g. user, client_encoding, etc.
62+
#[allow(dead_code)]
6263
parameters: HashMap<String, String>,
6364

6465
/// Statistics
@@ -82,6 +83,9 @@ pub struct Client<S, T> {
8283
/// Postgres user for this client (This comes from the user in the connection string)
8384
username: String,
8485

86+
/// Application name for this client (defaults to pgcat)
87+
application_name: String,
88+
8589
/// Used to notify clients about an impending shutdown
8690
shutdown: Receiver<()>,
8791
}
@@ -365,6 +369,11 @@ where
365369
None => return Err(Error::ClientError),
366370
};
367371

372+
let application_name = match parameters.get("application_name") {
373+
Some(application_name) => application_name,
374+
None => "pgcat",
375+
};
376+
368377
let admin = ["pgcat", "pgbouncer"]
369378
.iter()
370379
.filter(|db| *db == &pool_name)
@@ -493,6 +502,7 @@ where
493502
last_server_id: None,
494503
pool_name: pool_name.clone(),
495504
username: username.clone(),
505+
application_name: application_name.to_string(),
496506
shutdown,
497507
connected_to_server: false,
498508
});
@@ -526,6 +536,7 @@ where
526536
last_server_id: None,
527537
pool_name: String::from("undefined"),
528538
username: String::from("undefined"),
539+
application_name: String::from("undefined"),
529540
shutdown,
530541
connected_to_server: false,
531542
});
@@ -767,13 +778,10 @@ where
767778
server.address()
768779
);
769780

770-
// Set application_name if any.
771781
// TODO: investigate other parameters and set them too.
772-
if self.parameters.contains_key("application_name") {
773-
server
774-
.set_name(&self.parameters["application_name"])
775-
.await?;
776-
}
782+
783+
// Set application_name.
784+
server.set_name(&self.application_name).await?;
777785

778786
// Transaction loop. Multiple queries can be issued by the client here.
779787
// The connection belongs to the client until the transaction is over,
@@ -790,12 +798,7 @@ where
790798
Err(err) => {
791799
// Client disconnected inside a transaction.
792800
// Clean up the server and re-use it.
793-
// This prevents connection thrashing by bad clients.
794-
if server.in_transaction() {
795-
server.query("ROLLBACK").await?;
796-
server.query("DISCARD ALL").await?;
797-
server.set_name("pgcat").await?;
798-
}
801+
server.checkin_cleanup().await?;
799802

800803
return Err(err);
801804
}
@@ -837,16 +840,7 @@ where
837840

838841
// Terminate
839842
'X' => {
840-
// Client closing. Rollback and clean up
841-
// connection before releasing into the pool.
842-
// Pgbouncer closes the connection which leads to
843-
// connection thrashing when clients misbehave.
844-
if server.in_transaction() {
845-
server.query("ROLLBACK").await?;
846-
server.query("DISCARD ALL").await?;
847-
server.set_name("pgcat").await?;
848-
}
849-
843+
server.checkin_cleanup().await?;
850844
self.release();
851845

852846
return Ok(());
@@ -950,8 +944,10 @@ where
950944

951945
// The server is no longer bound to us, we can't cancel its queries anymore.
952946
debug!("Releasing server back into the pool");
947+
server.checkin_cleanup().await?;
953948
self.stats.server_idle(server.process_id(), address.id);
954949
self.connected_to_server = false;
950+
955951
self.release();
956952
self.stats.client_idle(self.process_id, address.id);
957953
}

src/server.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/// Implementation of the PostgreSQL server (database) protocol.
22
/// Here we are pretending to be a Postgres client.
33
use bytes::{Buf, BufMut, BytesMut};
4-
use log::{debug, error, info, trace};
4+
use log::{debug, error, info, trace, warn};
5+
use std::io::Read;
56
use std::time::SystemTime;
67
use tokio::io::{AsyncReadExt, BufReader};
78
use tokio::net::{
@@ -48,6 +49,9 @@ pub struct Server {
4849
/// Is the server broken? We'll remove it from the pool if so.
4950
bad: bool,
5051

52+
/// If server connection requires a DISCARD ALL before checkin
53+
needs_cleanup: bool,
54+
5155
/// Mapping of clients and servers used for query cancellation.
5256
client_server_map: ClientServerMap,
5357

@@ -316,6 +320,7 @@ impl Server {
316320
in_transaction: false,
317321
data_available: false,
318322
bad: false,
323+
needs_cleanup: false,
319324
client_server_map: client_server_map,
320325
connected_at: chrono::offset::Utc::now().naive_utc(),
321326
stats: stats,
@@ -440,6 +445,29 @@ impl Server {
440445
break;
441446
}
442447

448+
// CommandComplete
449+
'C' => {
450+
let mut command_tag = String::new();
451+
match message.reader().read_to_string(&mut command_tag) {
452+
Ok(_) => {
453+
// Non-exhaustive list of commands that are likely to change session variables/resources
454+
// which can leak between clients. This is a best effort to block bad clients
455+
// from poisoning a transaction-mode pool by setting inappropriate session variables
456+
match command_tag.as_str() {
457+
"SET\0" | "PREPARE\0" => {
458+
debug!("Server connection marked for clean up");
459+
self.needs_cleanup = true;
460+
}
461+
_ => (),
462+
}
463+
}
464+
465+
Err(err) => {
466+
warn!("Encountered an error while parsing CommandTag {}", err);
467+
}
468+
}
469+
}
470+
443471
// DataRow
444472
'D' => {
445473
// More data is available after this message, this is not the end of the reply.
@@ -553,14 +581,43 @@ impl Server {
553581
Ok(())
554582
}
555583

584+
/// Perform any necessary cleanup before putting the server
585+
/// connection back in the pool
586+
pub async fn checkin_cleanup(&mut self) -> Result<(), Error> {
587+
// Client disconnected with an open transaction on the server connection.
588+
// Pgbouncer behavior is to close the server connection but that can cause
589+
// server connection thrashing if clients repeatedly do this.
590+
// Instead, we ROLLBACK that transaction before putting the connection back in the pool
591+
if self.in_transaction() {
592+
self.query("ROLLBACK").await?;
593+
}
594+
595+
// Client disconnected but it performed session-altering operations such as
596+
// SET statement_timeout to 1 or creating a prepared statement. We clear that
597+
// to avoid leaking state between clients. For performance reasons we only
598+
// send `DISCARD ALL` if we think the session is altered instead of just sending
599+
// it before each checkin.
600+
if self.needs_cleanup {
601+
self.query("DISCARD ALL").await?;
602+
self.needs_cleanup = false;
603+
}
604+
605+
return Ok(());
606+
}
607+
556608
/// A shorthand for `SET application_name = $1`.
557-
#[allow(dead_code)]
558609
pub async fn set_name(&mut self, name: &str) -> Result<(), Error> {
559610
if self.application_name != name {
560611
self.application_name = name.to_string();
561-
Ok(self
612+
// We don't want `SET application_name` to mark the server connection
613+
// as needing cleanup
614+
let needs_cleanup_before = self.needs_cleanup;
615+
616+
let result = Ok(self
562617
.query(&format!("SET application_name = '{}'", name))
563-
.await?)
618+
.await?);
619+
self.needs_cleanup = needs_cleanup_before;
620+
return result;
564621
} else {
565622
Ok(())
566623
}

tests/ruby/helpers/pgcat_helper.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
module Helpers
77
module Pgcat
8-
def self.three_shard_setup(pool_name, pool_size)
8+
def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction")
99
user = {
1010
"password" => "sharding_user",
1111
"pool_size" => pool_size,
@@ -22,7 +22,7 @@ def self.three_shard_setup(pool_name, pool_size)
2222
pgcat_cfg["pools"] = {
2323
"#{pool_name}" => {
2424
"default_role" => "any",
25-
"pool_mode" => "transaction",
25+
"pool_mode" => pool_mode,
2626
"primary_reads_enabled" => false,
2727
"query_parser_enabled" => false,
2828
"sharding_function" => "pg_bigint_hash",
@@ -46,7 +46,7 @@ def self.three_shard_setup(pool_name, pool_size)
4646
end
4747
end
4848

49-
def self.single_shard_setup(pool_name, pool_size)
49+
def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction")
5050
user = {
5151
"password" => "sharding_user",
5252
"pool_size" => pool_size,
@@ -66,7 +66,7 @@ def self.single_shard_setup(pool_name, pool_size)
6666
pgcat_cfg["pools"] = {
6767
"#{pool_name}" => {
6868
"default_role" => "any",
69-
"pool_mode" => "transaction",
69+
"pool_mode" => pool_mode,
7070
"primary_reads_enabled" => false,
7171
"query_parser_enabled" => false,
7272
"sharding_function" => "pg_bigint_hash",

tests/ruby/misc_spec.rb

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@
9191
conn.close
9292

9393
expect(processes.primary.count_query("ROLLBACK")).to eq(1)
94-
expect(processes.primary.count_query("DISCARD ALL")).to eq(1)
9594
end
9695
end
9796

@@ -106,4 +105,82 @@
106105
admin_conn.close
107106
end
108107
end
108+
109+
describe "State clearance" do
110+
context "session mode" do
111+
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }
112+
113+
it "Clears state before connection checkin" do
114+
# Both modes of operation should not raise
115+
# ERROR: prepared statement "prepared_q" already exists
116+
15.times do
117+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
118+
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
119+
conn.close
120+
end
121+
122+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
123+
initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
124+
conn.async_exec("SET statement_timeout to 1000")
125+
current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
126+
expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
127+
conn.close
128+
end
129+
130+
it "Does not send DISCARD ALL unless necessary" do
131+
10.times do
132+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
133+
conn.async_exec("SET SERVER ROLE to 'primary'")
134+
conn.async_exec("SELECT 1")
135+
conn.close
136+
end
137+
138+
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
139+
140+
10.times do
141+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
142+
conn.async_exec("SET SERVER ROLE to 'primary'")
143+
conn.async_exec("SELECT 1")
144+
conn.async_exec("SET statement_timeout to 5000")
145+
conn.close
146+
end
147+
148+
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
149+
end
150+
end
151+
152+
context "transaction mode" do
153+
let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
154+
it "Clears state before connection checkin" do
155+
# Both modes of operation should not raise
156+
# ERROR: prepared statement "prepared_q" already exists
157+
15.times do
158+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
159+
conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
160+
conn.close
161+
end
162+
end
163+
164+
it "Does not send DISCARD ALL unless necessary" do
165+
10.times do
166+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
167+
conn.async_exec("SET SERVER ROLE to 'primary'")
168+
conn.async_exec("SELECT 1")
169+
conn.close
170+
end
171+
172+
expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
173+
174+
10.times do
175+
conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
176+
conn.async_exec("SET SERVER ROLE to 'primary'")
177+
conn.async_exec("SELECT 1")
178+
conn.async_exec("SET statement_timeout to 5000")
179+
conn.close
180+
end
181+
182+
expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
183+
end
184+
end
185+
end
109186
end

0 commit comments

Comments
 (0)