Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions integration/go/go_pgx/load_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,18 @@ outer:
}

func prewarm(t *testing.T, pool *pgxpool.Pool) {
ctx := context.Background()
for range 25 {
for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} {
_, err := pool.Exec(context.Background(), q)
assert.NoError(t, err)
}
// transaction `BEGIN; SELECT 1; COMMIT;`
tx, err := pool.Begin(ctx)
assert.NoError(t, err)
_, err = tx.Exec(ctx, "SELECT 1")
assert.NoError(t, err)
err = tx.Commit(ctx)
assert.NoError(t, err)

// no-transaction `SELECT 1;`
_, err = pool.Exec(ctx, "SELECT 1")
assert.NoError(t, err)
}
}
4 changes: 2 additions & 2 deletions integration/load_balancer/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export PGHOST=127.0.0.1
export PGDATABASE=postgres
export PGPASSWORD=postgres

docker-compose up -d
docker compose up -d
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docker-compose is deprecated.
docker compose has replaced it.

docker-compose does not work on my recent version of docker desktop.
docker compose works on the version of docker in our CI.

win-win?
I suppose it will work for a every engineer's machine too. Can rollback this change if that is not the case



echo "Waiting for Postgres to be ready"
Expand Down Expand Up @@ -45,4 +45,4 @@ popd

killall pgdog

docker-compose down
docker compose down
1 change: 1 addition & 0 deletions pgdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ indexmap = "2.9"
lru = "0.16"
hickory-resolver = "0.25.2"
lazy_static = "1"
smallvec = "1.15.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
Expand Down
10 changes: 9 additions & 1 deletion pgdog/src/backend/pool/connection/binding.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Binding between frontend client and a connection on the backend.

use crate::{
net::{parameter::Parameters, ProtocolMessage},
net::{parameter::Parameters, ProtocolMessage, Query},
state::State,
};

