Replies: 2 comments
-
|
I guess you are after the same design then me, where the connection is not passed into a query method, but is a field in the types where the query methods are. This is a bit nasty, but I finally settled with a solution having a Look at the unit tests for an example how to use this. Here, queries are directly added to the Please let me know if you
//! The common interface for application code to execute
//! DB queries.
//!
//! In essence, this crate contains three types:
//! - [`DefaultDb`]: executes queries outside of a transaction, can
//! [start](`DefaultDb::begin_transaction`) a transaction.
//! - [`TransactedDb`]: executes queries inside of a transaction, can
//! [commit](`TransactedDb::commit`) and [roll
//! back](`TransactedDb::rollback`) a transaction.
//! - [`Database`]: a trait implemented by both [`DefaultDb`] and
//! [`TransactedDb`].
//!
//! This crate does not provide any queries by itself. These are meant to be
//! added by other crates via extension traits on [`Database`].
//!
//! ### Registering queries via extension trait
//! Queries should be added as a trait:
//!
//! ```ignore
//! // the trait that contains the queries
//! trait FooQueries : Database {
//! // note that queries functions are not abstract
//! async fn create_foo(&mut self, ...) -> Result<nehws_lib_model::FooId> {
//! sqlx::query!("select ...").fetch_all(self.connection())
//! }
//!
//! // more queries methods here....
//! }
//! ```
//!
//! Make queries available to Database trait
//! via a blanked implementation:
//!
//! ```ignore
//! impl<T: Database> ExampleQueries for T {}
//! ```
#![expect(missing_debug_implementations, async_fn_in_trait)]
use sqlx::{Acquire as _, PgPool, Postgres, Transaction, pool::PoolConnection};
/// We re-export some SQLX types, so these can be used
pub use sqlx::{
Error as NehwsDbError, PgConnection as NehwsDbConnection, PgTransaction as NehwsTransaction,
Result as NehwsDbResult,
};
/// Provides access to a connection.
/// Should be called by extension traits but not from application code.
pub trait Database {
/// Acquires a new connection from the pool.
///
/// [`TransactedDb::commit`]
async fn connection(&mut self) -> sqlx::Result<&mut NehwsDbConnection>;
}
/// Default implementation that works without a transaction
pub struct DefaultDb {
/// db pool
pool: PgPool,
/// when starting a transaction,
acquired: Option<PoolConnection<Postgres>>,
}
impl DefaultDb {
/// Creates a new db instance.
#[must_use]
pub fn new(pool: PgPool) -> DefaultDb {
DefaultDb { pool, acquired: None }
}
/// Run the migrations defined in the `migrations` directory.
pub async fn run_migrations(&self) -> Result<(), sqlx::Error> {
sqlx::migrate!().run(&self.pool).await?;
Ok(())
}
/// If no connection has been acquired yet, a new connection is taken from
/// the pool. Once a transaction has been started, it is the callers'
/// responsibility to make changes permanent by committing it.
pub async fn begin_transaction<'c>(&'c mut self) -> sqlx::Result<TransactedDb<'c>> {
let connection = self.connection().await?;
let transaction = connection.begin().await?;
Ok(TransactedDb { transaction })
}
}
impl Database for DefaultDb {
async fn connection(&mut self) -> sqlx::Result<&mut NehwsDbConnection> {
if self.acquired.is_none() {
let connection = self.pool.acquire().await?;
self.acquired.replace(connection);
}
Ok(self.acquired.as_mut().unwrap_or_else(|| unreachable!()))
}
}
impl Clone for DefaultDb {
fn clone(&self) -> Self {
Self { pool: self.pool.clone(), acquired: None }
}
}
/// DB Queries inside a transaction
pub struct TransactedDb<'c> {
/// what shall I say... the transaction.
transaction: Transaction<'c, Postgres>,
}
/// Operations only valid inside a transaction
impl<'c> TransactedDb<'c> {
/// Attempts to commit the current transaction.
pub async fn commit(self) -> sqlx::Result<()> {
self.transaction.commit().await
}
/// Attempts to rollback the current transaction.
#[allow(dead_code)]
pub async fn rollback(self) -> sqlx::Result<()> {
self.transaction.rollback().await
}
}
impl<'c> Database for TransactedDb<'c> {
async fn connection(&mut self) -> sqlx::Result<&mut NehwsDbConnection> {
self.transaction.acquire().await
}
}
/// Offers access to the underlying transaction.
/// The sole purpose for this is to offer an easier migration path to the new DB
/// layer, to allow code that still uses sqlx' transaction directly continue to
/// work. TODO: remove this when db layer is used throughout the system
impl<'c> AsMut<Transaction<'c, Postgres>> for TransactedDb<'c> {
fn as_mut(&mut self) -> &mut Transaction<'c, Postgres> {
&mut self.transaction
}
}
#[cfg(test)]
mod tests {
use sqlx::{Acquire as _, Row as _};
use super::{Database, NehwsDbResult};
trait ExampleQueries: Database {
async fn find_something(&mut self) -> NehwsDbResult<Option<usize>> {
Ok(sqlx::query!("select 1 as \"peter!\"")
.fetch_optional(self.connection().await?)
.await?
.map(|x| x.peter as _))
}
async fn insert_foo(&mut self, whatever: i32) -> NehwsDbResult<i32> {
// Note: for this test, a new table is created. Since the table is not part of
// our migrations, sqlx cannot check the query at runtime, so we cannot user
// query! macro here.
//
let id: i32 = sqlx::query("insert into foo (whatever) values ($1) returning id;")
.bind(whatever)
.fetch_one(self.connection().await?)
.await?
.get(0);
Ok(id)
}
async fn find_foo_by_id(&mut self, id: i32) -> NehwsDbResult<Option<i32>> {
Ok(sqlx::query("select whatever from foo where id=$1")
.bind(id)
.fetch_optional(self.connection().await?)
.await?
.and_then(|row| row.get(0)))
}
// async fn _connection(&mut self) -> NehwsDbResult<&mut NehwsDbConnection>;
}
/// make out queries available on both `DefaultDb` and `TransactedDb`
impl<T: Database> ExampleQueries for T {}
// These are not tests, but demonstrate how to use this from the app perspectiv
#[sqlx::test]
async fn simple(pool: sqlx::PgPool) -> NehwsDbResult<()> {
let mut db = super::DefaultDb::new(pool);
let query_result = db.find_something().await?;
assert_eq!(query_result, Some(1));
Ok(())
}
#[sqlx::test]
async fn transacted(pool: sqlx::PgPool) -> NehwsDbResult<()> {
create_foo_table(&pool).await?;
let mut db1 = super::DefaultDb::new(pool.clone());
let mut tx = db1.begin_transaction().await?;
let id = tx.insert_foo(123).await?;
assert!(id > 0, "should have assigned an id greater zero");
assert_eq!(
tx.find_foo_by_id(id).await?,
Some(123),
"should have returned new row within same transaction"
);
let mut db2 = super::DefaultDb::new(pool.clone());
assert_eq!(
db2.find_foo_by_id(id).await?,
None,
"uncommitted transaction must not be visible to other connections"
);
tx.commit().await?;
assert_eq!(
db2.find_foo_by_id(id).await?,
Some(123),
"transaction must become visible to other connections after commit"
);
Ok(())
}
#[sqlx::test]
async fn join_transaction(pool: sqlx::PgPool) -> NehwsDbResult<()> {
create_foo_table(&pool).await?;
let mut tx = pool.begin().await?;
let id: i32 = sqlx::query("insert into foo (whatever) values (123) returning id;")
.fetch_one(tx.acquire().await?)
.await?
.get(0);
let db = super::DefaultDb::new(pool.clone());
#[allow(deprecated)]
let mut tx = db.join_transaction(tx);
let foo = tx.find_foo_by_id(id).await?;
assert!(foo.is_some(), "record expected to be visible from within same transaction");
Ok(())
}
async fn create_foo_table(pool: &sqlx::PgPool) -> NehwsDbResult<()> {
sqlx::query("create table foo (id serial primary key, whatever integer)")
.execute(pool)
.await?;
Ok(())
}
} |
Beta Was this translation helpful? Give feedback.
-
|
@belowm hello. Thanks for posting this. I have a question though. You say that
And looking at the where So essentially, when you instantiate a non-transaction DB, you give it just one single connection. And thus, if this connection is occupied by running some of the DB's queries, no other queries of that DB will be able to run until the first one is finished. Is this correct? This, as far as I can tell, creates a bottleneck in performance. Whereas, if your non-transactional DB could be instantiated with a pool of connections, your DB could This is all theory, of course, and I think that having a pool instead of a single connection may not be implementable at all. I've already spent more than a week trying to achieve having a "repository" generic over, if not any executor, then at least a transaction (or a connection) and a connection pool, so that it could acquire connections from a pool when needed, and acquire a single connection from a transaction, when used in such a context, but I think I'll just soon give up, because this doesn't seem possible. Having a standalone method generic over some executor is quite straightforward to implement, but a whole repository - this seems unachievable. Are there any news on this regard maybe? I even asked an SO question (https://stackoverflow.com/questions/79814791/a-repository-generic-over-an-sqlx-executor-in-rust), but almost a week has passed and so far noone left even a single comment. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I currently have a setup where I have a store:
And repositories, for instance, AccountsRepo:
And it's used like:
Is there an idiomatic and neat way to make the repositories ALSO accept transactions (or any
Executor)? For example, so it can ALSO be used like this:Beta Was this translation helpful? Give feedback.
All reactions