Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions integration/go/go_pgx/load_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,18 @@ outer:
}

func prewarm(t *testing.T, pool *pgxpool.Pool) {
ctx := context.Background()
for range 25 {
for _, q := range []string{"BEGIN", "SELECT 1", "COMMIT", "SELECT 1"} {
_, err := pool.Exec(context.Background(), q)
assert.NoError(t, err)
}
// transaction `BEGIN; SELECT 1; COMMIT;`
tx, err := pool.Begin(ctx)
assert.NoError(t, err)
_, err = tx.Exec(ctx, "SELECT 1")
assert.NoError(t, err)
err = tx.Commit(ctx)
assert.NoError(t, err)

// no-transaction `SELECT 1;`
_, err = pool.Exec(ctx, "SELECT 1")
assert.NoError(t, err)
}
}
4 changes: 2 additions & 2 deletions integration/load_balancer/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export PGHOST=127.0.0.1
export PGDATABASE=postgres
export PGPASSWORD=postgres

docker-compose up -d
docker compose up -d
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docker-compose is deprecated.
docker compose has replaced it.

docker-compose does not work on my recent version of docker desktop.
docker compose works on the version of docker in our CI.

win-win?
I suppose it will work for a every engineer's machine too. Can rollback this change if that is not the case



echo "Waiting for Postgres to be ready"
Expand Down Expand Up @@ -45,4 +45,4 @@ popd

killall pgdog

docker-compose down
docker compose down
1 change: 1 addition & 0 deletions pgdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ indexmap = "2.9"
lru = "0.16"
hickory-resolver = "0.25.2"
lazy_static = "1"
smallvec = "1.15.1"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
Expand Down
9 changes: 8 additions & 1 deletion pgdog/src/backend/pool/connection/binding.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Binding between frontend client and a connection on the backend.

use crate::{
net::{parameter::Parameters, ProtocolMessage},
net::{parameter::Parameters, ProtocolMessage, Query},
state::State,
};

Expand Down Expand Up @@ -237,6 +237,13 @@ impl Binding {
Ok(())
}

/// Execute a BEGIN on all servers
/// TODO: Block mutli-shard BEGINs as transaction should not occur on multiple shards
pub async fn begin(&mut self) -> Result<(), Error> {
let query = Query::new("BEGIN");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment. Client can pass options to BEGIN, changing transaction type.

self.execute(query.query()).await
}

pub async fn link_client(&mut self, params: &Parameters) -> Result<usize, Error> {
match self {
Binding::Server(Some(ref mut server)) => server.link_client(params).await,
Expand Down
12 changes: 9 additions & 3 deletions pgdog/src/frontend/client/engine/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{
frontend::{client::Inner, Buffer, Client, PreparedStatements},
frontend::{
client::Inner, logical_transaction::LogicalTransaction, Buffer, Client, PreparedStatements,
},
net::Parameters,
};

Expand All @@ -13,7 +15,7 @@ pub struct EngineContext<'a> {
/// Client parameters.
pub(super) params: &'a Parameters,
/// Is the client inside a transaction?
pub(super) in_transaction: bool,
pub(super) logical_transaction: &'a LogicalTransaction,
/// Messages currently in client's buffer.
pub(super) buffer: &'a Buffer,
}
Expand All @@ -23,9 +25,13 @@ impl<'a> EngineContext<'a> {
Self {
prepared_statements: &mut client.prepared_statements,
params: &client.params,
in_transaction: client.in_transaction,
logical_transaction: &client.logical_transaction,
connected: inner.connected(),
buffer: &client.request_buffer,
}
}

pub fn in_transaction(&self) -> bool {
self.logical_transaction.in_transaction()
}
}
9 changes: 6 additions & 3 deletions pgdog/src/frontend/client/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl<'a> Engine<'a> {
'S' => {
if only_close || only_sync && !self.context.connected {
messages.push(
ReadyForQuery::in_transaction(self.context.in_transaction).message()?,
ReadyForQuery::in_transaction(self.context.in_transaction())
.message()?,
)
}
}
Expand All @@ -73,7 +74,7 @@ impl<'a> Engine<'a> {
#[cfg(test)]
mod test {
use crate::{
frontend::{Buffer, PreparedStatements},
frontend::{logical_transaction::LogicalTransaction, Buffer, PreparedStatements},
net::{Parameters, Parse, Sync},
};

Expand All @@ -93,11 +94,13 @@ mod test {
Sync.into(),
]);

let logical_transaction = LogicalTransaction::new();

let context = EngineContext {
connected: false,
prepared_statements: &mut prepared,
params: &params,
in_transaction: false,
logical_transaction: &logical_transaction,
buffer: &buf,
};

Expand Down
19 changes: 8 additions & 11 deletions pgdog/src/frontend/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
Error as BackendError,
},
frontend::{
buffer::BufferedQuery, router::Error as RouterError, Buffer, Command, Comms,
PreparedStatements, Router, RouterContext, Stats,
logical_transaction::LogicalTransaction, router::Error as RouterError, Buffer, Command,
Comms, PreparedStatements, Router, RouterContext, Stats,
},
net::Parameters,
state::State,
Expand All @@ -29,8 +29,6 @@ pub struct Inner {
pub(super) router: Router,
/// Client stats.
pub(super) stats: Stats,
/// Start transaction statement, intercepted by the router.
pub(super) start_transaction: Option<BufferedQuery>,
/// Client-wide comms.
pub(super) comms: Comms,
}
Expand All @@ -47,7 +45,6 @@ impl Inner {
backend,
router,
stats: Stats::new(),
start_transaction: None,
comms: client.comms.clone(),
})
}
Expand All @@ -58,7 +55,7 @@ impl Inner {
buffer: &mut Buffer,
prepared_statements: &mut PreparedStatements,
params: &Parameters,
in_transaction: bool,
logical_transaction: &LogicalTransaction,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the root change, followed the compiler from here basically.

) -> Result<Option<&Command>, RouterError> {
let command = self
.backend
Expand All @@ -67,11 +64,11 @@ impl Inner {
.map(|cluster| {
// Build router context.
let context = RouterContext::new(
buffer, // Query and parameters.
cluster, // Cluster configuration.
prepared_statements, // Prepared statements.
params, // Client connection parameters.
in_transaction, // Client in explcitely started transaction.
buffer, // Query and parameters.
cluster, // Cluster configuration.
prepared_statements, // Prepared statements.
params, // Client connection parameters.
logical_transaction.in_transaction(), // Client in explcitely started transaction.
)?;
self.router.query(context)
})
Expand Down
Loading
Loading