Expand Down Expand Up @@ -71,6 +71,7 @@ impl Binding {

Binding::Admin(backend) => Ok(backend.read().await?),
Binding::MultiShard(shards, state) => {
println!("2.1");
if shards.is_empty() {
loop {
debug!("multi-shard binding suspended");
Expand Down Expand Up @@ -237,6 +238,13 @@ impl Binding {
Ok(())
}

/// Execute a BEGIN on all servers
/// TODO: Block mutli-shard BEGINs as transaction should not occur on multiple shards
pub async fn begin(&mut self) -> Result<(), Error> {
let query = Query::new("BEGIN");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment. Client can pass options to BEGIN, changing transaction type.

self.execute(query.query()).await
}

pub async fn link_client(&mut self, params: &Parameters) -> Result<usize, Error> {
match self {
Binding::Server(Some(ref mut server)) => server.link_client(params).await,
Expand Down
6 changes: 5 additions & 1 deletion pgdog/src/backend/pool/connection/mirror.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tracing::{debug, error};
use crate::backend::Cluster;
use crate::config::config;
use crate::frontend::client::timeouts::Timeouts;
use crate::frontend::logical_transaction::LogicalTransaction;
use crate::frontend::{Command, PreparedStatements, Router, RouterContext};
use crate::net::Parameters;
use crate::state::State;
Expand Down Expand Up @@ -47,6 +48,8 @@ pub(crate) struct Mirror {
params: Parameters,
/// Mirror state.
state: State,
/// Logical transaction state.
logical_transaction: LogicalTransaction,
}

impl Mirror {
Expand All @@ -71,6 +74,7 @@ impl Mirror {
cluster: cluster.clone(),
state: State::Idle,
params: Parameters::default(),
logical_transaction: LogicalTransaction::new(),
};

let query_timeout = Timeouts::from_config(&config.config.general);
Expand Down Expand Up @@ -135,7 +139,7 @@ impl Mirror {
&self.cluster,
&mut self.prepared_statements,
&self.params,
false,
&self.logical_transaction,
) {
match self.router.query(context) {
Ok(command) => {
Expand Down
12 changes: 9 additions & 3 deletions pgdog/src/frontend/client/engine/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{
frontend::{client::Inner, Buffer, Client, PreparedStatements},
frontend::{
client::Inner, logical_transaction::LogicalTransaction, Buffer, Client, PreparedStatements,
},
net::Parameters,
};

Expand All @@ -13,7 +15,7 @@ pub struct EngineContext<'a> {
/// Client parameters.
pub(super) params: &'a Parameters,
/// Is the client inside a transaction?
pub(super) in_transaction: bool,
pub(super) logical_transaction: &'a LogicalTransaction,
/// Messages currently in client's buffer.
pub(super) buffer: &'a Buffer,
}
Expand All @@ -23,9 +25,13 @@ impl<'a> EngineContext<'a> {
Self {
prepared_statements: &mut client.prepared_statements,
params: &client.params,
in_transaction: client.in_transaction,
logical_transaction: &client.logical_transaction,
connected: inner.connected(),
buffer: &client.request_buffer,
}
}

pub fn in_transaction(&self) -> bool {
self.logical_transaction.in_transaction()
}
}
9 changes: 6 additions & 3 deletions pgdog/src/frontend/client/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl<'a> Engine<'a> {
'S' => {
if only_close || only_sync && !self.context.connected {
messages.push(
ReadyForQuery::in_transaction(self.context.in_transaction).message()?,
ReadyForQuery::in_transaction(self.context.in_transaction())
.message()?,
)
}
}
Expand All @@ -73,7 +74,7 @@ impl<'a> Engine<'a> {
#[cfg(test)]
mod test {
use crate::{
frontend::{Buffer, PreparedStatements},
frontend::{logical_transaction::LogicalTransaction, Buffer, PreparedStatements},
net::{Parameters, Parse, Sync},
};

Expand All @@ -93,11 +94,13 @@ mod test {
Sync.into(),
]);

let logical_transaction = LogicalTransaction::new();

let context = EngineContext {
connected: false,
prepared_statements: &mut prepared,
params: &params,
in_transaction: false,
logical_transaction: &logical_transaction,
buffer: &buf,
};

Expand Down
11 changes: 4 additions & 7 deletions pgdog/src/frontend/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
Error as BackendError,
},
frontend::{
buffer::BufferedQuery, router::Error as RouterError, Buffer, Command, Comms,
PreparedStatements, Router, RouterContext, Stats,
logical_transaction::LogicalTransaction, router::Error as RouterError, Buffer, Command,
Comms, PreparedStatements, Router, RouterContext, Stats,
},
net::Parameters,
state::State,
Expand All @@ -29,8 +29,6 @@ pub struct Inner {
pub(super) router: Router,
/// Client stats.
pub(super) stats: Stats,
/// Start transaction statement, intercepted by the router.
pub(super) start_transaction: Option<BufferedQuery>,
/// Client-wide comms.
pub(super) comms: Comms,
}
Expand All @@ -47,7 +45,6 @@ impl Inner {
backend,
router,
stats: Stats::new(),
start_transaction: None,
comms: client.comms.clone(),
})
}
Expand All @@ -58,7 +55,7 @@ impl Inner {
buffer: &mut Buffer,
prepared_statements: &mut PreparedStatements,
params: &Parameters,
in_transaction: bool,
logical_transaction: &LogicalTransaction,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the root change, followed the compiler from here basically.

) -> Result<Option<&Command>, RouterError> {
let command = self
.backend
Expand All @@ -71,7 +68,7 @@ impl Inner {
cluster, // Cluster configuration.
prepared_statements, // Prepared statements.
params, // Client connection parameters.
in_transaction, // Client in explcitely started transaction.
logical_transaction, // Client in explcitely started transaction.
)?;
self.router.query(context)
})
Expand Down
Loading
Loading