Skip to content
11 changes: 9 additions & 2 deletions pgdog/src/backend/pool/connection/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ impl Binding {

pub(super) async fn read(&mut self) -> Result<Message, Error> {
match self {
Binding::Admin(backend) => Ok(backend.read().await?),

Binding::Server(guard) => {
if let Some(guard) = guard.as_mut() {
guard.read().await
Expand All @@ -70,7 +72,6 @@ impl Binding {
}
}

Binding::Admin(backend) => Ok(backend.read().await?),
Binding::MultiShard(shards, state) => {
if shards.is_empty() {
loop {
Expand Down Expand Up @@ -115,6 +116,8 @@ impl Binding {

/// Send an entire buffer of messages to the servers(s).
pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> {
println!("\n\nsending.... \n{:#?}\n\n", client_request);

match self {
Binding::Admin(backend) => Ok(backend.send(client_request).await?),

Expand All @@ -128,6 +131,7 @@ impl Binding {

Binding::MultiShard(servers, _state) => {
for server in servers.iter_mut() {
println!("\nserver\n${:#?}\n", server);
server.send(client_request).await?;
}

Expand Down Expand Up @@ -222,6 +226,7 @@ impl Binding {

/// Execute a query on all servers.
pub async fn execute(&mut self, query: &str) -> Result<(), Error> {
println!("\n\nexecuting...\n{:#?}", query);
match self {
Binding::Server(Some(ref mut server)) => {
server.execute(query).await?;
Expand All @@ -243,6 +248,8 @@ impl Binding {
match self {
Binding::Server(Some(ref mut server)) => server.link_client(params).await,
Binding::MultiShard(ref mut servers, _) => {
println!("\nLinking... {:#?}", servers);

let mut max = 0;
for server in servers {
let synced = server.link_client(params).await?;
Expand Down Expand Up @@ -271,7 +278,7 @@ impl Binding {
}
}

pub(super) fn dirty(&mut self) {
pub(super) fn mark_dirty(&mut self) {
match self {
Binding::Server(Some(ref mut server)) => server.mark_dirty(true),
Binding::MultiShard(ref mut servers, _state) => {
Expand Down
4 changes: 3 additions & 1 deletion pgdog/src/backend/pool/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ impl Connection {
router: &mut Router,
streaming: bool,
) -> Result<(), Error> {
println!("\nhandle_client_request: \n\n{:#?}\n \n", client_request);

if client_request.copy() && !streaming {
let rows = router
.copy_data(client_request)
Expand Down Expand Up @@ -353,7 +355,7 @@ impl Connection {
pub(crate) fn lock(&mut self, lock: bool) {
self.locked = lock;
if lock {
self.binding.dirty();
self.binding.mark_dirty();
}
}

Expand Down
2 changes: 2 additions & 0 deletions pgdog/src/backend/pool/connection/multi_shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct Counters {
pub struct MultiShard {
/// Number of shards we are connected to.
shards: usize,

/// Route the query is taking.
route: Route,

Expand All @@ -48,6 +49,7 @@ pub struct MultiShard {

/// Sorting/aggregate buffer.
buffer: Buffer,

decoder: Decoder,
}

Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/pool/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Guard {
error!("server reset error [{}]", server.addr());
}
Ok(_) => {
server.cleaned();
server.mark_clean();
}
}

Expand Down
3 changes: 2 additions & 1 deletion pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ impl Server {
if let Some(message) = self.prepared_statements.state_mut().get_simulated() {
return Ok(message.backend());
}

match self
.stream
.as_mut()
Expand Down Expand Up @@ -823,7 +824,7 @@ impl Server {

/// Server has been cleaned.
#[inline]
pub(super) fn cleaned(&mut self) {
pub(super) fn mark_clean(&mut self) {
self.dirty = false;
}

Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/frontend/client/query_engine/connect.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use tokio::time::timeout;
use tracing::error;

use super::*;

use tracing::error;

impl QueryEngine {
/// Connect to backend, if necessary.
///
/// Return true if connected, false otherwise.
pub(super) async fn connect(
&mut self,
context: &mut QueryEngineContext<'_>,
route: &Route,
) -> Result<bool, Error> {
if self.backend.connected() {
return Ok(true);
}

let request = Request::new(self.client_id);
let route = &context.client_request.route;

self.stats.waiting(request.created_at);
self.comms.stats(self.stats);
Expand All @@ -26,6 +25,7 @@ impl QueryEngine {
Ok(_) => {
self.stats.connected();
self.stats.locked(route.lock_session());

// This connection will be locked to this client
// until they disconnect.
//
Expand Down
21 changes: 9 additions & 12 deletions pgdog/src/frontend/client/query_engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
backend::pool::{Connection, Request},
frontend::{
router::{parser::Shard, Route},
BufferedQuery, Client, Command, Comms, Error, Router, RouterContext, Stats,
router::parser::Shard, BufferedQuery, Client, Command, Comms, Error, Router, RouterContext,
Stats,
},
net::{BackendKeyData, ErrorResponse, Message, Parameters},
state::State,
Expand Down Expand Up @@ -113,10 +113,7 @@ impl<'a> QueryEngine {
self.backend.mirror(&context.client_request);

let command = self.router.command();
let route = command.route().clone();

// FIXME, we should not to copy route twice.
context.client_request.route = route.clone();
context.client_request.route = command.route().clone();

match command {
Command::Shards(shards) => self.show_shards(context, *shards).await?,
Expand All @@ -125,19 +122,19 @@ impl<'a> QueryEngine {
}
Command::CommitTransaction => {
if self.backend.connected() {
self.execute(context, &route).await?
self.execute(context).await?
} else {
self.end_transaction(context, false).await?
}
}
Command::RollbackTransaction => {
if self.backend.connected() {
self.execute(context, &route).await?
self.execute(context).await?
} else {
self.end_transaction(context, true).await?
}
}
Command::Query(_) => self.execute(context, &route).await?,
Command::Query(_) => self.execute(context).await?,
Command::Listen { channel, shard } => {
self.listen(context, &channel.clone(), shard.clone())
.await?
Expand All @@ -153,15 +150,15 @@ impl<'a> QueryEngine {
Command::Unlisten(channel) => self.unlisten(context, &channel.clone()).await?,
Command::Set { name, value } => {
if self.backend.connected() {
self.execute(context, &route).await?
self.execute(context).await?
} else {
self.set(context, name.clone(), value.clone()).await?
}
}
Command::Copy(_) => self.execute(context, &route).await?,
Command::Copy(_) => self.execute(context).await?,
Command::Rewrite(query) => {
context.client_request.rewrite(query)?;
self.execute(context, &route).await?;
self.execute(context).await?;
}
Command::Deallocate => self.deallocate(context).await?,
command => self.unknown_command(context, command.clone()).await?,
Expand Down
7 changes: 4 additions & 3 deletions pgdog/src/frontend/client/query_engine/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ impl QueryEngine {
pub(super) async fn execute(
&mut self,
context: &mut QueryEngineContext<'_>,
route: &Route,
) -> Result<(), Error> {
let route = &context.client_request.route;

// Check for cross-shard quries.
if context.cross_shard_disabled && route.is_cross_shard() {
let bytes_sent = context
Expand All @@ -30,11 +31,11 @@ impl QueryEngine {
return Ok(());
}

if !self.connect(context, &route).await? {
if !self.connect(context).await? {
return Ok(());
}

// We need to run a query now.
// We need to run a query now, execute any buffered/queued `BEGIN` statements
if context.client_request.executable() {
if let Some(begin_stmt) = self.begin_stmt.take() {
self.backend.execute(begin_stmt.query()).await?;
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/frontend/router/parser/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl Route {
writes,
locking_behavior,
} = write;

self.read = !writes;
self.lock_session = matches!(locking_behavior, LockingBehavior::Lock);
}
Expand Down
Loading