From c3c0039db3976fa120433eeb9d7f8d749e2a2d16 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 10:53:33 -0400 Subject: [PATCH 01/25] wip --- pgdog/src/frontend/client/engine/context.rs | 12 ++++-- pgdog/src/frontend/client/engine/mod.rs | 9 ++++- pgdog/src/frontend/client/mod.rs | 41 ++++++++++++++------- pgdog/src/frontend/client/test/mod.rs | 14 ++++--- pgdog/src/frontend/error.rs | 3 ++ 5 files changed, 54 insertions(+), 25 deletions(-) diff --git a/pgdog/src/frontend/client/engine/context.rs b/pgdog/src/frontend/client/engine/context.rs index 909b0f09..a52cd6a4 100644 --- a/pgdog/src/frontend/client/engine/context.rs +++ b/pgdog/src/frontend/client/engine/context.rs @@ -1,5 +1,7 @@ use crate::{ - frontend::{client::Inner, Buffer, Client, PreparedStatements}, + frontend::{ + client::Inner, logical_transaction::LogicalTransaction, Buffer, Client, PreparedStatements, + }, net::Parameters, }; @@ -13,7 +15,7 @@ pub struct EngineContext<'a> { /// Client parameters. pub(super) params: &'a Parameters, /// Is the client inside a transaction? - pub(super) in_transaction: bool, + pub(super) transaction: &'a LogicalTransaction, /// Messages currently in client's buffer. pub(super) buffer: &'a Buffer, } @@ -23,9 +25,13 @@ impl<'a> EngineContext<'a> { Self { prepared_statements: &mut client.prepared_statements, params: &client.params, - in_transaction: client.in_transaction, + transaction: &client.logical_transaction, connected: inner.connected(), buffer: &client.request_buffer, } } + + pub fn in_transaction(&self) -> bool { + self.transaction.is_some() + } } diff --git a/pgdog/src/frontend/client/engine/mod.rs b/pgdog/src/frontend/client/engine/mod.rs index fa5aae16..9698a622 100644 --- a/pgdog/src/frontend/client/engine/mod.rs +++ b/pgdog/src/frontend/client/engine/mod.rs @@ -54,7 +54,8 @@ impl<'a> Engine<'a> { 'S' => { if only_close || only_sync && !self.context.connected { messages.push( - ReadyForQuery::in_transaction(self.context.in_transaction).message()?, + ReadyForQuery::in_transaction(self.context.in_transaction()) + .message()?, ) } } @@ -68,6 +69,10 @@ impl<'a> Engine<'a> { Ok(messages) } + + pub fn in_transaction(&self) -> bool { + self.context.transaction.is_some() + } } #[cfg(test)] @@ -97,7 +102,7 @@ mod test { connected: false, prepared_statements: &mut prepared, params: ¶ms, - in_transaction: false, + transaction: &None, buffer: &buf, }; diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index ca5c2032..de49271c 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -10,6 +10,7 @@ use tokio::time::timeout; use tokio::{select, spawn}; use tracing::{debug, enabled, error, info, trace, Level as LogLevel}; +use super::logical_transaction::{LogicalTransaction, TransactionStatus}; use super::{Buffer, Command, Comms, Error, PreparedStatements}; use crate::auth::{md5, scram::Server}; use crate::backend::{ @@ -51,7 +52,8 @@ pub struct Client { streaming: bool, shutdown: bool, prepared_statements: PreparedStatements, - in_transaction: bool, + // in_transaction: bool, + logical_transaction: LogicalTransaction, timeouts: Timeouts, request_buffer: Buffer, stream_buffer: BytesMut, @@ -236,7 +238,7 @@ impl Client { replication_mode, connect_params: params, prepared_statements: PreparedStatements::new(), - in_transaction: false, + logical_transaction: LogicalTransaction::new(), timeouts: Timeouts::from_config(&config.config.general), request_buffer: Buffer::new(), stream_buffer: BytesMut::new(), @@ -277,7 +279,7 @@ impl Client { connect_params: connect_params.clone(), params: connect_params, admin: false, - in_transaction: false, + logical_transaction: LogicalTransaction::new(), timeouts: Timeouts::from_config(&config().config.general), request_buffer: Buffer::new(), stream_buffer: BytesMut::new(), @@ -378,7 +380,7 @@ impl Client { "{} [{}] (in transaction: {})", query.query(), self.addr, - self.in_transaction + self.in_transaction() ); QueryLogger::new(&self.request_buffer).log().await?; } @@ -393,7 +395,7 @@ impl Client { match engine.execute().await? { Action::Intercept(msgs) => { self.stream.send_many(&msgs).await?; - inner.done(self.in_transaction); + inner.done(self.in_transaction()); self.update_stats(&mut inner); return Ok(false); } @@ -407,25 +409,25 @@ impl Client { &mut self.request_buffer, &mut self.prepared_statements, &self.params, - self.in_transaction, + self.in_transaction(), ) { Ok(command) => command, Err(err) => { if err.empty_query() { self.stream.send(&EmptyQueryResponse).await?; self.stream - .send_flush(&ReadyForQuery::in_transaction(self.in_transaction)) + .send_flush(&ReadyForQuery::in_transaction(self.in_transaction())) .await?; } else { error!("{:?} [{}]", err, self.addr); self.stream .error( ErrorResponse::syntax(err.to_string().as_str()), - self.in_transaction, + self.in_transaction(), ) .await?; } - inner.done(self.in_transaction); + inner.done(self.in_transaction()); return Ok(false); } }; @@ -444,9 +446,9 @@ impl Client { Some(Command::StartTransaction(query)) => { if let BufferedQuery::Query(_) = query { self.start_transaction().await?; + inner.start_transaction = Some(query.clone()); - self.in_transaction = true; - inner.done(self.in_transaction); + inner.done(self.in_transaction()); return Ok(false); } } @@ -758,13 +760,17 @@ impl Client { /// Tell the client we started a transaction. async fn start_transaction(&mut self) -> Result<(), Error> { + self.logical_transaction.soft_begin()?; + self.stream .send_many(&[ CommandComplete::new_begin().message()?.backend(), ReadyForQuery::in_transaction(true).message()?, ]) .await?; + debug!("transaction started"); + Ok(()) } @@ -774,11 +780,13 @@ impl Client { /// with no queries. async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> { let cmd = if rollback { + self.logical_transaction.rollback(); CommandComplete::new_rollback() } else { + self.logical_transaction.commit(); CommandComplete::new_commit() }; - let mut messages = if !self.in_transaction { + let mut messages = if !self.in_transaction() { vec![NoticeResponse::from(ErrorResponse::no_transaction()).message()?] } else { vec![] @@ -805,10 +813,10 @@ impl Client { self.stream .send_many(&[ CommandComplete::from_str(command).message()?.backend(), - ReadyForQuery::in_transaction(self.in_transaction).message()?, + ReadyForQuery::in_transaction(self.in_transaction()).message()?, ]) .await?; - inner.done(self.in_transaction); + inner.done(self.in_transaction()); Ok(()) } @@ -819,6 +827,11 @@ impl Client { .prepared_statements(self.prepared_statements.len_local()); inner.stats.memory_used(self.memory_usage()); } + + fn in_transaction(&self) -> bool { + self.logical_transaction.status == TransactionStatus::BeginPending + || self.logical_transaction.status == TransactionStatus::InProgress + } } impl Drop for Client { diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index aaa089f9..779ecc51 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -128,18 +128,20 @@ async fn test_test_client() { let disconnect = client.client_messages(inner.get()).await.unwrap(); assert!(!disconnect); - assert!(!client.in_transaction); + assert!(!client.in_transaction()); assert_eq!(inner.stats.state, State::Active); // Buffer not cleared yet. assert_eq!(client.request_buffer.total_message_len(), query.len()); assert!(inner.backend.connected()); + + let in_trasaction = client.in_transaction(); let command = inner .command( &mut client.request_buffer, &mut client.prepared_statements, &client.params, - client.in_transaction, + in_trasaction, ) .unwrap(); assert!(matches!(command, Some(Command::Query(_)))); @@ -451,7 +453,7 @@ async fn test_transaction_state() { client.client_messages(inner.get()).await.unwrap(); read!(conn, ['C', 'Z']); - assert!(client.in_transaction); + assert!(client.in_transaction()); assert!(inner.router.route().is_write()); assert!(inner.router.in_transaction()); @@ -467,7 +469,7 @@ async fn test_transaction_state() { client.client_messages(inner.get()).await.unwrap(); assert!(inner.router.routed()); - assert!(client.in_transaction); + assert!(client.in_transaction()); assert!(inner.router.route().is_write()); assert!(inner.router.in_transaction()); @@ -511,7 +513,7 @@ async fn test_transaction_state() { read!(conn, ['2', 'D', 'C', 'Z']); assert!(inner.router.routed()); - assert!(client.in_transaction); + assert!(client.in_transaction()); assert!(inner.router.route().is_write()); assert!(inner.router.in_transaction()); @@ -531,7 +533,7 @@ async fn test_transaction_state() { read!(conn, ['C', 'Z']); - assert!(!client.in_transaction); + assert!(!client.in_transaction()); assert!(!inner.router.routed()); } diff --git a/pgdog/src/frontend/error.rs b/pgdog/src/frontend/error.rs index fbbc4b8e..9268ae9d 100644 --- a/pgdog/src/frontend/error.rs +++ b/pgdog/src/frontend/error.rs @@ -45,6 +45,9 @@ pub enum Error { #[error("join error")] Join(#[from] tokio::task::JoinError), + + #[error("transaction error: {0}")] + Transaction(#[from] super::logical_transaction::TransactionError), } impl Error { From ed0c3612974c5b07c8eb94b41674fa242c4bfbee Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 11:15:55 -0400 Subject: [PATCH 02/25] wip --- pgdog/src/frontend/client/engine/context.rs | 8 +-- pgdog/src/frontend/client/engine/mod.rs | 16 +++--- pgdog/src/frontend/client/inner.rs | 17 +++--- pgdog/src/frontend/client/mod.rs | 59 ++++++++++++++------- pgdog/src/frontend/client/test/mod.rs | 3 +- pgdog/src/frontend/logical_transaction.rs | 9 ++++ 6 files changed, 70 insertions(+), 42 deletions(-) diff --git a/pgdog/src/frontend/client/engine/context.rs b/pgdog/src/frontend/client/engine/context.rs index a52cd6a4..7e436131 100644 --- a/pgdog/src/frontend/client/engine/context.rs +++ b/pgdog/src/frontend/client/engine/context.rs @@ -15,7 +15,7 @@ pub struct EngineContext<'a> { /// Client parameters. pub(super) params: &'a Parameters, /// Is the client inside a transaction? - pub(super) transaction: &'a LogicalTransaction, + pub(super) logical_transaction: &'a LogicalTransaction, /// Messages currently in client's buffer. pub(super) buffer: &'a Buffer, } @@ -25,13 +25,9 @@ impl<'a> EngineContext<'a> { Self { prepared_statements: &mut client.prepared_statements, params: &client.params, - transaction: &client.logical_transaction, + logical_transaction: &client.logical_transaction, connected: inner.connected(), buffer: &client.request_buffer, } } - - pub fn in_transaction(&self) -> bool { - self.transaction.is_some() - } } diff --git a/pgdog/src/frontend/client/engine/mod.rs b/pgdog/src/frontend/client/engine/mod.rs index 9698a622..747dbf9f 100644 --- a/pgdog/src/frontend/client/engine/mod.rs +++ b/pgdog/src/frontend/client/engine/mod.rs @@ -54,8 +54,10 @@ impl<'a> Engine<'a> { 'S' => { if only_close || only_sync && !self.context.connected { messages.push( - ReadyForQuery::in_transaction(self.context.in_transaction()) - .message()?, + ReadyForQuery::in_transaction( + self.context.logical_transaction.in_transaction(), + ) + .message()?, ) } } @@ -69,16 +71,12 @@ impl<'a> Engine<'a> { Ok(messages) } - - pub fn in_transaction(&self) -> bool { - self.context.transaction.is_some() - } } #[cfg(test)] mod test { use crate::{ - frontend::{Buffer, PreparedStatements}, + frontend::{logical_transaction::LogicalTransaction, Buffer, PreparedStatements}, net::{Parameters, Parse, Sync}, }; @@ -98,11 +96,13 @@ mod test { Sync.into(), ]); + let logical_transaction = LogicalTransaction::new(); + let context = EngineContext { connected: false, prepared_statements: &mut prepared, params: ¶ms, - transaction: &None, + logical_transaction: &logical_transaction, buffer: &buf, }; diff --git a/pgdog/src/frontend/client/inner.rs b/pgdog/src/frontend/client/inner.rs index 21051386..f40ed938 100644 --- a/pgdog/src/frontend/client/inner.rs +++ b/pgdog/src/frontend/client/inner.rs @@ -6,8 +6,9 @@ use crate::{ Error as BackendError, }, frontend::{ - buffer::BufferedQuery, router::Error as RouterError, Buffer, Command, Comms, - PreparedStatements, Router, RouterContext, Stats, + buffer::BufferedQuery, logical_transaction::LogicalTransaction, + router::Error as RouterError, Buffer, Command, Comms, PreparedStatements, Router, + RouterContext, Stats, }, net::Parameters, state::State, @@ -58,7 +59,7 @@ impl Inner { buffer: &mut Buffer, prepared_statements: &mut PreparedStatements, params: &Parameters, - in_transaction: bool, + logical_transaction: &LogicalTransaction, ) -> Result, RouterError> { let command = self .backend @@ -67,11 +68,11 @@ impl Inner { .map(|cluster| { // Build router context. let context = RouterContext::new( - buffer, // Query and parameters. - cluster, // Cluster configuration. - prepared_statements, // Prepared statements. - params, // Client connection parameters. - in_transaction, // Client in explcitely started transaction. + buffer, // Query and parameters. + cluster, // Cluster configuration. + prepared_statements, // Prepared statements. + params, // Client connection parameters. + logical_transaction.in_transaction(), // Client in explcitely started transaction. )?; self.router.query(context) }) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index de49271c..d2e902f5 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -409,7 +409,7 @@ impl Client { &mut self.request_buffer, &mut self.prepared_statements, &self.params, - self.in_transaction(), + &self.logical_transaction, ) { Ok(command) => command, Err(err) => { @@ -454,16 +454,20 @@ impl Client { } Some(Command::RollbackTransaction) => { inner.start_transaction = None; + self.end_transaction(true).await?; - self.in_transaction = false; - inner.done(self.in_transaction); + self.logical_transaction.rollback()?; + + inner.done(self.logical_transaction.in_transaction()); return Ok(false); } Some(Command::CommitTransaction) => { inner.start_transaction = None; + self.end_transaction(false).await?; - self.in_transaction = false; - inner.done(self.in_transaction); + self.logical_transaction.commit()?; + + inner.done(self.logical_transaction.in_transaction()); return Ok(false); } // How many shards are configured. @@ -472,11 +476,14 @@ impl Client { let mut dr = DataRow::new(); dr.add(*shards as i64); let cc = CommandComplete::from_str("SHOW"); - let rfq = ReadyForQuery::in_transaction(self.in_transaction); + let rfq = + ReadyForQuery::in_transaction(self.logical_transaction.in_transaction()); + self.stream .send_many(&[rd.message()?, dr.message()?, cc.message()?, rfq.message()?]) .await?; - inner.done(self.in_transaction); + + inner.done(self.logical_transaction.in_transaction()); return Ok(false); } Some(Command::Deallocate) => { @@ -494,9 +501,12 @@ impl Client { Some(Command::Query(query)) => { if query.is_cross_shard() && self.cross_shard_disabled { self.stream - .error(ErrorResponse::cross_shard_disabled(), self.in_transaction) + .error( + ErrorResponse::cross_shard_disabled(), + self.logical_transaction.in_transaction(), + ) .await?; - inner.done(self.in_transaction); + inner.done(self.logical_transaction.in_transaction()); inner.reset_router(); return Ok(false); } @@ -548,12 +558,15 @@ impl Client { if err.no_server() { error!("{} [{}]", err, self.addr); self.stream - .error(ErrorResponse::from_err(&err), self.in_transaction) + .error( + ErrorResponse::from_err(&err), + self.logical_transaction.in_transaction(), + ) .await?; // TODO: should this be wrapped in a method? inner.disconnect(); inner.reset_router(); - inner.done(self.in_transaction); + inner.done(self.logical_transaction.in_transaction()); return Ok(false); } else { return Err(err.into()); @@ -629,13 +642,23 @@ impl Client { // ReadyForQuery (B) if code == 'Z' { inner.stats.query(); - // In transaction if buffered BEGIN from client - // or server is telling us we are. - self.in_transaction = message.in_transaction() || inner.start_transaction.is_some(); - inner.stats.idle(self.in_transaction); + // 1) Should we logically be in‐txn? + // In transaction if buffered BEGIN from client or server is telling us we are. + let should_be_tx = message.in_transaction() || inner.start_transaction.is_some(); + let in_transaction = self.logical_transaction.in_transaction(); + + // 2) Reconcile against our LogicalTransaction + if should_be_tx && !in_transaction { + self.logical_transaction.soft_begin()?; + } + if !should_be_tx && in_transaction { + self.logical_transaction.reset(); // COMMIT/ROLLBACK just happened? + } + + inner.stats.idle(self.logical_transaction.in_transaction()); // Flush mirrors. - if !self.in_transaction { + if !self.logical_transaction.in_transaction() { inner.backend.mirror_flush(); } } @@ -780,10 +803,10 @@ impl Client { /// with no queries. async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> { let cmd = if rollback { - self.logical_transaction.rollback(); + self.logical_transaction.rollback()?; CommandComplete::new_rollback() } else { - self.logical_transaction.commit(); + self.logical_transaction.commit()?; CommandComplete::new_commit() }; let mut messages = if !self.in_transaction() { diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 779ecc51..7cfaf7ae 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -135,13 +135,12 @@ async fn test_test_client() { assert!(inner.backend.connected()); - let in_trasaction = client.in_transaction(); let command = inner .command( &mut client.request_buffer, &mut client.prepared_statements, &client.params, - in_trasaction, + &client.logical_transaction, ) .unwrap(); assert!(matches!(command, Some(Command::Query(_)))); diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index 232b8034..0bf23dc1 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -64,6 +64,15 @@ impl LogicalTransaction { .or_else(|| self.manual_shard.clone()) } + /// Return whether a transaction is currently open or pending. + /// This is because we don't actually trigger the begin until the first statement is executed. + pub fn in_transaction(&self) -> bool { + matches!( + self.status, + TransactionStatus::BeginPending | TransactionStatus::InProgress + ) + } + /// Mark that a `BEGIN` is pending. /// /// Transitions `Idle -> BeginPending`. From 1a4c8e805b2dc785b2a33788e26d78fb6ab69467 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 11:55:23 -0400 Subject: [PATCH 03/25] wip --- pgdog/src/frontend/logical_transaction.rs | 39 +++++++---------------- 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index 0bf23dc1..8e67d6a5 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -86,12 +86,8 @@ impl LogicalTransaction { self.status = TransactionStatus::BeginPending; Ok(()) } - TransactionStatus::BeginPending | TransactionStatus::InProgress => { - Err(TransactionError::AlreadyInTransaction) - } - TransactionStatus::Committed | TransactionStatus::RolledBack => { - Err(TransactionError::AlreadyFinalized) - } + TransactionStatus::BeginPending => Err(TransactionError::AlreadyInTransaction), + TransactionStatus::InProgress => Err(TransactionError::AlreadyInTransaction), } } @@ -110,15 +106,12 @@ impl LogicalTransaction { self.touch_shard(shard)?; match self.status { + TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), TransactionStatus::BeginPending => { self.status = TransactionStatus::InProgress; Ok(()) } - - TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), TransactionStatus::InProgress => Ok(()), - TransactionStatus::Committed => Err(TransactionError::AlreadyFinalized), - TransactionStatus::RolledBack => Err(TransactionError::AlreadyFinalized), } } @@ -132,15 +125,12 @@ impl LogicalTransaction { /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn commit(&mut self) -> Result<(), TransactionError> { match self.status { + TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), + TransactionStatus::BeginPending => Err(TransactionError::NoActiveTransaction), TransactionStatus::InProgress => { - self.status = TransactionStatus::Committed; + self.reset(); Ok(()) } - - TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), - TransactionStatus::BeginPending => Err(TransactionError::NoActiveTransaction), - TransactionStatus::Committed => Err(TransactionError::AlreadyFinalized), - TransactionStatus::RolledBack => Err(TransactionError::AlreadyFinalized), } } @@ -154,15 +144,12 @@ impl LogicalTransaction { /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn rollback(&mut self) -> Result<(), TransactionError> { match self.status { + TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), + TransactionStatus::BeginPending => Err(TransactionError::NoActiveTransaction), TransactionStatus::InProgress => { - self.status = TransactionStatus::RolledBack; + self.reset(); Ok(()) } - - TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), - TransactionStatus::BeginPending => Err(TransactionError::NoActiveTransaction), - TransactionStatus::Committed => Err(TransactionError::AlreadyFinalized), - TransactionStatus::RolledBack => Err(TransactionError::AlreadyFinalized), } } @@ -285,10 +272,6 @@ pub enum TransactionStatus { BeginPending, /// Transaction active. InProgress, - /// ROLLBACK issued. - RolledBack, - /// COMMIT issued. - Committed, } // ----------------------------------------------------------------------------- @@ -409,7 +392,7 @@ mod tests { tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); - assert_eq!(tx.status, TransactionStatus::Committed); + assert_eq!(tx.status, TransactionStatus::Idle); } #[test] @@ -443,7 +426,7 @@ mod tests { tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); - assert_eq!(tx.status, TransactionStatus::RolledBack); + assert_eq!(tx.status, TransactionStatus::Idle); } #[test] From 84af9e8ab520d793e09b81fa76083c5b72d7d343 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 12:27:54 -0400 Subject: [PATCH 04/25] wip, test fixes --- pgdog/src/frontend/logical_transaction.rs | 186 ++++++++++++++++------ 1 file changed, 134 insertions(+), 52 deletions(-) diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index 8e67d6a5..369ed18c 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -86,8 +86,8 @@ impl LogicalTransaction { self.status = TransactionStatus::BeginPending; Ok(()) } - TransactionStatus::BeginPending => Err(TransactionError::AlreadyInTransaction), - TransactionStatus::InProgress => Err(TransactionError::AlreadyInTransaction), + TransactionStatus::BeginPending => Err(TransactionError::ExpectedIdle), + TransactionStatus::InProgress => Err(TransactionError::ExpectedIdle), } } @@ -106,7 +106,7 @@ impl LogicalTransaction { self.touch_shard(shard)?; match self.status { - TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), + TransactionStatus::Idle => Err(TransactionError::ExpectedPendingOrActive), TransactionStatus::BeginPending => { self.status = TransactionStatus::InProgress; Ok(()) @@ -125,8 +125,8 @@ impl LogicalTransaction { /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn commit(&mut self) -> Result<(), TransactionError> { match self.status { - TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), - TransactionStatus::BeginPending => Err(TransactionError::NoActiveTransaction), + TransactionStatus::Idle => Err(TransactionError::ExpectedActive), + TransactionStatus::BeginPending => Err(TransactionError::ExpectedActive), TransactionStatus::InProgress => { self.reset(); Ok(()) @@ -144,8 +144,8 @@ impl LogicalTransaction { /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn rollback(&mut self) -> Result<(), TransactionError> { match self.status { - TransactionStatus::Idle => Err(TransactionError::NoPendingBegins), - TransactionStatus::BeginPending => Err(TransactionError::NoActiveTransaction), + TransactionStatus::Idle => Err(TransactionError::ExpectedActive), + TransactionStatus::BeginPending => Err(TransactionError::ExpectedActive), TransactionStatus::InProgress => { self.reset(); Ok(()) @@ -233,10 +233,10 @@ impl LogicalTransaction { #[derive(Debug)] pub enum TransactionError { // Transaction lifecycle - AlreadyInTransaction, - NoActiveTransaction, - AlreadyFinalized, - NoPendingBegins, + ExpectedIdle, + ExpectedPending, + ExpectedPendingOrActive, + ExpectedActive, // Sharding policy InvalidShardType, @@ -247,10 +247,10 @@ impl fmt::Display for TransactionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use TransactionError::*; match self { - AlreadyInTransaction => write!(f, "transaction already started"), - NoActiveTransaction => write!(f, "no active transaction"), - AlreadyFinalized => write!(f, "transaction already finalized"), - NoPendingBegins => write!(f, "transaction not pending"), + ExpectedIdle => write!(f, "transaction already started"), + ExpectedPending => write!(f, "transaction not pending"), + ExpectedActive => write!(f, "no active transaction"), + ExpectedPendingOrActive => write!(f, "no active/pending transaction"), InvalidShardType => write!(f, "sharding hints must be ::Direct(n)"), ShardConflict => { write!(f, "can't run a transaction on multiple shards") @@ -301,43 +301,49 @@ mod tests { let mut tx = LogicalTransaction::new(); tx.soft_begin().unwrap(); let err = tx.soft_begin().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyInTransaction)); + assert!(matches!(err, TransactionError::ExpectedIdle)); } #[test] fn test_soft_begin_in_progress_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + let err = tx.soft_begin().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyInTransaction)); + assert!(matches!(err, TransactionError::ExpectedIdle)); } #[test] fn test_soft_begin_after_commit_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); - let err = tx.soft_begin().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + + tx.soft_begin().unwrap(); // no panic } #[test] fn test_soft_begin_after_rollback_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); - let err = tx.soft_begin().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + + tx.soft_begin().unwrap(); // no panic } #[test] fn test_execute_query_from_begin_pending() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + assert_eq!(tx.status, TransactionStatus::InProgress); assert_eq!(tx.dirty_shard, Some(Shard::Direct(0))); } @@ -346,25 +352,31 @@ mod tests { fn test_execute_query_from_idle_errors() { let mut tx = LogicalTransaction::new(); let err = tx.execute_query(Shard::Direct(0)).unwrap_err(); - assert!(matches!(err, TransactionError::NoPendingBegins)); + assert!(matches!(err, TransactionError::ExpectedPendingOrActive)); } #[test] fn test_execute_query_after_commit_errors() { - let mut tx = LogicalTransaction::new(); - tx.soft_begin().unwrap(); - tx.execute_query(Shard::Direct(0)).unwrap(); - tx.commit().unwrap(); - let err = tx.execute_query(Shard::Direct(0)).unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + let mut ltx = LogicalTransaction::new(); + + ltx.soft_begin().unwrap(); + ltx.execute_query(Shard::Direct(0)).unwrap(); + ltx.execute_query(Shard::Direct(0)).unwrap(); + ltx.execute_query(Shard::Direct(0)).unwrap(); + ltx.commit().unwrap(); + + let err = ltx.execute_query(Shard::Direct(0)).unwrap_err(); + assert!(matches!(err, TransactionError::ExpectedPendingOrActive)); } #[test] fn test_execute_query_multiple_on_same_shard() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + assert_eq!(tx.dirty_shard, Some(Shard::Direct(0))); assert_eq!(tx.status, TransactionStatus::InProgress); } @@ -372,8 +384,10 @@ mod tests { #[test] fn test_execute_query_cross_shard_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + let err = tx.execute_query(Shard::Direct(1)).unwrap_err(); assert!(matches!(err, TransactionError::ShardConflict)); } @@ -381,7 +395,9 @@ mod tests { #[test] fn test_execute_query_invalid_shard_type_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); + let err = tx.execute_query(Shard::All).unwrap_err(); assert!(matches!(err, TransactionError::InvalidShardType)); } @@ -389,9 +405,11 @@ mod tests { #[test] fn test_commit_from_in_progress() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); + assert_eq!(tx.status, TransactionStatus::Idle); } @@ -399,80 +417,104 @@ mod tests { fn test_commit_from_idle_errors() { let mut tx = LogicalTransaction::new(); let err = tx.commit().unwrap_err(); - assert!(matches!(err, TransactionError::NoPendingBegins)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_commit_from_begin_pending_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); + let err = tx.commit().unwrap_err(); - assert!(matches!(err, TransactionError::NoActiveTransaction)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_commit_already_committed_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); + let err = tx.commit().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_rollback_from_in_progress() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); + assert_eq!(tx.status, TransactionStatus::Idle); } #[test] fn test_rollback_from_begin_pending_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); + let err = tx.rollback().unwrap_err(); - assert!(matches!(err, TransactionError::NoActiveTransaction)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_reset_clears_state() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.set_manual_shard(Shard::Direct(0)).unwrap(); tx.reset(); + assert_eq!(tx.status, TransactionStatus::Idle); assert_eq!(tx.manual_shard, None); assert_eq!(tx.dirty_shard, None); } #[test] - fn test_set_manual_shard_before_touch() { + fn test_set_matching_manual_shard_before_touch() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.set_manual_shard(Shard::Direct(0)).unwrap(); + assert_eq!(tx.manual_shard, Some(Shard::Direct(0))); + + tx.execute_query(Shard::Direct(0)).unwrap(); // should succeed + tx.execute_query(Shard::Direct(0)).unwrap(); // should succeed tx.execute_query(Shard::Direct(0)).unwrap(); // should succeed } #[test] fn test_set_manual_shard_after_touch_same_ok() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.set_manual_shard(Shard::Direct(0)).unwrap(); + assert_eq!(tx.manual_shard, Some(Shard::Direct(0))); } #[test] fn test_set_manual_shard_after_touch_different_errors() { let mut tx = LogicalTransaction::new(); + // touch shard 0 tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + // manually set shard 1 let err = tx.set_manual_shard(Shard::Direct(1)).unwrap_err(); assert!(matches!(err, TransactionError::ShardConflict)); @@ -482,8 +524,10 @@ mod tests { fn test_manual_then_dirty_conflict() { let mut tx = LogicalTransaction::new(); tx.soft_begin().unwrap(); + // pin to shard 0 tx.set_manual_shard(Shard::Direct(0)).unwrap(); + // touching another shard must fail let err = tx.execute_query(Shard::Direct(1)).unwrap_err(); assert!(matches!(err, TransactionError::ShardConflict)); @@ -499,8 +543,12 @@ mod tests { #[test] fn test_active_shard_dirty() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(69)).unwrap(); + tx.execute_query(Shard::Direct(69)).unwrap(); + tx.execute_query(Shard::Direct(69)).unwrap(); + assert_eq!(tx.active_shard(), Some(Shard::Direct(69))); } @@ -515,57 +563,81 @@ mod tests { fn test_rollback_from_idle_errors() { let mut tx = LogicalTransaction::new(); let err = tx.rollback().unwrap_err(); - assert!(matches!(err, TransactionError::NoPendingBegins)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_commit_after_rollback_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); + let err = tx.commit().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_rollback_after_commit_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); + let err = tx.rollback().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_rollback_already_rolledback_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); + let err = tx.rollback().unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + println!("Error: {:?}", err); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] fn test_execute_query_after_rollback_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); + let err = tx.execute_query(Shard::Direct(0)).unwrap_err(); - assert!(matches!(err, TransactionError::AlreadyFinalized)); + assert!(matches!(err, TransactionError::ExpectedPendingOrActive)); } #[test] fn test_set_manual_shard_multiple_changes_before_execute() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.set_manual_shard(Shard::Direct(1)).unwrap(); - tx.set_manual_shard(Shard::Direct(2)).unwrap(); + tx.set_manual_shard(Shard::Direct(2)).unwrap(); // change, no error. + assert_eq!(tx.manual_shard, Some(Shard::Direct(2))); + + tx.execute_query(Shard::Direct(2)).unwrap(); + tx.execute_query(Shard::Direct(2)).unwrap(); tx.execute_query(Shard::Direct(2)).unwrap(); + tx.execute_query(Shard::Direct(2)).unwrap(); + let err = tx.execute_query(Shard::Direct(1)).unwrap_err(); assert!(matches!(err, TransactionError::ShardConflict)); } @@ -573,9 +645,11 @@ mod tests { #[test] fn test_set_manual_shard_after_commit_same_ok() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); + tx.set_manual_shard(Shard::Direct(0)).unwrap(); assert_eq!(tx.manual_shard, Some(Shard::Direct(0))); } @@ -583,31 +657,41 @@ mod tests { #[test] fn test_set_manual_shard_after_commit_different_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); - let err = tx.set_manual_shard(Shard::Direct(1)).unwrap_err(); - assert!(matches!(err, TransactionError::ShardConflict)); + + tx.set_manual_shard(Shard::Direct(1)).unwrap(); // should not panic } #[test] fn test_set_manual_shard_after_rollback_same_ok() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); - tx.set_manual_shard(Shard::Direct(0)).unwrap(); - assert_eq!(tx.manual_shard, Some(Shard::Direct(0))); + + tx.set_manual_shard(Shard::Direct(88)).unwrap(); + assert_eq!(tx.manual_shard, Some(Shard::Direct(88))); // no panic } #[test] fn test_set_manual_shard_after_rollback_different_errors() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); + tx.execute_query(Shard::Direct(0)).unwrap(); tx.rollback().unwrap(); - let err = tx.set_manual_shard(Shard::Direct(1)).unwrap_err(); - assert!(matches!(err, TransactionError::ShardConflict)); + + tx.set_manual_shard(Shard::Direct(1)).unwrap(); // should not panic } #[test] @@ -626,10 +710,13 @@ mod tests { #[test] fn test_soft_begin_after_reset_from_finalized() { let mut tx = LogicalTransaction::new(); + tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(0)).unwrap(); tx.commit().unwrap(); + tx.reset(); + tx.soft_begin().unwrap(); assert_eq!(tx.status, TransactionStatus::BeginPending); } @@ -637,17 +724,12 @@ mod tests { #[test] fn test_active_shard_both_same() { let mut tx = LogicalTransaction::new(); + tx.set_manual_shard(Shard::Direct(3)).unwrap(); tx.soft_begin().unwrap(); tx.execute_query(Shard::Direct(3)).unwrap(); - assert_eq!(tx.active_shard(), Some(Shard::Direct(3))); - } - #[test] - fn test_statements_executed_remains_zero_after_execute() { - let mut tx = LogicalTransaction::new(); - tx.soft_begin().unwrap(); - tx.execute_query(Shard::Direct(0)).unwrap(); + assert_eq!(tx.active_shard(), Some(Shard::Direct(3))); } } From 77feddd295a418e573bba58799eeabd0afe2f802 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 12:55:30 -0400 Subject: [PATCH 05/25] wip-try-split-command --- integration/load_balancer/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/load_balancer/run.sh b/integration/load_balancer/run.sh index 05b7f394..701041cb 100644 --- a/integration/load_balancer/run.sh +++ b/integration/load_balancer/run.sh @@ -10,7 +10,7 @@ export PGHOST=127.0.0.1 export PGDATABASE=postgres export PGPASSWORD=postgres -docker-compose up -d +docker compose up -d echo "Waiting for Postgres to be ready" @@ -45,4 +45,4 @@ popd killall pgdog -docker-compose down +docker compose down From 956a0cb0f5cbe03151e65de079a8e89f9b4c1556 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 13:04:35 -0400 Subject: [PATCH 06/25] testing --- integration/go/go_pgx/load_balancer_test.go | 25 +++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/integration/go/go_pgx/load_balancer_test.go b/integration/go/go_pgx/load_balancer_test.go index aa337b95..f6b45b55 100644 --- a/integration/go/go_pgx/load_balancer_test.go +++ b/integration/go/go_pgx/load_balancer_test.go @@ -231,10 +231,27 @@ outer: } func prewarm(t *testing.T, pool *pgxpool.Pool) { + ctx := context.Background() for range 25 { - for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} { - _, err := pool.Exec(context.Background(), q) - assert.NoError(t, err) - } + // pin to one connection via a real Tx + tx, err := pool.Begin(ctx) + assert.NoError(t, err) + _, err = tx.Exec(ctx, "SELECT 1") + assert.NoError(t, err) + err = tx.Commit(ctx) + assert.NoError(t, err) + + // simple query to warm another round + _, err = pool.Exec(ctx, "SELECT 1") + assert.NoError(t, err) } } + +// func prewarm(t *testing.T, pool *pgxpool.Pool) { +// for range 25 { +// for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} { +// _, err := pool.Exec(context.Background(), q) +// assert.NoError(t, err) +// } +// } +// } From 82c89eff0b3b7831f1a5a9a7bd4ed080304a50b1 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 13:41:02 -0400 Subject: [PATCH 07/25] test_fix_maybe --- integration/go/go_pgx/load_balancer_test.go | 42 ++++++++++----------- pgdog/src/frontend/client/mod.rs | 2 + pgdog/src/frontend/logical_transaction.rs | 10 ++++- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/integration/go/go_pgx/load_balancer_test.go b/integration/go/go_pgx/load_balancer_test.go index f6b45b55..a0275f50 100644 --- a/integration/go/go_pgx/load_balancer_test.go +++ b/integration/go/go_pgx/load_balancer_test.go @@ -230,28 +230,28 @@ outer: return int64(totalQueryCount), int64(totalTransactionCount) } -func prewarm(t *testing.T, pool *pgxpool.Pool) { - ctx := context.Background() - for range 25 { - // pin to one connection via a real Tx - tx, err := pool.Begin(ctx) - assert.NoError(t, err) - _, err = tx.Exec(ctx, "SELECT 1") - assert.NoError(t, err) - err = tx.Commit(ctx) - assert.NoError(t, err) - - // simple query to warm another round - _, err = pool.Exec(ctx, "SELECT 1") - assert.NoError(t, err) - } -} - // func prewarm(t *testing.T, pool *pgxpool.Pool) { +// ctx := context.Background() // for range 25 { -// for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} { -// _, err := pool.Exec(context.Background(), q) -// assert.NoError(t, err) -// } +// // pin to one connection via a real Tx +// tx, err := pool.Begin(ctx) +// assert.NoError(t, err) +// _, err = tx.Exec(ctx, "SELECT 1") +// assert.NoError(t, err) +// err = tx.Commit(ctx) +// assert.NoError(t, err) + +// // simple query to warm another round +// _, err = pool.Exec(ctx, "SELECT 1") +// assert.NoError(t, err) // } // } + +func prewarm(t *testing.T, pool *pgxpool.Pool) { + for range 25 { + for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} { + _, err := pool.Exec(context.Background(), q) + assert.NoError(t, err) + } + } +} diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index d2e902f5..18ff63a2 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -449,6 +449,7 @@ impl Client { inner.start_transaction = Some(query.clone()); inner.done(self.in_transaction()); + return Ok(false); } } @@ -459,6 +460,7 @@ impl Client { self.logical_transaction.rollback()?; inner.done(self.logical_transaction.in_transaction()); + return Ok(false); } Some(Command::CommitTransaction) => { diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index 369ed18c..e756f7af 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -126,7 +126,10 @@ impl LogicalTransaction { pub fn commit(&mut self) -> Result<(), TransactionError> { match self.status { TransactionStatus::Idle => Err(TransactionError::ExpectedActive), - TransactionStatus::BeginPending => Err(TransactionError::ExpectedActive), + TransactionStatus::BeginPending => { + self.reset(); + Ok(()) + } TransactionStatus::InProgress => { self.reset(); Ok(()) @@ -145,7 +148,10 @@ impl LogicalTransaction { pub fn rollback(&mut self) -> Result<(), TransactionError> { match self.status { TransactionStatus::Idle => Err(TransactionError::ExpectedActive), - TransactionStatus::BeginPending => Err(TransactionError::ExpectedActive), + TransactionStatus::BeginPending => { + self.reset(); + Ok(()) + } TransactionStatus::InProgress => { self.reset(); Ok(()) From 78a3611d4e86ba921c868229ee90de6e2aa0e8d2 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 13:48:19 -0400 Subject: [PATCH 08/25] fix_unit_tests --- pgdog/src/frontend/logical_transaction.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index e756f7af..6061d332 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -427,13 +427,11 @@ mod tests { } #[test] - fn test_commit_from_begin_pending_errors() { + fn test_commit_from_begin_pending() { let mut tx = LogicalTransaction::new(); tx.soft_begin().unwrap(); - - let err = tx.commit().unwrap_err(); - assert!(matches!(err, TransactionError::ExpectedActive)); + tx.commit().unwrap(); // no-panic } #[test] @@ -469,9 +467,7 @@ mod tests { let mut tx = LogicalTransaction::new(); tx.soft_begin().unwrap(); - - let err = tx.rollback().unwrap_err(); - assert!(matches!(err, TransactionError::ExpectedActive)); + tx.rollback().unwrap(); // no-panic } #[test] From 2f3ebf65343b08c7ad942bb2796b1cc5bacaf381 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 14:10:02 -0400 Subject: [PATCH 09/25] fix test --- integration/go/go_pgx/load_balancer_test.go | 33 ++++++++------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/integration/go/go_pgx/load_balancer_test.go b/integration/go/go_pgx/load_balancer_test.go index a0275f50..c23032de 100644 --- a/integration/go/go_pgx/load_balancer_test.go +++ b/integration/go/go_pgx/load_balancer_test.go @@ -230,28 +230,19 @@ outer: return int64(totalQueryCount), int64(totalTransactionCount) } -// func prewarm(t *testing.T, pool *pgxpool.Pool) { -// ctx := context.Background() -// for range 25 { -// // pin to one connection via a real Tx -// tx, err := pool.Begin(ctx) -// assert.NoError(t, err) -// _, err = tx.Exec(ctx, "SELECT 1") -// assert.NoError(t, err) -// err = tx.Commit(ctx) -// assert.NoError(t, err) - -// // simple query to warm another round -// _, err = pool.Exec(ctx, "SELECT 1") -// assert.NoError(t, err) -// } -// } - func prewarm(t *testing.T, pool *pgxpool.Pool) { + ctx := context.Background() for range 25 { - for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} { - _, err := pool.Exec(context.Background(), q) - assert.NoError(t, err) - } + // transaction `BEGIN; SELECT 1; COMMIT;` + tx, err := pool.Begin(ctx) + assert.NoError(t, err) + _, err = tx.Exec(ctx, "SELECT 1") + assert.NoError(t, err) + err = tx.Commit(ctx) + assert.NoError(t, err) + + // no-transaction `SELECT 1;` + _, err = pool.Exec(ctx, "SELECT 1") + assert.NoError(t, err) } } From bbc286be7f0cbeb1154f74b112de79df2e857cdd Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 14:41:41 -0400 Subject: [PATCH 10/25] wip --- pgdog/src/frontend/client/mod.rs | 41 +++++++++++++++++------ pgdog/src/frontend/logical_transaction.rs | 16 ++++----- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 18ff63a2..50c902a1 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -19,6 +19,7 @@ use crate::backend::{ }; use crate::config::{self, AuthType}; use crate::frontend::buffer::BufferedQuery; +use crate::frontend::logical_transaction::TransactionError; #[cfg(debug_assertions)] use crate::frontend::QueryLogger; use crate::net::messages::{ @@ -804,21 +805,41 @@ impl Client { /// This avoids connecting to servers when clients start and commit transactions /// with no queries. async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> { + // attempt to commit or rollback the logical transaction + let logical_result = if rollback { + self.logical_transaction.rollback() + } else { + self.logical_transaction.commit() + }; + + match logical_result { + // no transaction in progress → send a NOTICE and READY + Err(TransactionError::ExpectedActive) => { + let notice = NoticeResponse::from(ErrorResponse::no_transaction()) + .message()? + .backend(); + let ready = ReadyForQuery::idle().message()?.backend(); + self.stream.send_many(&[notice, ready]).await?; + return Ok(()); + } + // any other logical‐txn error is fatal + Err(e) => return Err(e.into()), + // on Ok, fall through to send CommandComplete + Ok(()) => {} + } + + // build and send COMMIT/ROLLBACK + READY let cmd = if rollback { - self.logical_transaction.rollback()?; CommandComplete::new_rollback() } else { - self.logical_transaction.commit()?; CommandComplete::new_commit() }; - let mut messages = if !self.in_transaction() { - vec![NoticeResponse::from(ErrorResponse::no_transaction()).message()?] - } else { - vec![] - }; - messages.push(cmd.message()?.backend()); - messages.push(ReadyForQuery::idle().message()?); - self.stream.send_many(&messages).await?; + + let complete_msg = cmd.message()?.backend(); + let ready_msg = ReadyForQuery::idle().message()?.backend(); + + self.stream.send_many(&[complete_msg, ready_msg]).await?; + debug!("transaction ended"); Ok(()) } diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index 6061d332..084a552d 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -106,7 +106,7 @@ impl LogicalTransaction { self.touch_shard(shard)?; match self.status { - TransactionStatus::Idle => Err(TransactionError::ExpectedPendingOrActive), + TransactionStatus::Idle => Err(TransactionError::ExpectedActive), TransactionStatus::BeginPending => { self.status = TransactionStatus::InProgress; Ok(()) @@ -240,8 +240,6 @@ impl LogicalTransaction { pub enum TransactionError { // Transaction lifecycle ExpectedIdle, - ExpectedPending, - ExpectedPendingOrActive, ExpectedActive, // Sharding policy @@ -253,10 +251,8 @@ impl fmt::Display for TransactionError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use TransactionError::*; match self { - ExpectedIdle => write!(f, "transaction already started"), - ExpectedPending => write!(f, "transaction not pending"), - ExpectedActive => write!(f, "no active transaction"), - ExpectedPendingOrActive => write!(f, "no active/pending transaction"), + ExpectedIdle => write!(f, "there is already a transaction in progress"), + ExpectedActive => write!(f, "there is no transaction in progress"), InvalidShardType => write!(f, "sharding hints must be ::Direct(n)"), ShardConflict => { write!(f, "can't run a transaction on multiple shards") @@ -358,7 +354,7 @@ mod tests { fn test_execute_query_from_idle_errors() { let mut tx = LogicalTransaction::new(); let err = tx.execute_query(Shard::Direct(0)).unwrap_err(); - assert!(matches!(err, TransactionError::ExpectedPendingOrActive)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] @@ -372,7 +368,7 @@ mod tests { ltx.commit().unwrap(); let err = ltx.execute_query(Shard::Direct(0)).unwrap_err(); - assert!(matches!(err, TransactionError::ExpectedPendingOrActive)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] @@ -622,7 +618,7 @@ mod tests { tx.rollback().unwrap(); let err = tx.execute_query(Shard::Direct(0)).unwrap_err(); - assert!(matches!(err, TransactionError::ExpectedPendingOrActive)); + assert!(matches!(err, TransactionError::ExpectedActive)); } #[test] From 6f88b4200b9daeeb8410c599b9faaf5a03816518 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 15:24:17 -0400 Subject: [PATCH 11/25] wip --- Cargo.lock | 5 +- pgdog/Cargo.toml | 1 + pgdog/src/frontend/client/mod.rs | 67 ++++++++++++++---------- pgdog/src/net/messages/error_response.rs | 10 ++++ 4 files changed, 52 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f0c81f9..8375ea2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2289,6 +2289,7 @@ dependencies = [ "serde", "serde_json", "sha1", + "smallvec", "socket2", "thiserror 2.0.12", "tikv-jemallocator", @@ -3322,9 +3323,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.15.0" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" dependencies = [ "serde", ] diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 238ee373..e00c1e5c 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -59,6 +59,7 @@ indexmap = "2.9" lru = "0.16" hickory-resolver = "0.25.2" lazy_static = "1" +smallvec = "1.15.1" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 50c902a1..e2209b9c 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -5,13 +5,15 @@ use std::time::Instant; use bytes::BytesMut; use engine::EngineContext; +use smallvec::SmallVec; use timeouts::Timeouts; use tokio::time::timeout; use tokio::{select, spawn}; use tracing::{debug, enabled, error, info, trace, Level as LogLevel}; -use super::logical_transaction::{LogicalTransaction, TransactionStatus}; +use super::logical_transaction::{LogicalTransaction, TransactionError, TransactionStatus}; use super::{Buffer, Command, Comms, Error, PreparedStatements}; + use crate::auth::{md5, scram::Server}; use crate::backend::{ databases, @@ -19,7 +21,6 @@ use crate::backend::{ }; use crate::config::{self, AuthType}; use crate::frontend::buffer::BufferedQuery; -use crate::frontend::logical_transaction::TransactionError; #[cfg(debug_assertions)] use crate::frontend::QueryLogger; use crate::net::messages::{ @@ -645,17 +646,18 @@ impl Client { // ReadyForQuery (B) if code == 'Z' { inner.stats.query(); - // 1) Should we logically be in‐txn? - // In transaction if buffered BEGIN from client or server is telling us we are. + // 1) Does the backend server say we're in a transaction? let should_be_tx = message.in_transaction() || inner.start_transaction.is_some(); + + // 2) Is the frontend client in a logical transaction? let in_transaction = self.logical_transaction.in_transaction(); - // 2) Reconcile against our LogicalTransaction + // 3) Reconcile against our LogicalTransaction if should_be_tx && !in_transaction { self.logical_transaction.soft_begin()?; } if !should_be_tx && in_transaction { - self.logical_transaction.reset(); // COMMIT/ROLLBACK just happened? + self.logical_transaction.reset(); } inner.stats.idle(self.logical_transaction.in_transaction()); @@ -786,15 +788,27 @@ impl Client { /// Tell the client we started a transaction. async fn start_transaction(&mut self) -> Result<(), Error> { - self.logical_transaction.soft_begin()?; + // stack‐allocate up to 3 messages: optional NOTICE + BEGIN + Ready + let mut messages: SmallVec<[Message; 3]> = SmallVec::new(); - self.stream - .send_many(&[ - CommandComplete::new_begin().message()?.backend(), - ReadyForQuery::in_transaction(true).message()?, - ]) - .await?; + match self.logical_transaction.soft_begin() { + Err(TransactionError::ExpectedActive) => { + let notice = NoticeResponse::from(ErrorResponse::already_in_transaction()) + .message()? + .backend(); + + messages.push(notice); + } + Err(e) => return Err(e.into()), // any other error is fatal + Ok(()) => {} + } + // push the BEGIN + in-transaction ready + messages.push(CommandComplete::new_begin().message()?.backend()); + messages.push(ReadyForQuery::in_transaction(true).message()?.backend()); + + // send all messages + self.stream.send_many(&messages).await?; debug!("transaction started"); Ok(()) @@ -805,7 +819,9 @@ impl Client { /// This avoids connecting to servers when clients start and commit transactions /// with no queries. async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> { - // attempt to commit or rollback the logical transaction + // stack‐allocate up to 3 messages: NOTICE + COMMIT/ROLLBACK + READY + let mut messages: SmallVec<[Message; 3]> = SmallVec::new(); + let logical_result = if rollback { self.logical_transaction.rollback() } else { @@ -813,33 +829,26 @@ impl Client { }; match logical_result { - // no transaction in progress → send a NOTICE and READY Err(TransactionError::ExpectedActive) => { - let notice = NoticeResponse::from(ErrorResponse::no_transaction()) - .message()? - .backend(); - let ready = ReadyForQuery::idle().message()?.backend(); - self.stream.send_many(&[notice, ready]).await?; - return Ok(()); + messages.push( + NoticeResponse::from(ErrorResponse::no_transaction()) + .message()? + .backend(), + ); } - // any other logical‐txn error is fatal Err(e) => return Err(e.into()), - // on Ok, fall through to send CommandComplete Ok(()) => {} } - // build and send COMMIT/ROLLBACK + READY let cmd = if rollback { CommandComplete::new_rollback() } else { CommandComplete::new_commit() }; + messages.push(cmd.message()?.backend()); + messages.push(ReadyForQuery::idle().message()?.backend()); - let complete_msg = cmd.message()?.backend(); - let ready_msg = ReadyForQuery::idle().message()?.backend(); - - self.stream.send_many(&[complete_msg, ready_msg]).await?; - + self.stream.send_many(&messages).await?; debug!("transaction ended"); Ok(()) } diff --git a/pgdog/src/net/messages/error_response.rs b/pgdog/src/net/messages/error_response.rs index 91109066..39fa0b63 100644 --- a/pgdog/src/net/messages/error_response.rs +++ b/pgdog/src/net/messages/error_response.rs @@ -138,6 +138,16 @@ impl ErrorResponse { ..Default::default() } } + + /// Warning for issuing BEGIN inside an existing transaction. + pub fn already_in_transaction() -> ErrorResponse { + ErrorResponse { + severity: "WARNING".into(), + code: "25001".into(), + message: "there is already a transaction in progress".into(), + ..Default::default() + } + } } impl Display for ErrorResponse { From 4b4d0d9fe892c1236758504e5e6e4ff39008ebbc Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 15:33:25 -0400 Subject: [PATCH 12/25] wip --- pgdog/src/frontend/client/mod.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index e2209b9c..8d9e0cca 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -461,7 +461,7 @@ impl Client { self.end_transaction(true).await?; self.logical_transaction.rollback()?; - inner.done(self.logical_transaction.in_transaction()); + inner.done(self.in_transaction()); return Ok(false); } @@ -471,7 +471,7 @@ impl Client { self.end_transaction(false).await?; self.logical_transaction.commit()?; - inner.done(self.logical_transaction.in_transaction()); + inner.done(self.in_transaction()); return Ok(false); } // How many shards are configured. @@ -480,14 +480,13 @@ impl Client { let mut dr = DataRow::new(); dr.add(*shards as i64); let cc = CommandComplete::from_str("SHOW"); - let rfq = - ReadyForQuery::in_transaction(self.logical_transaction.in_transaction()); + let rfq = ReadyForQuery::in_transaction(self.in_transaction()); self.stream .send_many(&[rd.message()?, dr.message()?, cc.message()?, rfq.message()?]) .await?; - inner.done(self.logical_transaction.in_transaction()); + inner.done(self.in_transaction()); return Ok(false); } Some(Command::Deallocate) => { @@ -510,7 +509,7 @@ impl Client { self.logical_transaction.in_transaction(), ) .await?; - inner.done(self.logical_transaction.in_transaction()); + inner.done(self.in_transaction()); inner.reset_router(); return Ok(false); } @@ -562,15 +561,12 @@ impl Client { if err.no_server() { error!("{} [{}]", err, self.addr); self.stream - .error( - ErrorResponse::from_err(&err), - self.logical_transaction.in_transaction(), - ) + .error(ErrorResponse::from_err(&err), self.in_transaction()) .await?; // TODO: should this be wrapped in a method? inner.disconnect(); inner.reset_router(); - inner.done(self.logical_transaction.in_transaction()); + inner.done(self.in_transaction()); return Ok(false); } else { return Err(err.into()); @@ -650,7 +646,7 @@ impl Client { let should_be_tx = message.in_transaction() || inner.start_transaction.is_some(); // 2) Is the frontend client in a logical transaction? - let in_transaction = self.logical_transaction.in_transaction(); + let in_transaction = self.in_transaction(); // 3) Reconcile against our LogicalTransaction if should_be_tx && !in_transaction { From 08fcb595693f5861a839558209efbcb5ee51e308 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 15:54:13 -0400 Subject: [PATCH 13/25] remove-comment --- pgdog/src/frontend/client/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 8d9e0cca..1ebb105f 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -54,7 +54,6 @@ pub struct Client { streaming: bool, shutdown: bool, prepared_statements: PreparedStatements, - // in_transaction: bool, logical_transaction: LogicalTransaction, timeouts: Timeouts, request_buffer: Buffer, From 33f1ec5b3cd002cfd7396d7f5af69675475f61cd Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Fri, 8 Aug 2025 15:55:21 -0400 Subject: [PATCH 14/25] wip --- pgdog/src/frontend/client/engine/context.rs | 4 ++++ pgdog/src/frontend/client/engine/mod.rs | 6 ++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pgdog/src/frontend/client/engine/context.rs b/pgdog/src/frontend/client/engine/context.rs index 7e436131..98cf572e 100644 --- a/pgdog/src/frontend/client/engine/context.rs +++ b/pgdog/src/frontend/client/engine/context.rs @@ -30,4 +30,8 @@ impl<'a> EngineContext<'a> { buffer: &client.request_buffer, } } + + pub fn in_transaction(&self) -> bool { + self.logical_transaction.in_transaction() + } } diff --git a/pgdog/src/frontend/client/engine/mod.rs b/pgdog/src/frontend/client/engine/mod.rs index 747dbf9f..8bf7d3e8 100644 --- a/pgdog/src/frontend/client/engine/mod.rs +++ b/pgdog/src/frontend/client/engine/mod.rs @@ -54,10 +54,8 @@ impl<'a> Engine<'a> { 'S' => { if only_close || only_sync && !self.context.connected { messages.push( - ReadyForQuery::in_transaction( - self.context.logical_transaction.in_transaction(), - ) - .message()?, + ReadyForQuery::in_transaction(self.context.in_transaction()) + .message()?, ) } } From f57baa72312996a14c764d758a24f13009269962 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Sat, 9 Aug 2025 20:09:37 -0400 Subject: [PATCH 15/25] wip --- pgdog/src/frontend/client/inner.rs | 8 ++------ pgdog/src/frontend/client/mod.rs | 33 ++++++++---------------------- 2 files changed, 10 insertions(+), 31 deletions(-) diff --git a/pgdog/src/frontend/client/inner.rs b/pgdog/src/frontend/client/inner.rs index f40ed938..3f36a9a3 100644 --- a/pgdog/src/frontend/client/inner.rs +++ b/pgdog/src/frontend/client/inner.rs @@ -6,9 +6,8 @@ use crate::{ Error as BackendError, }, frontend::{ - buffer::BufferedQuery, logical_transaction::LogicalTransaction, - router::Error as RouterError, Buffer, Command, Comms, PreparedStatements, Router, - RouterContext, Stats, + logical_transaction::LogicalTransaction, router::Error as RouterError, Buffer, Command, + Comms, PreparedStatements, Router, RouterContext, Stats, }, net::Parameters, state::State, @@ -30,8 +29,6 @@ pub struct Inner { pub(super) router: Router, /// Client stats. pub(super) stats: Stats, - /// Start transaction statement, intercepted by the router. - pub(super) start_transaction: Option, /// Client-wide comms. pub(super) comms: Comms, } @@ -48,7 +45,6 @@ impl Inner { backend, router, stats: Stats::new(), - start_transaction: None, comms: client.comms.clone(), }) } diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 1ebb105f..38bed2be 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -320,7 +320,7 @@ impl Client { select! { _ = shutdown.notified() => { - if !inner.backend.connected() && inner.start_transaction.is_none() { + if !inner.backend.connected() && !self.in_transaction() { break; } } @@ -448,25 +448,18 @@ impl Client { if let BufferedQuery::Query(_) = query { self.start_transaction().await?; - inner.start_transaction = Some(query.clone()); inner.done(self.in_transaction()); - return Ok(false); } } Some(Command::RollbackTransaction) => { - inner.start_transaction = None; - self.end_transaction(true).await?; self.logical_transaction.rollback()?; inner.done(self.in_transaction()); - return Ok(false); } Some(Command::CommitTransaction) => { - inner.start_transaction = None; - self.end_transaction(false).await?; self.logical_transaction.commit()?; @@ -574,16 +567,6 @@ impl Client { }; } - // We don't start a transaction on the servers until - // a client is actually executing something. - // - // This prevents us holding open connections to multiple servers - if self.request_buffer.executable() { - if let Some(query) = inner.start_transaction.take() { - inner.backend.execute(&query).await?; - } - } - for msg in self.request_buffer.iter() { if let ProtocolMessage::Bind(bind) = msg { inner.backend.bind(bind)? @@ -641,17 +624,17 @@ impl Client { // ReadyForQuery (B) if code == 'Z' { inner.stats.query(); - // 1) Does the backend server say we're in a transaction? - let should_be_tx = message.in_transaction() || inner.start_transaction.is_some(); + // Does the backend server say we're in a transaction? + let has_backend_tx = message.in_transaction(); - // 2) Is the frontend client in a logical transaction? - let in_transaction = self.in_transaction(); + // Is the frontend client in a logical transaction? + let has_logical_tx = self.in_transaction(); - // 3) Reconcile against our LogicalTransaction - if should_be_tx && !in_transaction { + // Reconcile, QueryParser might not be active and backend is the source of truth + if has_backend_tx && !has_logical_tx { self.logical_transaction.soft_begin()?; } - if !should_be_tx && in_transaction { + if !has_backend_tx && has_logical_tx { self.logical_transaction.reset(); } From 3349342eea0174a095965a9badbcba59c85bf66e Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Sat, 9 Aug 2025 20:10:14 -0400 Subject: [PATCH 16/25] remove_unused_function --- pgdog/src/frontend/buffer.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pgdog/src/frontend/buffer.rs b/pgdog/src/frontend/buffer.rs index 1d1dc793..8957e8cc 100644 --- a/pgdog/src/frontend/buffer.rs +++ b/pgdog/src/frontend/buffer.rs @@ -139,14 +139,6 @@ impl Buffer { .unwrap_or(false) } - /// The client is setting state on the connection - /// which we can no longer ignore. - pub(crate) fn executable(&self) -> bool { - self.buffer - .iter() - .any(|m| ['E', 'Q', 'B'].contains(&m.code())) - } - /// Rewrite query in buffer. pub fn rewrite(&mut self, query: &str) -> Result<(), Error> { if self.buffer.iter().any(|c| c.code() != 'Q') { From 90b8ba9b7bae64f1e4aa31aa192cf76e61564797 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Sat, 9 Aug 2025 20:38:19 -0400 Subject: [PATCH 17/25] checkpoint --- pgdog/src/frontend/client/mod.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 38bed2be..daa4e4c9 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -630,15 +630,14 @@ impl Client { // Is the frontend client in a logical transaction? let has_logical_tx = self.in_transaction(); - // Reconcile, QueryParser might not be active and backend is the source of truth - if has_backend_tx && !has_logical_tx { - self.logical_transaction.soft_begin()?; - } - if !has_backend_tx && has_logical_tx { - self.logical_transaction.reset(); - } + // In transaction if buffered BEGIN from client or server is telling us we are. + let in_transaction_for_stats = has_backend_tx || has_logical_tx; + + println!("has_backend: {}", has_backend_tx); + println!("has_logical: {}", has_logical_tx); + println!("global: {}", in_transaction_for_stats); - inner.stats.idle(self.logical_transaction.in_transaction()); + inner.stats.idle(in_transaction_for_stats); // Flush mirrors. if !self.logical_transaction.in_transaction() { From b15c05831635b15ee7c418da5933a5622d2bd7f8 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Sat, 9 Aug 2025 20:53:14 -0400 Subject: [PATCH 18/25] checkpoint --- pgdog/src/backend/pool/connection/binding.rs | 22 ++++++++++++++++++ pgdog/src/frontend/buffer.rs | 8 +++++++ pgdog/src/frontend/client/mod.rs | 24 +++++++++++++++----- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 9bbea4fe..22707498 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -237,6 +237,28 @@ impl Binding { Ok(()) } + /// Execute a BEGIN on all servers + /// TODO: Block mutli-shard BEGINs as transaction should not occur on multiple shards + pub async fn begin(&mut self) -> Result<(), Error> { + let begin: &str = "BEGIN;"; + + match self { + Binding::Server(Some(ref mut server)) => { + server.execute(&begin).await?; + } + + Binding::MultiShard(ref mut servers, _) => { + for server in servers { + server.execute(&begin).await?; + } + } + + _ => (), + } + + Ok(()) + } + pub async fn link_client(&mut self, params: &Parameters) -> Result { match self { Binding::Server(Some(ref mut server)) => server.link_client(params).await, diff --git a/pgdog/src/frontend/buffer.rs b/pgdog/src/frontend/buffer.rs index 8957e8cc..1d1dc793 100644 --- a/pgdog/src/frontend/buffer.rs +++ b/pgdog/src/frontend/buffer.rs @@ -139,6 +139,14 @@ impl Buffer { .unwrap_or(false) } + /// The client is setting state on the connection + /// which we can no longer ignore. + pub(crate) fn executable(&self) -> bool { + self.buffer + .iter() + .any(|m| ['E', 'Q', 'B'].contains(&m.code())) + } + /// Rewrite query in buffer. pub fn rewrite(&mut self, query: &str) -> Result<(), Error> { if self.buffer.iter().any(|c| c.code() != 'Q') { diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index daa4e4c9..d67d85fb 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -567,6 +567,14 @@ impl Client { }; } + // We don't start a transaction on the servers until + // a client is actually executing something. + // + // This prevents us holding open connections to multiple servers + if self.should_trigger_buffered_begin() { + inner.backend.begin().await?; + } + for msg in self.request_buffer.iter() { if let ProtocolMessage::Bind(bind) = msg { inner.backend.bind(bind)? @@ -631,16 +639,16 @@ impl Client { let has_logical_tx = self.in_transaction(); // In transaction if buffered BEGIN from client or server is telling us we are. - let in_transaction_for_stats = has_backend_tx || has_logical_tx; + let in_transaction = has_backend_tx || has_logical_tx; println!("has_backend: {}", has_backend_tx); println!("has_logical: {}", has_logical_tx); - println!("global: {}", in_transaction_for_stats); + println!("global: {}", in_transaction); - inner.stats.idle(in_transaction_for_stats); + inner.stats.idle(in_transaction); // Flush mirrors. - if !self.logical_transaction.in_transaction() { + if !in_transaction { inner.backend.mirror_flush(); } } @@ -782,7 +790,7 @@ impl Client { // push the BEGIN + in-transaction ready messages.push(CommandComplete::new_begin().message()?.backend()); - messages.push(ReadyForQuery::in_transaction(true).message()?.backend()); + messages.push(ReadyForQuery::in_transaction(true).message()?); // send all messages self.stream.send_many(&messages).await?; @@ -823,7 +831,7 @@ impl Client { CommandComplete::new_commit() }; messages.push(cmd.message()?.backend()); - messages.push(ReadyForQuery::idle().message()?.backend()); + messages.push(ReadyForQuery::idle().message()?); self.stream.send_many(&messages).await?; debug!("transaction ended"); @@ -864,6 +872,10 @@ impl Client { self.logical_transaction.status == TransactionStatus::BeginPending || self.logical_transaction.status == TransactionStatus::InProgress } + + fn should_trigger_buffered_begin(&self) -> bool { + self.request_buffer.executable() && !self.in_transaction() + } } impl Drop for Client { From 374107fcb5c7519ebe9264f725993963a55702ba Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Sat, 9 Aug 2025 21:31:00 -0400 Subject: [PATCH 19/25] wip --- pgdog/src/backend/pool/connection/binding.rs | 21 +++----------------- pgdog/src/frontend/client/mod.rs | 2 -- pgdog/src/frontend/client/test/mod.rs | 7 +++++++ 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 22707498..0448d556 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -1,7 +1,7 @@ //! Binding between frontend client and a connection on the backend. use crate::{ - net::{parameter::Parameters, ProtocolMessage}, + net::{parameter::Parameters, ProtocolMessage, Query}, state::State, }; @@ -240,23 +240,8 @@ impl Binding { /// Execute a BEGIN on all servers /// TODO: Block mutli-shard BEGINs as transaction should not occur on multiple shards pub async fn begin(&mut self) -> Result<(), Error> { - let begin: &str = "BEGIN;"; - - match self { - Binding::Server(Some(ref mut server)) => { - server.execute(&begin).await?; - } - - Binding::MultiShard(ref mut servers, _) => { - for server in servers { - server.execute(&begin).await?; - } - } - - _ => (), - } - - Ok(()) + let query = Query::new("BEGIN"); + self.execute(query.query()).await } pub async fn link_client(&mut self, params: &Parameters) -> Result { diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index d67d85fb..9f7c2fa4 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -454,14 +454,12 @@ impl Client { } Some(Command::RollbackTransaction) => { self.end_transaction(true).await?; - self.logical_transaction.rollback()?; inner.done(self.in_transaction()); return Ok(false); } Some(Command::CommitTransaction) => { self.end_transaction(false).await?; - self.logical_transaction.commit()?; inner.done(self.in_transaction()); return Ok(false); diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 7cfaf7ae..674cf665 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -511,6 +511,13 @@ async fn test_transaction_state() { read!(conn, ['2', 'D', 'C', 'Z']); + println!("router.route {:?}", inner.router.route()); + println!("router.route {:?}", inner.router.route()); + println!("router.route {:?}", inner.router.route()); + println!("router.route {:?}", inner.router.route()); + println!("router.route {:?}", inner.router.route()); + println!("router.route {:?}", inner.router.route()); + assert!(inner.router.routed()); assert!(client.in_transaction()); assert!(inner.router.route().is_write()); From 457abafc739d5702b2354dae4647d6b0177cecff Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Sat, 9 Aug 2025 21:35:14 -0400 Subject: [PATCH 20/25] wip --- pgdog/src/frontend/client/mod.rs | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 9f7c2fa4..c8002572 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -630,19 +630,9 @@ impl Client { // ReadyForQuery (B) if code == 'Z' { inner.stats.query(); - // Does the backend server say we're in a transaction? - let has_backend_tx = message.in_transaction(); - - // Is the frontend client in a logical transaction? - let has_logical_tx = self.in_transaction(); // In transaction if buffered BEGIN from client or server is telling us we are. - let in_transaction = has_backend_tx || has_logical_tx; - - println!("has_backend: {}", has_backend_tx); - println!("has_logical: {}", has_logical_tx); - println!("global: {}", in_transaction); - + let in_transaction = message.in_transaction() || self.in_transaction(); inner.stats.idle(in_transaction); // Flush mirrors. From b06fe28e6449ea0bcd5a9ce66c0df6cf7bacade1 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Mon, 11 Aug 2025 12:41:10 -0400 Subject: [PATCH 21/25] checkpoint --- pgdog/src/backend/pool/connection/mirror.rs | 6 +- pgdog/src/frontend/client/inner.rs | 10 +- pgdog/src/frontend/client/mod.rs | 23 +++- pgdog/src/frontend/client/test/mod.rs | 21 ++-- pgdog/src/frontend/logical_transaction.rs | 5 + pgdog/src/frontend/router/context.rs | 16 ++- pgdog/src/frontend/router/mod.rs | 5 - pgdog/src/frontend/router/parser/context.rs | 6 +- .../frontend/router/parser/query/explain.rs | 9 +- pgdog/src/frontend/router/parser/query/mod.rs | 9 -- pgdog/src/frontend/router/parser/query/set.rs | 2 +- .../src/frontend/router/parser/query/test.rs | 101 +++++++++++------- .../router/parser/query/transaction.rs | 1 - 13 files changed, 135 insertions(+), 79 deletions(-) diff --git a/pgdog/src/backend/pool/connection/mirror.rs b/pgdog/src/backend/pool/connection/mirror.rs index d183ace8..8aa94669 100644 --- a/pgdog/src/backend/pool/connection/mirror.rs +++ b/pgdog/src/backend/pool/connection/mirror.rs @@ -9,6 +9,7 @@ use tracing::{debug, error}; use crate::backend::Cluster; use crate::config::config; use crate::frontend::client::timeouts::Timeouts; +use crate::frontend::logical_transaction::LogicalTransaction; use crate::frontend::{Command, PreparedStatements, Router, RouterContext}; use crate::net::Parameters; use crate::state::State; @@ -47,6 +48,8 @@ pub(crate) struct Mirror { params: Parameters, /// Mirror state. state: State, + /// Logical transaction state. + logical_transaction: LogicalTransaction, } impl Mirror { @@ -71,6 +74,7 @@ impl Mirror { cluster: cluster.clone(), state: State::Idle, params: Parameters::default(), + logical_transaction: LogicalTransaction::new(), }; let query_timeout = Timeouts::from_config(&config.config.general); @@ -135,7 +139,7 @@ impl Mirror { &self.cluster, &mut self.prepared_statements, &self.params, - false, + &self.logical_transaction, ) { match self.router.query(context) { Ok(command) => { diff --git a/pgdog/src/frontend/client/inner.rs b/pgdog/src/frontend/client/inner.rs index 3f36a9a3..340fee54 100644 --- a/pgdog/src/frontend/client/inner.rs +++ b/pgdog/src/frontend/client/inner.rs @@ -64,11 +64,11 @@ impl Inner { .map(|cluster| { // Build router context. let context = RouterContext::new( - buffer, // Query and parameters. - cluster, // Cluster configuration. - prepared_statements, // Prepared statements. - params, // Client connection parameters. - logical_transaction.in_transaction(), // Client in explcitely started transaction. + buffer, // Query and parameters. + cluster, // Cluster configuration. + prepared_statements, // Prepared statements. + params, // Client connection parameters. + logical_transaction, // Client in explcitely started transaction. )?; self.router.query(context) }) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index c8002572..21e0efcc 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -371,6 +371,10 @@ impl Client { /// Handle client messages. async fn client_messages(&mut self, mut inner: InnerBorrow<'_>) -> Result { + println!(""); + println!(""); + println!("Starting Inner.router.route: {}", inner.router.route()); + inner .stats .received(self.request_buffer.total_message_len()); @@ -491,8 +495,15 @@ impl Client { return Ok(false); } - Some(Command::Query(query)) => { - if query.is_cross_shard() && self.cross_shard_disabled { + Some(Command::Query(route)) => { + if self.in_transaction() { + println!("I am in a transaction!"); + println!("shard: {}", route.shard().clone()); + let shard = route.shard().clone(); + self.logical_transaction.execute_query(shard)?; + } + + if route.is_cross_shard() && self.cross_shard_disabled { self.stream .error( ErrorResponse::cross_shard_disabled(), @@ -565,6 +576,8 @@ impl Client { }; } + println!("Inner.router.route: {}", inner.router.route()); + // We don't start a transaction on the servers until // a client is actually executing something. // @@ -631,10 +644,15 @@ impl Client { if code == 'Z' { inner.stats.query(); + println!("message in transaction: {}", message.in_transaction()); + // In transaction if buffered BEGIN from client or server is telling us we are. let in_transaction = message.in_transaction() || self.in_transaction(); inner.stats.idle(in_transaction); + println!("in transaction: {}", in_transaction); + println!("in message transaction: {}", message.in_transaction()); + // Flush mirrors. if !in_transaction { inner.backend.mirror_flush(); @@ -648,6 +666,7 @@ impl Client { // Flushing can take a minute and we don't want to block // the connection from being reused. if inner.backend.done() { + println!("I AM DONE! "); let changed_params = inner.backend.changed_params(); if inner.transaction_mode() && !self.replication_mode { inner.disconnect(); diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 674cf665..08b87aa4 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -454,7 +454,7 @@ async fn test_transaction_state() { assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - assert!(inner.router.in_transaction()); + assert!(client.logical_transaction.in_transaction()); conn.write_all(&buffer!( { Parse::named("test", "SELECT $1") }, @@ -470,7 +470,7 @@ async fn test_transaction_state() { assert!(inner.router.routed()); assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - assert!(inner.router.in_transaction()); + assert!(client.logical_transaction.in_transaction()); for c in ['1', 't', 'T', 'Z'] { let msg = inner.backend.read().await.unwrap(); @@ -506,27 +506,26 @@ async fn test_transaction_state() { let msg = inner.backend.read().await.unwrap(); assert_eq!(msg.code(), c); + println!("Loop -> {} :: pre-message {:?}", c, inner.router.route()); client.server_message(&mut inner.get(), msg).await.unwrap(); + println!("Loop -> {} :: post-message {:?}", c, inner.router.route()); } - read!(conn, ['2', 'D', 'C', 'Z']); + println!("Out of loop route {:?}", inner.router.route()); - println!("router.route {:?}", inner.router.route()); - println!("router.route {:?}", inner.router.route()); - println!("router.route {:?}", inner.router.route()); - println!("router.route {:?}", inner.router.route()); - println!("router.route {:?}", inner.router.route()); - println!("router.route {:?}", inner.router.route()); + read!(conn, ['2', 'D', 'C', 'Z']); - assert!(inner.router.routed()); + // assert!(inner.router.routed()); assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - assert!(inner.router.in_transaction()); + assert!(client.logical_transaction.in_transaction()); conn.write_all(&buffer!({ Query::new("COMMIT") })) .await .unwrap(); + println!("I JUST COMMITED IN THE TESTS!"); + client.buffer(&State::Idle).await.unwrap(); client.client_messages(inner.get()).await.unwrap(); diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index 084a552d..f1ef41b3 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -81,6 +81,7 @@ impl LogicalTransaction { /// - `AlreadyInTransaction` if already `BeginPending` or `InProgress`. /// - `AlreadyFinalized` if `Committed` or `RolledBack`.tx or finalized. pub fn soft_begin(&mut self) -> Result<(), TransactionError> { + println!("soft begin"); match self.status { TransactionStatus::Idle => { self.status = TransactionStatus::BeginPending; @@ -103,6 +104,7 @@ impl LogicalTransaction { /// - `InvalidManualShardType` if `shard` is not `Shard::Direct(_)`. /// - `ShardConflict` if `active_shard` is set to a different shard. pub fn execute_query(&mut self, shard: Shard) -> Result<(), TransactionError> { + println!("EXECUTE QUERY"); self.touch_shard(shard)?; match self.status { @@ -124,6 +126,7 @@ impl LogicalTransaction { /// - `NoActiveTransaction` if `BeginPending` (nothing ran). /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn commit(&mut self) -> Result<(), TransactionError> { + println!("COMMIT TRANSACTION"); match self.status { TransactionStatus::Idle => Err(TransactionError::ExpectedActive), TransactionStatus::BeginPending => { @@ -146,6 +149,7 @@ impl LogicalTransaction { /// - `NoActiveTransaction` if `BeginPending` (nothing ran). /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn rollback(&mut self) -> Result<(), TransactionError> { + println!("ROLLBACK TRANSACTION"); match self.status { TransactionStatus::Idle => Err(TransactionError::ExpectedActive), TransactionStatus::BeginPending => { @@ -164,6 +168,7 @@ impl LogicalTransaction { /// Sets status to `Idle`, clears manual and dirty shard /// Safe to call in any state. pub fn reset(&mut self) { + println!("RESET"); self.status = TransactionStatus::Idle; self.manual_shard = None; self.dirty_shard = None; diff --git a/pgdog/src/frontend/router/context.rs b/pgdog/src/frontend/router/context.rs index 117a2dbf..30f7ea20 100644 --- a/pgdog/src/frontend/router/context.rs +++ b/pgdog/src/frontend/router/context.rs @@ -1,7 +1,9 @@ use super::Error; use crate::{ backend::Cluster, - frontend::{buffer::BufferedQuery, Buffer, PreparedStatements}, + frontend::{ + buffer::BufferedQuery, logical_transaction::LogicalTransaction, Buffer, PreparedStatements, + }, net::{Bind, Parameters}, }; @@ -17,10 +19,10 @@ pub struct RouterContext<'a> { pub cluster: &'a Cluster, /// Client parameters, e.g. search_path. pub params: &'a Parameters, - /// Client inside transaction, - pub in_transaction: bool, /// Currently executing COPY statement. pub copy_mode: bool, + /// Client's logical_transaction struct, + pub logical_transaction: &'a LogicalTransaction, } impl<'a> RouterContext<'a> { @@ -29,7 +31,7 @@ impl<'a> RouterContext<'a> { cluster: &'a Cluster, stmt: &'a mut PreparedStatements, params: &'a Parameters, - in_transaction: bool, + logical_transaction: &'a LogicalTransaction, ) -> Result { let query = buffer.query()?; let bind = buffer.parameters()?; @@ -40,9 +42,13 @@ impl<'a> RouterContext<'a> { bind, params, prepared_statements: stmt, + logical_transaction, cluster, - in_transaction, copy_mode, }) } + + pub fn in_transaction(&self) -> bool { + self.logical_transaction.in_transaction() + } } diff --git a/pgdog/src/frontend/router/mod.rs b/pgdog/src/frontend/router/mod.rs index 2629139c..4feac537 100644 --- a/pgdog/src/frontend/router/mod.rs +++ b/pgdog/src/frontend/router/mod.rs @@ -98,11 +98,6 @@ impl Router { self.routed } - /// Query parser is inside a transaction. - pub fn in_transaction(&self) -> bool { - self.query_parser.in_transaction() - } - /// Get last commmand computed by the query parser. pub fn command(&self) -> &Command { &self.latest_command diff --git a/pgdog/src/frontend/router/parser/context.rs b/pgdog/src/frontend/router/parser/context.rs index 51d84d66..3c50bbaa 100644 --- a/pgdog/src/frontend/router/parser/context.rs +++ b/pgdog/src/frontend/router/parser/context.rs @@ -60,7 +60,7 @@ impl<'a> QueryParserContext<'a> { /// Write override enabled? pub(super) fn write_override(&self) -> bool { - self.router_context.in_transaction && self.rw_conservative() + self.in_transaction() && self.rw_conservative() } /// Are we using the conservative read/write separation strategy? @@ -92,4 +92,8 @@ impl<'a> QueryParserContext<'a> { pub(super) fn multi_tenant(&self) -> &Option { self.multi_tenant } + + pub(super) fn in_transaction(&self) -> bool { + self.router_context.in_transaction() + } } diff --git a/pgdog/src/frontend/router/parser/query/explain.rs b/pgdog/src/frontend/router/parser/query/explain.rs index a95ff5be..86d15745 100644 --- a/pgdog/src/frontend/router/parser/query/explain.rs +++ b/pgdog/src/frontend/router/parser/query/explain.rs @@ -28,6 +28,7 @@ mod tests { use super::*; use crate::backend::Cluster; + use crate::frontend::logical_transaction::LogicalTransaction; use crate::frontend::{Buffer, PreparedStatements, RouterContext}; use crate::net::messages::{Bind, Parameter, Parse, Query}; use crate::net::Parameters; @@ -39,8 +40,10 @@ mod tests { let cluster = Cluster::new_test(); let mut stmts = PreparedStatements::default(); let params = Parameters::default(); + let logical_transaction = LogicalTransaction::new(); - let ctx = RouterContext::new(&buffer, &cluster, &mut stmts, ¶ms, false).unwrap(); + let ctx = RouterContext::new(&buffer, &cluster, &mut stmts, ¶ms, &logical_transaction) + .unwrap(); match QueryParser::default().parse(ctx).unwrap().clone() { Command::Query(route) => route, @@ -65,8 +68,10 @@ mod tests { let cluster = Cluster::new_test(); let mut stmts = PreparedStatements::default(); let params = Parameters::default(); + let logical_transaction = LogicalTransaction::new(); - let ctx = RouterContext::new(&buffer, &cluster, &mut stmts, ¶ms, false).unwrap(); + let ctx = RouterContext::new(&buffer, &cluster, &mut stmts, ¶ms, &logical_transaction) + .unwrap(); match QueryParser::default().parse(ctx).unwrap().clone() { Command::Query(route) => route, diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index fae8727c..12be9f16 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -49,8 +49,6 @@ use tracing::{debug, trace}; /// #[derive(Debug)] pub struct QueryParser { - // The statement is executed inside a tranasction. - in_transaction: bool, // No matter what query is executed, we'll send it to the primary. write_override: bool, // Currently calculated shard. @@ -60,7 +58,6 @@ pub struct QueryParser { impl Default for QueryParser { fn default() -> Self { Self { - in_transaction: false, write_override: false, shard: Shard::All, } @@ -68,17 +65,11 @@ impl Default for QueryParser { } impl QueryParser { - /// Indicates we are in a transaction. - pub fn in_transaction(&self) -> bool { - self.in_transaction - } - /// Parse a query and return a command. pub fn parse(&mut self, context: RouterContext) -> Result { let mut qp_context = QueryParserContext::new(context); let mut command = if qp_context.query().is_ok() { - self.in_transaction = qp_context.router_context.in_transaction; self.write_override = qp_context.write_override(); self.query(&mut qp_context)? diff --git a/pgdog/src/frontend/router/parser/query/set.rs b/pgdog/src/frontend/router/parser/query/set.rs index cba3effd..7f053c41 100644 --- a/pgdog/src/frontend/router/parser/query/set.rs +++ b/pgdog/src/frontend/router/parser/query/set.rs @@ -60,7 +60,7 @@ impl QueryParser { // TODO: Handle SET commands for updating client // params without touching the server. name => { - if !self.in_transaction { + if !context.in_transaction() { let mut value = vec![]; for node in &stmt.args { diff --git a/pgdog/src/frontend/router/parser/query/test.rs b/pgdog/src/frontend/router/parser/query/test.rs index 799b251f..1c74ccc8 100644 --- a/pgdog/src/frontend/router/parser/query/test.rs +++ b/pgdog/src/frontend/router/parser/query/test.rs @@ -1,14 +1,16 @@ -use crate::net::{ - messages::{parse::Parse, Parameter}, - Close, Format, Sync, -}; - use super::{super::Shard, *}; + use crate::backend::Cluster; use crate::config::ReadWriteStrategy; -use crate::frontend::{Buffer, PreparedStatements, RouterContext}; +use crate::frontend::{ + logical_transaction::LogicalTransaction, Buffer, PreparedStatements, RouterContext, +}; use crate::net::messages::Query; use crate::net::Parameters; +use crate::net::{ + messages::{parse::Parse, Parameter}, + Close, Format, Sync, +}; macro_rules! command { ($query:expr) => {{ @@ -18,7 +20,10 @@ macro_rules! command { let cluster = Cluster::new_test(); let mut stmt = PreparedStatements::default(); let params = Parameters::default(); - let context = RouterContext::new(&buffer, &cluster, &mut stmt, ¶ms, false).unwrap(); + let logical_transaction = LogicalTransaction::new(); + let context = + RouterContext::new(&buffer, &cluster, &mut stmt, ¶ms, &logical_transaction) + .unwrap(); let command = query_parser.parse(context).unwrap().clone(); (command, query_parser) @@ -44,9 +49,21 @@ macro_rules! query_parser { let mut prep_stmts = PreparedStatements::default(); let params = Parameters::default(); let buffer: Buffer = vec![$query.into()].into(); - let router_context = - RouterContext::new(&buffer, &cluster, &mut prep_stmts, ¶ms, $in_transaction) - .unwrap(); + let mut logical_transaction = LogicalTransaction::new(); + + if $in_transaction { + logical_transaction.soft_begin().unwrap(); + } + + let router_context = RouterContext::new( + &buffer, + &cluster, + &mut prep_stmts, + ¶ms, + &logical_transaction, + ) + .unwrap(); + $qp.parse(router_context).unwrap() }}; @@ -69,6 +86,7 @@ macro_rules! parse { data: p.to_vec(), }) .collect::>(); + let logical_transaction = LogicalTransaction::new(); let bind = Bind::new_params_codes($name, ¶ms, $codes); let route = QueryParser::default() .parse( @@ -77,7 +95,7 @@ macro_rules! parse { &Cluster::new_test(), &mut PreparedStatements::default(), &Parameters::default(), - false, + &logical_transaction, ) .unwrap(), ) @@ -165,23 +183,20 @@ fn test_omni() { let q = "SELECT sharded_omni.* FROM sharded_omni WHERE sharded_omni.id = $1"; let route = query!(q); assert!(matches!(route.shard(), Shard::Direct(_))); - let (_, qp) = command!(q); - assert!(!qp.in_transaction); + let (_, _qp) = command!(q); } #[test] fn test_set() { let route = query!(r#"SET "pgdog.shard" TO 1"#); assert_eq!(route.shard(), &Shard::Direct(1)); - let (_, qp) = command!(r#"SET "pgdog.shard" TO 1"#); - assert!(!qp.in_transaction); + let (_, _qp) = command!(r#"SET "pgdog.shard" TO 1"#); let route = query!(r#"SET "pgdog.sharding_key" TO '11'"#); assert_eq!(route.shard(), &Shard::Direct(1)); - let (_, qp) = command!(r#"SET "pgdog.sharding_key" TO '11'"#); - assert!(!qp.in_transaction); + let (_, _qp) = command!(r#"SET "pgdog.sharding_key" TO '11'"#); - for (command, qp) in [ + for (command, _qp) in [ command!("SET TimeZone TO 'UTC'"), command!("SET TIME ZONE 'UTC'"), ] { @@ -192,10 +207,9 @@ fn test_set() { } _ => panic!("not a set"), }; - assert!(!qp.in_transaction); } - let (command, qp) = command!("SET statement_timeout TO 3000"); + let (command, _qp) = command!("SET statement_timeout TO 3000"); match command { Command::Set { name, value } => { assert_eq!(name, "statement_timeout"); @@ -203,11 +217,10 @@ fn test_set() { } _ => panic!("not a set"), }; - assert!(!qp.in_transaction); // TODO: user shouldn't be able to set these. // The server will report an error on synchronization. - let (command, qp) = command!("SET is_superuser TO true"); + let (command, _qp) = command!("SET is_superuser TO true"); match command { Command::Set { name, value } => { assert_eq!(name, "is_superuser"); @@ -215,7 +228,6 @@ fn test_set() { } _ => panic!("not a set"), }; - assert!(!qp.in_transaction); let (_, mut qp) = command!("BEGIN"); assert!(qp.write_override); @@ -241,15 +253,24 @@ fn test_set() { let cluster = Cluster::new_test(); let mut prep_stmts = PreparedStatements::default(); let params = Parameters::default(); - let router_context = - RouterContext::new(&buffer, &cluster, &mut prep_stmts, ¶ms, true).unwrap(); + + let mut logical_transaction = LogicalTransaction::new(); + logical_transaction.soft_begin().unwrap(); + + let router_context = RouterContext::new( + &buffer, + &cluster, + &mut prep_stmts, + ¶ms, + &logical_transaction, + ) + .unwrap(); let mut context = QueryParserContext::new(router_context); for read_only in [true, false] { context.read_only = read_only; // Overriding context above. let mut qp = QueryParser::default(); - qp.in_transaction = true; let route = qp.query(&mut context).unwrap(); match route { @@ -269,7 +290,6 @@ fn test_transaction() { _ => panic!("not a query"), }; - assert!(qp.in_transaction); assert!(qp.write_override); let route = query_parser!(qp, Parse::named("test", "SELECT $1"), true); @@ -290,9 +310,7 @@ fn test_transaction() { command, Command::StartTransaction(BufferedQuery::Query(_)) )); - assert!(qp.in_transaction); - qp.in_transaction = true; let route = query_parser!( qp, Query::new("SET application_name TO 'test'"), @@ -323,9 +341,8 @@ fn test_begin_extended() { #[test] fn test_show_shards() { - let (cmd, qp) = command!("SHOW pgdog.shards"); + let (cmd, _qp) = command!("SHOW pgdog.shards"); assert!(matches!(cmd, Command::Shards(2))); - assert!(!qp.in_transaction); } #[test] @@ -355,10 +372,13 @@ fn test_cte() { fn test_function_begin() { let (cmd, mut qp) = command!("BEGIN"); assert!(matches!(cmd, Command::StartTransaction(_))); - assert!(qp.in_transaction); let cluster = Cluster::new_test(); let mut prep_stmts = PreparedStatements::default(); let params = Parameters::default(); + + let mut logical_transaction = LogicalTransaction::new(); + logical_transaction.soft_begin().unwrap(); + let buffer: Buffer = vec![Query::new( "SELECT ROW(t1.*) AS tt1, @@ -377,15 +397,22 @@ WHERE t2.account = ( ) .into()] .into(); - let router_context = - RouterContext::new(&buffer, &cluster, &mut prep_stmts, ¶ms, true).unwrap(); + + let router_context = RouterContext::new( + &buffer, + &cluster, + &mut prep_stmts, + ¶ms, + &logical_transaction, + ) + .unwrap(); + let mut context = QueryParserContext::new(router_context); let route = qp.query(&mut context).unwrap(); match route { Command::Query(query) => assert!(query.is_write()), _ => panic!("not a select"), } - assert!(qp.in_transaction); } #[test] @@ -433,8 +460,10 @@ fn test_close_direct_one_shard() { let buf: Buffer = vec![Close::named("test").into(), Sync.into()].into(); let mut pp = PreparedStatements::default(); let params = Parameters::default(); + let logical_transaction = LogicalTransaction::new(); - let context = RouterContext::new(&buf, &cluster, &mut pp, ¶ms, false).unwrap(); + let context = + RouterContext::new(&buf, &cluster, &mut pp, ¶ms, &logical_transaction).unwrap(); let cmd = qp.parse(context).unwrap(); diff --git a/pgdog/src/frontend/router/parser/query/transaction.rs b/pgdog/src/frontend/router/parser/query/transaction.rs index 9554b0f6..85ffaa5a 100644 --- a/pgdog/src/frontend/router/parser/query/transaction.rs +++ b/pgdog/src/frontend/router/parser/query/transaction.rs @@ -25,7 +25,6 @@ impl QueryParser { TransactionStmtKind::TransStmtCommit => return Ok(Command::CommitTransaction), TransactionStmtKind::TransStmtRollback => return Ok(Command::RollbackTransaction), TransactionStmtKind::TransStmtBegin | TransactionStmtKind::TransStmtStart => { - self.in_transaction = true; return Ok(Command::StartTransaction(context.query()?.clone())); } _ => Ok(Command::Query(Route::write(None))), From c0528d3a3090ec8ac811b78b0457ba1ec2ff02c5 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Mon, 11 Aug 2025 15:00:49 -0400 Subject: [PATCH 22/25] wip --- pgdog/src/frontend/client/mod.rs | 23 +++++++++++++++++------ pgdog/src/frontend/client/test/mod.rs | 10 ++++++++++ pgdog/src/frontend/logical_transaction.rs | 4 ---- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 21e0efcc..a193b7b3 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -312,8 +312,10 @@ impl Client { /// Run the client. async fn run(&mut self) -> Result<(), Error> { + println!("1"); let mut inner = Inner::new(self)?; let shutdown = self.comms.shutting_down(); + println!("2"); loop { let query_timeout = self.timeouts.query_timeout(&inner.stats.state); @@ -633,32 +635,33 @@ impl Client { let message = message.backend(); let has_more_messages = inner.backend.has_more_messages(); + println!("\n --> ONE"); + // Messages that we need to send to the client immediately. // ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B) let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A') || !has_more_messages || message.streaming(); + println!("\n --> TWO"); + // Server finished executing a query. // ReadyForQuery (B) if code == 'Z' { inner.stats.query(); - println!("message in transaction: {}", message.in_transaction()); - // In transaction if buffered BEGIN from client or server is telling us we are. let in_transaction = message.in_transaction() || self.in_transaction(); inner.stats.idle(in_transaction); - println!("in transaction: {}", in_transaction); - println!("in message transaction: {}", message.in_transaction()); - // Flush mirrors. if !in_transaction { inner.backend.mirror_flush(); } } + println!("\n --> THREE"); + inner.stats.sent(message.len()); // Release the connection back into the pool @@ -666,7 +669,7 @@ impl Client { // Flushing can take a minute and we don't want to block // the connection from being reused. if inner.backend.done() { - println!("I AM DONE! "); + println!("\n --> FOUR"); let changed_params = inner.backend.changed_params(); if inner.transaction_mode() && !self.replication_mode { inner.disconnect(); @@ -678,6 +681,8 @@ impl Client { inner.stats.last_transaction_time.as_secs_f64() * 1000.0 ); + println!("\n --> FIVE"); + // Update client params with values // sent from the server using ParameterStatus(B) messages. if !changed_params.is_empty() { @@ -689,17 +694,23 @@ impl Client { } } + println!("\n --> SIX"); + if flush { self.stream.send_flush(&message).await?; } else { self.stream.send(&message).await?; } + println!("\n --> SEVEN"); + // Pooler is offline or the client requested to disconnect and the transaction is done. if inner.backend.done() && (inner.comms.offline() || self.shutdown) && !self.admin { return Ok(true); } + println!("\n --> EIGHT"); + Ok(false) } diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 08b87aa4..1eca032d 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -273,6 +273,7 @@ async fn test_client_extended() { let _ = read!(conn, ['1', '2', 't', 'T', 'D', 'C', 'Z']); + println!("this does not print"); handle.await.unwrap(); } @@ -529,15 +530,24 @@ async fn test_transaction_state() { client.buffer(&State::Idle).await.unwrap(); client.client_messages(inner.get()).await.unwrap(); + println!("\nONE"); + for c in ['C', 'Z'] { + println!("{} A", c); let msg = inner.backend.read().await.unwrap(); assert_eq!(msg.code(), c); + println!("{} B", c); client.server_message(&mut inner.get(), msg).await.unwrap(); + println!("{} C", c); } + println!("\nTWO"); + read!(conn, ['C', 'Z']); + println!("\nTHRE"); + assert!(!client.in_transaction()); assert!(!inner.router.routed()); } diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index f1ef41b3..b227c1cc 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -81,7 +81,6 @@ impl LogicalTransaction { /// - `AlreadyInTransaction` if already `BeginPending` or `InProgress`. /// - `AlreadyFinalized` if `Committed` or `RolledBack`.tx or finalized. pub fn soft_begin(&mut self) -> Result<(), TransactionError> { - println!("soft begin"); match self.status { TransactionStatus::Idle => { self.status = TransactionStatus::BeginPending; @@ -104,7 +103,6 @@ impl LogicalTransaction { /// - `InvalidManualShardType` if `shard` is not `Shard::Direct(_)`. /// - `ShardConflict` if `active_shard` is set to a different shard. pub fn execute_query(&mut self, shard: Shard) -> Result<(), TransactionError> { - println!("EXECUTE QUERY"); self.touch_shard(shard)?; match self.status { @@ -126,7 +124,6 @@ impl LogicalTransaction { /// - `NoActiveTransaction` if `BeginPending` (nothing ran). /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn commit(&mut self) -> Result<(), TransactionError> { - println!("COMMIT TRANSACTION"); match self.status { TransactionStatus::Idle => Err(TransactionError::ExpectedActive), TransactionStatus::BeginPending => { @@ -149,7 +146,6 @@ impl LogicalTransaction { /// - `NoActiveTransaction` if `BeginPending` (nothing ran). /// - `AlreadyFinalized` if already `Committed` or `RolledBack`. pub fn rollback(&mut self) -> Result<(), TransactionError> { - println!("ROLLBACK TRANSACTION"); match self.status { TransactionStatus::Idle => Err(TransactionError::ExpectedActive), TransactionStatus::BeginPending => { From a3f6d46b67d530c06a0a704720dbf468ca21dd45 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Mon, 11 Aug 2025 15:40:26 -0400 Subject: [PATCH 23/25] wip --- pgdog/src/frontend/client/mod.rs | 15 --------------- pgdog/src/frontend/client/test/mod.rs | 24 +++++------------------- 2 files changed, 5 insertions(+), 34 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index a193b7b3..fed77ab2 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -635,16 +635,12 @@ impl Client { let message = message.backend(); let has_more_messages = inner.backend.has_more_messages(); - println!("\n --> ONE"); - // Messages that we need to send to the client immediately. // ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B) let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A') || !has_more_messages || message.streaming(); - println!("\n --> TWO"); - // Server finished executing a query. // ReadyForQuery (B) if code == 'Z' { @@ -660,8 +656,6 @@ impl Client { } } - println!("\n --> THREE"); - inner.stats.sent(message.len()); // Release the connection back into the pool @@ -669,7 +663,6 @@ impl Client { // Flushing can take a minute and we don't want to block // the connection from being reused. if inner.backend.done() { - println!("\n --> FOUR"); let changed_params = inner.backend.changed_params(); if inner.transaction_mode() && !self.replication_mode { inner.disconnect(); @@ -681,8 +674,6 @@ impl Client { inner.stats.last_transaction_time.as_secs_f64() * 1000.0 ); - println!("\n --> FIVE"); - // Update client params with values // sent from the server using ParameterStatus(B) messages. if !changed_params.is_empty() { @@ -694,23 +685,17 @@ impl Client { } } - println!("\n --> SIX"); - if flush { self.stream.send_flush(&message).await?; } else { self.stream.send(&message).await?; } - println!("\n --> SEVEN"); - // Pooler is offline or the client requested to disconnect and the transaction is done. if inner.backend.done() && (inner.comms.offline() || self.shutdown) && !self.admin { return Ok(true); } - println!("\n --> EIGHT"); - Ok(false) } diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 1eca032d..9e2992e7 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -455,7 +455,6 @@ async fn test_transaction_state() { assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - assert!(client.logical_transaction.in_transaction()); conn.write_all(&buffer!( { Parse::named("test", "SELECT $1") }, @@ -471,7 +470,6 @@ async fn test_transaction_state() { assert!(inner.router.routed()); assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - assert!(client.logical_transaction.in_transaction()); for c in ['1', 't', 'T', 'Z'] { let msg = inner.backend.read().await.unwrap(); @@ -507,47 +505,35 @@ async fn test_transaction_state() { let msg = inner.backend.read().await.unwrap(); assert_eq!(msg.code(), c); - println!("Loop -> {} :: pre-message {:?}", c, inner.router.route()); + println!("Before: {}: routed?{}", c, inner.router.routed()); + client.server_message(&mut inner.get(), msg).await.unwrap(); - println!("Loop -> {} :: post-message {:?}", c, inner.router.route()); - } - println!("Out of loop route {:?}", inner.router.route()); + println!("After: {}: routed? {}", c, inner.router.routed()); + } read!(conn, ['2', 'D', 'C', 'Z']); - // assert!(inner.router.routed()); + assert!(inner.router.routed()); assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - assert!(client.logical_transaction.in_transaction()); conn.write_all(&buffer!({ Query::new("COMMIT") })) .await .unwrap(); - println!("I JUST COMMITED IN THE TESTS!"); - client.buffer(&State::Idle).await.unwrap(); client.client_messages(inner.get()).await.unwrap(); - println!("\nONE"); - for c in ['C', 'Z'] { - println!("{} A", c); let msg = inner.backend.read().await.unwrap(); assert_eq!(msg.code(), c); - println!("{} B", c); client.server_message(&mut inner.get(), msg).await.unwrap(); - println!("{} C", c); } - println!("\nTWO"); - read!(conn, ['C', 'Z']); - println!("\nTHRE"); - assert!(!client.in_transaction()); assert!(!inner.router.routed()); } From 9f5c10fc6a1d67b35d9e8e194951a3403ad0f6ce Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Mon, 11 Aug 2025 22:43:42 -0400 Subject: [PATCH 24/25] wip --- pgdog/src/frontend/client/mod.rs | 5 +++++ pgdog/src/frontend/client/test/mod.rs | 16 ++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index fed77ab2..50137dbd 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -658,6 +658,8 @@ impl Client { inner.stats.sent(message.len()); + println!("server_message: 1"); + // Release the connection back into the pool // before flushing data to client. // Flushing can take a minute and we don't want to block @@ -693,9 +695,12 @@ impl Client { // Pooler is offline or the client requested to disconnect and the transaction is done. if inner.backend.done() && (inner.comms.offline() || self.shutdown) && !self.admin { + println!("i am exiting"); return Ok(true); } + println!("i am exiting"); + Ok(false) } diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 9e2992e7..c4c624cc 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -514,24 +514,36 @@ async fn test_transaction_state() { read!(conn, ['2', 'D', 'C', 'Z']); - assert!(inner.router.routed()); - assert!(client.in_transaction()); + // assert!(inner.router.routed()); + // assert!(client.in_transaction()); assert!(inner.router.route().is_write()); + println!("1."); conn.write_all(&buffer!({ Query::new("COMMIT") })) .await .unwrap(); + println!("2."); + client.buffer(&State::Idle).await.unwrap(); + + println!("2.5"); + client.client_messages(inner.get()).await.unwrap(); + println!("3."); + for c in ['C', 'Z'] { + println!("3.1"); let msg = inner.backend.read().await.unwrap(); assert_eq!(msg.code(), c); + println!("3.2"); client.server_message(&mut inner.get(), msg).await.unwrap(); } + println!("4."); + read!(conn, ['C', 'Z']); assert!(!client.in_transaction()); From b8f5fe672381540611d0ca101ca2810be5aba0f6 Mon Sep 17 00:00:00 2001 From: Nic Laflamme Date: Tue, 12 Aug 2025 11:28:35 -0400 Subject: [PATCH 25/25] wip --- pgdog/src/backend/pool/connection/binding.rs | 1 + pgdog/src/frontend/client/mod.rs | 86 ++++++++++++++------ pgdog/src/frontend/client/test/mod.rs | 36 +++++--- pgdog/src/frontend/logical_transaction.rs | 13 ++- 4 files changed, 98 insertions(+), 38 deletions(-) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 0448d556..f63be9bc 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -71,6 +71,7 @@ impl Binding { Binding::Admin(backend) => Ok(backend.read().await?), Binding::MultiShard(shards, state) => { + println!("2.1"); if shards.is_empty() { loop { debug!("multi-shard binding suspended"); diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 50137dbd..0576fd88 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -5,6 +5,7 @@ use std::time::Instant; use bytes::BytesMut; use engine::EngineContext; +use pg_query::protobuf::PartitionElem; use smallvec::SmallVec; use timeouts::Timeouts; use tokio::time::timeout; @@ -373,9 +374,19 @@ impl Client { /// Handle client messages. async fn client_messages(&mut self, mut inner: InnerBorrow<'_>) -> Result { - println!(""); - println!(""); - println!("Starting Inner.router.route: {}", inner.router.route()); + // We don't start a transaction on the servers until a client is actually executing something. + // This prevents us holding open connections to multiple servers + if self.should_trigger_buffered_begin() { + println!(""); + println!("****************************************"); + println!("****************************************"); + println!("****************************************"); + println!("****************************************"); + println!("****************************************"); + println!(""); + inner.backend.begin().await?; + self.logical_transaction.record_begin(); + } inner .stats @@ -439,6 +450,14 @@ impl Client { } }; + println!(""); + println!("COMMAND: {:?}", command); + println!("-- connected? {}", connected); + + // AAAAAAA + // I decided that transactions keep the connection open. which makes total sense. + // how can we release a transaction to the connection pool and have it be reused by another client? + if !connected { // Simulate transaction starting // until client sends an actual query. @@ -450,6 +469,13 @@ impl Client { // to a shard. // match command { + Some(Command::CommitTransaction) => { + println!("HELLOOOOOOOO?"); + self.end_transaction(false).await?; + + inner.done(self.in_transaction()); + return Ok(false); + } Some(Command::StartTransaction(query)) => { if let BufferedQuery::Query(_) = query { self.start_transaction().await?; @@ -464,12 +490,6 @@ impl Client { inner.done(self.in_transaction()); return Ok(false); } - Some(Command::CommitTransaction) => { - self.end_transaction(false).await?; - - inner.done(self.in_transaction()); - return Ok(false); - } // How many shards are configured. Some(Command::Shards(shards)) => { let rd = RowDescription::new(&[Field::bigint("shards")]); @@ -499,8 +519,6 @@ impl Client { Some(Command::Query(route)) => { if self.in_transaction() { - println!("I am in a transaction!"); - println!("shard: {}", route.shard().clone()); let shard = route.shard().clone(); self.logical_transaction.execute_query(shard)?; } @@ -580,14 +598,6 @@ impl Client { println!("Inner.router.route: {}", inner.router.route()); - // We don't start a transaction on the servers until - // a client is actually executing something. - // - // This prevents us holding open connections to multiple servers - if self.should_trigger_buffered_begin() { - inner.backend.begin().await?; - } - for msg in self.request_buffer.iter() { if let ProtocolMessage::Bind(bind) = msg { inner.backend.bind(bind)? @@ -635,6 +645,11 @@ impl Client { let message = message.backend(); let has_more_messages = inner.backend.has_more_messages(); + println!(""); + println!(""); + println!(""); + println!("BACKKEND: \n{:?}", message); + // Messages that we need to send to the client immediately. // ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B) let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A') @@ -658,19 +673,20 @@ impl Client { inner.stats.sent(message.len()); - println!("server_message: 1"); - // Release the connection back into the pool // before flushing data to client. // Flushing can take a minute and we don't want to block // the connection from being reused. if inner.backend.done() { - let changed_params = inner.backend.changed_params(); - if inner.transaction_mode() && !self.replication_mode { + if inner.transaction_mode() && !self.replication_mode && !self.in_transaction() { inner.disconnect(); } + + let changed_params = inner.backend.changed_params(); + inner.stats.transaction(); inner.reset_router(); + debug!( "transaction finished [{:.3}ms]", inner.stats.last_transaction_time.as_secs_f64() * 1000.0 @@ -695,12 +711,9 @@ impl Client { // Pooler is offline or the client requested to disconnect and the transaction is done. if inner.backend.done() && (inner.comms.offline() || self.shutdown) && !self.admin { - println!("i am exiting"); return Ok(true); } - println!("i am exiting"); - Ok(false) } @@ -812,6 +825,7 @@ impl Client { /// This avoids connecting to servers when clients start and commit transactions /// with no queries. async fn end_transaction(&mut self, rollback: bool) -> Result<(), Error> { + println!("ENDING??? ---"); // stack‐allocate up to 3 messages: NOTICE + COMMIT/ROLLBACK + READY let mut messages: SmallVec<[Message; 3]> = SmallVec::new(); @@ -821,6 +835,8 @@ impl Client { self.logical_transaction.commit() }; + println!("ENDING --- {:?}", logical_result); + match logical_result { Err(TransactionError::ExpectedActive) => { messages.push( @@ -882,7 +898,23 @@ impl Client { } fn should_trigger_buffered_begin(&self) -> bool { - self.request_buffer.executable() && !self.in_transaction() + let executable = self.request_buffer.executable(); + let should_trigger_begin = self.logical_transaction.should_trigger_begin(); + + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!("buffer: {:?}", self.request_buffer); + println!(""); + println!( + "Executable: {}, should_trigger_begin: {}", + executable, should_trigger_begin + ); + + executable && should_trigger_begin } } diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index c4c624cc..508daa9c 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -445,6 +445,29 @@ async fn test_lock_session() { async fn test_transaction_state() { let (mut conn, mut client, mut inner) = new_client!(true); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + println!(""); + conn.write_all(&buffer!({ Query::new("BEGIN") })) .await .unwrap(); @@ -496,7 +519,6 @@ async fn test_transaction_state() { .await .unwrap(); - assert!(!inner.router.routed()); client.buffer(&State::Idle).await.unwrap(); client.client_messages(inner.get()).await.unwrap(); assert!(inner.router.routed()); @@ -504,26 +526,19 @@ async fn test_transaction_state() { for c in ['2', 'D', 'C', 'Z'] { let msg = inner.backend.read().await.unwrap(); assert_eq!(msg.code(), c); - - println!("Before: {}: routed?{}", c, inner.router.routed()); - client.server_message(&mut inner.get(), msg).await.unwrap(); - - println!("After: {}: routed? {}", c, inner.router.routed()); } read!(conn, ['2', 'D', 'C', 'Z']); - // assert!(inner.router.routed()); - // assert!(client.in_transaction()); + assert!(client.in_transaction()); assert!(inner.router.route().is_write()); - println!("1."); conn.write_all(&buffer!({ Query::new("COMMIT") })) .await .unwrap(); - println!("2."); + assert!(client.in_transaction()); client.buffer(&State::Idle).await.unwrap(); @@ -536,6 +551,7 @@ async fn test_transaction_state() { for c in ['C', 'Z'] { println!("3.1"); let msg = inner.backend.read().await.unwrap(); + println!("mssage: {:?}", &msg); assert_eq!(msg.code(), c); println!("3.2"); diff --git a/pgdog/src/frontend/logical_transaction.rs b/pgdog/src/frontend/logical_transaction.rs index b227c1cc..37120053 100644 --- a/pgdog/src/frontend/logical_transaction.rs +++ b/pgdog/src/frontend/logical_transaction.rs @@ -37,6 +37,7 @@ use super::router::parser::Shard; #[derive(Debug)] pub struct LogicalTransaction { pub status: TransactionStatus, + begin_dispatched: bool, manual_shard: Option, dirty_shard: Option, } @@ -45,6 +46,7 @@ impl LogicalTransaction { pub fn new() -> Self { Self { status: TransactionStatus::Idle, + begin_dispatched: false, manual_shard: None, dirty_shard: None, } @@ -164,10 +166,19 @@ impl LogicalTransaction { /// Sets status to `Idle`, clears manual and dirty shard /// Safe to call in any state. pub fn reset(&mut self) { - println!("RESET"); self.status = TransactionStatus::Idle; self.manual_shard = None; self.dirty_shard = None; + self.begin_dispatched = false; + } + + /// TODO + pub fn record_begin(&mut self) { + self.begin_dispatched = true; + } + + pub fn should_trigger_begin(&self) -> bool { + self.in_transaction() && !self.begin_dispatched } /// Pin the transaction to a specific shard.