diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 583ad91a..8ab6ab19 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -59,6 +59,8 @@ impl Binding { pub(super) async fn read(&mut self) -> Result { match self { + Binding::Admin(backend) => Ok(backend.read().await?), + Binding::Server(guard) => { if let Some(guard) = guard.as_mut() { guard.read().await @@ -70,7 +72,6 @@ impl Binding { } } - Binding::Admin(backend) => Ok(backend.read().await?), Binding::MultiShard(shards, state) => { if shards.is_empty() { loop { @@ -115,6 +116,8 @@ impl Binding { /// Send an entire buffer of messages to the servers(s). pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { + println!("\n\nsending.... \n{:#?}\n\n", client_request); + match self { Binding::Admin(backend) => Ok(backend.send(client_request).await?), @@ -128,6 +131,7 @@ impl Binding { Binding::MultiShard(servers, _state) => { for server in servers.iter_mut() { + println!("\nserver\n${:#?}\n", server); server.send(client_request).await?; } @@ -222,6 +226,7 @@ impl Binding { /// Execute a query on all servers. pub async fn execute(&mut self, query: &str) -> Result<(), Error> { + println!("\n\nexecuting...\n{:#?}", query); match self { Binding::Server(Some(ref mut server)) => { server.execute(query).await?; @@ -243,6 +248,8 @@ impl Binding { match self { Binding::Server(Some(ref mut server)) => server.link_client(params).await, Binding::MultiShard(ref mut servers, _) => { + println!("\nLinking... {:#?}", servers); + let mut max = 0; for server in servers { let synced = server.link_client(params).await?; @@ -271,7 +278,7 @@ impl Binding { } } - pub(super) fn dirty(&mut self) { + pub(super) fn mark_dirty(&mut self) { match self { Binding::Server(Some(ref mut server)) => server.mark_dirty(true), Binding::MultiShard(ref mut servers, _state) => { diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index b93cf0a7..934f5095 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -278,6 +278,8 @@ impl Connection { router: &mut Router, streaming: bool, ) -> Result<(), Error> { + println!("\nhandle_client_request: \n\n{:#?}\n \n", client_request); + if client_request.copy() && !streaming { let rows = router .copy_data(client_request) @@ -353,7 +355,7 @@ impl Connection { pub(crate) fn lock(&mut self, lock: bool) { self.locked = lock; if lock { - self.binding.dirty(); + self.binding.mark_dirty(); } } diff --git a/pgdog/src/backend/pool/connection/multi_shard/mod.rs b/pgdog/src/backend/pool/connection/multi_shard/mod.rs index e3c44233..3390b18f 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/mod.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/mod.rs @@ -40,6 +40,7 @@ struct Counters { pub struct MultiShard { /// Number of shards we are connected to. shards: usize, + /// Route the query is taking. route: Route, @@ -48,6 +49,7 @@ pub struct MultiShard { /// Sorting/aggregate buffer. buffer: Buffer, + decoder: Decoder, } diff --git a/pgdog/src/backend/pool/guard.rs b/pgdog/src/backend/pool/guard.rs index e121f3d7..0b47d8bd 100644 --- a/pgdog/src/backend/pool/guard.rs +++ b/pgdog/src/backend/pool/guard.rs @@ -128,7 +128,7 @@ impl Guard { error!("server reset error [{}]", server.addr()); } Ok(_) => { - server.cleaned(); + server.mark_clean(); } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index cefbb9dd..87ac4c53 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -338,6 +338,7 @@ impl Server { if let Some(message) = self.prepared_statements.state_mut().get_simulated() { return Ok(message.backend()); } + match self .stream .as_mut() @@ -823,7 +824,7 @@ impl Server { /// Server has been cleaned. #[inline] - pub(super) fn cleaned(&mut self) { + pub(super) fn mark_clean(&mut self) { self.dirty = false; } diff --git a/pgdog/src/frontend/client/query_engine/connect.rs b/pgdog/src/frontend/client/query_engine/connect.rs index b5ff0f96..3886151c 100644 --- a/pgdog/src/frontend/client/query_engine/connect.rs +++ b/pgdog/src/frontend/client/query_engine/connect.rs @@ -1,9 +1,8 @@ use tokio::time::timeout; +use tracing::error; use super::*; -use tracing::error; - impl QueryEngine { /// Connect to backend, if necessary. /// @@ -11,13 +10,13 @@ impl QueryEngine { pub(super) async fn connect( &mut self, context: &mut QueryEngineContext<'_>, - route: &Route, ) -> Result { if self.backend.connected() { return Ok(true); } let request = Request::new(self.client_id); + let route = &context.client_request.route; self.stats.waiting(request.created_at); self.comms.stats(self.stats); @@ -26,6 +25,7 @@ impl QueryEngine { Ok(_) => { self.stats.connected(); self.stats.locked(route.lock_session()); + // This connection will be locked to this client // until they disconnect. // diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index d101ba46..2ffe3592 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -1,8 +1,8 @@ use crate::{ backend::pool::{Connection, Request}, frontend::{ - router::{parser::Shard, Route}, - BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats, + router::parser::Shard, BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, + Stats, }, net::{BackendKeyData, ErrorResponse, Message, Parameters}, state::State, @@ -113,10 +113,7 @@ impl<'a> QueryEngine { self.backend.mirror(&context.client_request); let command = self.router.command(); - let route = command.route().clone(); - - // FIXME, we should not to copy route twice. - context.client_request.route = route.clone(); + context.client_request.route = command.route().clone(); match command { Command::Shards(shards) => self.show_shards(context, *shards).await?, @@ -125,19 +122,19 @@ impl<'a> QueryEngine { } Command::CommitTransaction => { if self.backend.connected() { - self.execute(context, &route).await? + self.execute(context).await? } else { self.end_transaction(context, false).await? } } Command::RollbackTransaction => { if self.backend.connected() { - self.execute(context, &route).await? + self.execute(context).await? } else { self.end_transaction(context, true).await? } } - Command::Query(_) => self.execute(context, &route).await?, + Command::Query(_) => self.execute(context).await?, Command::Listen { channel, shard } => { self.listen(context, &channel.clone(), shard.clone()) .await? @@ -153,15 +150,15 @@ impl<'a> QueryEngine { Command::Unlisten(channel) => self.unlisten(context, &channel.clone()).await?, Command::Set { name, value } => { if self.backend.connected() { - self.execute(context, &route).await? + self.execute(context).await? } else { self.set(context, name.clone(), value.clone()).await? } } - Command::Copy(_) => self.execute(context, &route).await?, + Command::Copy(_) => self.execute(context).await?, Command::Rewrite(query) => { context.client_request.rewrite(query)?; - self.execute(context, &route).await?; + self.execute(context).await?; } Command::Deallocate => self.deallocate(context).await?, command => self.unknown_command(context, command.clone()).await?, diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 6dbbb951..6036fe32 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -15,8 +15,9 @@ impl QueryEngine { pub(super) async fn execute( &mut self, context: &mut QueryEngineContext<'_>, - route: &Route, ) -> Result<(), Error> { + let route = &context.client_request.route; + // Check for cross-shard quries. if context.cross_shard_disabled && route.is_cross_shard() { let bytes_sent = context @@ -30,11 +31,11 @@ impl QueryEngine { return Ok(()); } - if !self.connect(context, &route).await? { + if !self.connect(context).await? { return Ok(()); } - // We need to run a query now. + // We need to run a query now, execute any buffered/queued `BEGIN` statements if context.client_request.executable() { if let Some(begin_stmt) = self.begin_stmt.take() { self.backend.execute(begin_stmt.query()).await?; diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index e96bbc18..4b833504 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -179,6 +179,7 @@ impl Route { writes, locking_behavior, } = write; + self.read = !writes; self.lock_session = matches!(locking_behavior, LockingBehavior::Lock); }