Skip to content

Commit 7fb327b

Browse files
committed
Documentation + pipelining support for postgres
1 parent a807e68 commit 7fb327b

File tree

5 files changed

+413
-52
lines changed

5 files changed

+413
-52
lines changed

src/lib.rs

Lines changed: 192 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,73 @@
1+
//! Diesel-async provides async variants of diesel releated query functionality
2+
//!
3+
//! diesel-async is an extension to diesel itself. It is designed to be used togehter
4+
//! with the main diesel crate. It only provides async variants of core diesel traits,
5+
//! that perform actual io-work.
6+
//! This includes async counterparts the following traits:
7+
//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.0.x/diesel/prelude/trait.RunQueryDsl.html)
8+
//! -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl)
9+
//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.0.x/diesel/connection/trait.Connection.html)
10+
//! -> [`diesel_async::AsyncConnection`](crate::AsyncConnection)
11+
//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.0.x/diesel/query_dsl/trait.UpdateAndFetchResults.html)
12+
//! -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults)
13+
//!
14+
//! These traits closely mirror their diesel counter parts while providing async functionality.
15+
//!
16+
//! In addition to these core traits 2 fully async connection implementations are provided
17+
//! by diesel-async:
18+
//!
19+
//! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature)
20+
//! * [`AsyncPgConnection`] (enabled by the `postgres` feature)
21+
//!
22+
//! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait
23+
//! method calls and connections with their async counterparts.
24+
//!
25+
//! ```rust
26+
//! # include!("./doctest_setup.rs");
27+
//! #
28+
//! diesel::table! {
29+
//! users(id) {
30+
//! id -> Integer,
31+
//! name -> Text,
32+
//! }
33+
//! }
34+
//! #
35+
//! # #[tokio::main(flavor = "current_thread")]
36+
//! # async fn main() {
37+
//! # run_test().await;
38+
//! # }
39+
//! #
40+
//! # async fn run_test() -> QueryResult<()> {
41+
//! # use diesel::insert_into;
42+
//! use crate::users::dsl::*;
43+
//! # let mut connection = establish_connection().await;
44+
//! # /*
45+
//! let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
46+
//! # */
47+
//! let data = users
48+
//! // use ordinary diesel query dsl here
49+
//! .filter(id.gt(0))
50+
//! // execute the query via the provided
51+
//! // async variant of `RunQueryDsl`
52+
//! .load::<(i32, String)>(&mut connection)
53+
//! .await?;
54+
//! let expected_data = vec![
55+
//! (1, String::from("Sean")),
56+
//! (2, String::from("Tess")),
57+
//! ];
58+
//! assert_eq!(expected_data, data);
59+
//! # Ok(())
60+
//! # }
61+
//! ```
62+
63+
#![warn(missing_docs)]
164
use diesel::backend::Backend;
265
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
366
use diesel::row::Row;
467
use diesel::{ConnectionResult, QueryResult};
568
use futures::future::BoxFuture;
669
use futures::{Future, Stream};
70+
771
#[cfg(feature = "mysql")]
872
mod mysql;
973
#[cfg(feature = "postgres")]
@@ -17,47 +81,136 @@ pub use self::mysql::AsyncMysqlConnection;
1781
#[cfg(feature = "postgres")]
1882
pub use self::pg::AsyncPgConnection;
1983
pub use self::run_query_dsl::*;
20-
pub use self::stmt_cache::StmtCache;
21-
pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
2284

85+
use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
86+
87+
/// Perform simple operations on a backend.
88+
///
89+
/// You should likely use [`AsyncConnection`] instead.
2390
#[async_trait::async_trait]
2491
pub trait SimpleAsyncConnection {
92+
/// Execute multiple SQL statements within the same string.
93+
///
94+
/// This function is used to execute migrations,
95+
/// which may contain more than one SQL statement.
2596
async fn batch_execute(&mut self, query: &str) -> QueryResult<()>;
2697
}
2798

99+
/// This trait is a workaround to emulate GAT on stable rust
100+
///
101+
/// It is used to specify the return type of `AsyncConnection::load`
102+
/// and `AsyncConnection::execute` which may contain lifetimes
28103
pub trait AsyncConnectionGatWorkaround<'conn, 'query, DB: Backend> {
104+
/// The future returned by `AsyncConnection::execute`
29105
type ExecuteFuture: Future<Output = QueryResult<usize>> + Send;
106+
/// The future returned by `AsyncConnection::load`
30107
type LoadFuture: Future<Output = QueryResult<Self::Stream>> + Send;
108+
/// The inner stream returned by `AsyncConnection::load`
31109
type Stream: Stream<Item = QueryResult<Self::Row>> + Send;
110+
/// The row type used by the stream returned by `AsyncConnection::load`
32111
type Row: Row<'conn, DB>;
33112
}
34113

114+
/// An async connection to a database
115+
///
116+
/// This trait represents a n async database connection. It can be used to query the database through
117+
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
118+
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
35119
#[async_trait::async_trait]
36120
pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send
37121
where
38122
for<'a, 'b> Self: AsyncConnectionGatWorkaround<'a, 'b, Self::Backend>,
39123
{
124+
/// The backend this type connects to
40125
type Backend: Backend;
126+
127+
#[doc(hidden)]
41128
type TransactionManager: TransactionManager<Self>;
42129

130+
/// Establishes a new connection to the database
131+
///
132+
/// The argument to this method and the method's behavior varies by backend.
133+
/// See the documentation for that backend's connection class
134+
/// for details about what it accepts and how it behaves.
43135
async fn establish(database_url: &str) -> ConnectionResult<Self>;
44136

45-
fn load<'conn, 'query, T>(
46-
&'conn mut self,
47-
source: T,
48-
) -> <Self as AsyncConnectionGatWorkaround<'conn, 'query, Self::Backend>>::LoadFuture
49-
where
50-
T: AsQuery + Send + 'query,
51-
T::Query: QueryFragment<Self::Backend> + QueryId + Send + 'query;
52-
53-
fn execute_returning_count<'conn, 'query, T>(
54-
&'conn mut self,
55-
source: T,
56-
) -> <Self as AsyncConnectionGatWorkaround<'conn, 'query, Self::Backend>>::ExecuteFuture
57-
where
58-
T: QueryFragment<Self::Backend> + QueryId + Send + 'query;
59-
60-
async fn transaction<F, R, E>(&mut self, callback: F) -> Result<R, E>
137+
/// Executes the given function inside of a database transaction
138+
///
139+
/// This function executes the provided closure `f` inside a database
140+
/// transaction. If there is already an open transaction for the current
141+
/// connection savepoints will be used instead. The connection is commited if
142+
/// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
143+
/// For both cases the original result value will be returned from this function.
144+
///
145+
/// If the transaction fails to commit due to a `SerializationFailure` or a
146+
/// `ReadOnlyTransaction` a rollback will be attempted. In this case a
147+
/// [`Error::CommitTransactionFailed`](crate::result::Error::CommitTransactionFailed)
148+
/// error is returned, which contains details about the original error and
149+
/// the success of the rollback attempt.
150+
/// If the rollback failed the connection should be considered broken
151+
/// as it contains a uncommitted unabortable open transaction. Any further
152+
/// interaction with the transaction system will result in an returned error
153+
/// in this cases.
154+
///
155+
/// If the closure returns an `Err(_)` and the rollback fails the function
156+
/// will return a [`Error::RollbackError`](crate::result::Error::RollbackError)
157+
/// wrapping the error generated by the rollback operation instead.
158+
/// In this case the connection should be considered broken as it contains
159+
/// an unabortable open transaction.
160+
///
161+
/// If a nested transaction fails to release the corresponding savepoint
162+
/// a rollback will be attempted. In this case a
163+
/// [`Error::CommitTransactionFailed`](crate::result::Error::CommitTransactionFailed)
164+
/// error is returned, which contains the original error and
165+
/// details about the success of the rollback attempt.
166+
///
167+
/// # Example
168+
///
169+
/// ```rust
170+
/// # include!("doctest_setup.rs");
171+
/// use diesel::result::Error;
172+
/// use futures::FutureExt;
173+
///
174+
/// # #[tokio::main(flavor = "current_thread")]
175+
/// # async fn main() {
176+
/// # run_test().await.unwrap();
177+
/// # }
178+
/// #
179+
/// # async fn run_test() -> QueryResult<()> {
180+
/// # use schema::users::dsl::*;
181+
/// # let conn = &mut establish_connection().await;
182+
/// conn.transaction::<_, Error, _>(|conn| async move {
183+
/// diesel::insert_into(users)
184+
/// .values(name.eq("Ruby"))
185+
/// .execute(conn)
186+
/// .await?;
187+
///
188+
/// let all_names = users.select(name).load::<String>(conn).await?;
189+
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
190+
///
191+
/// Ok(())
192+
/// }.boxed()).await?;
193+
///
194+
/// conn.transaction::<(), _, _>(|conn| async move {
195+
/// diesel::insert_into(users)
196+
/// .values(name.eq("Pascal"))
197+
/// .execute(conn)
198+
/// .await?;
199+
///
200+
/// let all_names = users.select(name).load::<String>(conn).await?;
201+
/// assert_eq!(vec!["Sean", "Tess", "Ruby", "Pascal"], all_names);
202+
///
203+
/// // If we want to roll back the transaction, but don't have an
204+
/// // actual error to return, we can return `RollbackTransaction`.
205+
/// Err(Error::RollbackTransaction)
206+
/// }.boxed()).await;
207+
///
208+
/// let all_names = users.select(name).load::<String>(conn).await?;
209+
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
210+
/// # Ok(())
211+
/// # }
212+
/// ```
213+
async fn transaction<R, E, F>(&mut self, callback: F) -> Result<R, E>
61214
where
62215
F: FnOnce(&mut Self) -> BoxFuture<Result<R, E>> + Send,
63216
E: From<diesel::result::Error> + Send,
@@ -78,11 +231,32 @@ where
78231
}
79232
}
80233

234+
/// Creates a transaction that will never be committed. This is useful for
235+
/// tests. Panics if called while inside of a transaction or
236+
/// if called with a connection containing a broken transaction
81237
async fn begin_test_transaction(&mut self) -> QueryResult<()> {
82238
assert_eq!(Self::TransactionManager::get_transaction_depth(self), 0);
83239
Self::TransactionManager::begin_transaction(self).await
84240
}
85241

242+
#[doc(hidden)]
243+
fn load<'conn, 'query, T>(
244+
&'conn mut self,
245+
source: T,
246+
) -> <Self as AsyncConnectionGatWorkaround<'conn, 'query, Self::Backend>>::LoadFuture
247+
where
248+
T: AsQuery + Send + 'query,
249+
T::Query: QueryFragment<Self::Backend> + QueryId + Send + 'query;
250+
251+
#[doc(hidden)]
252+
fn execute_returning_count<'conn, 'query, T>(
253+
&'conn mut self,
254+
source: T,
255+
) -> <Self as AsyncConnectionGatWorkaround<'conn, 'query, Self::Backend>>::ExecuteFuture
256+
where
257+
T: QueryFragment<Self::Backend> + QueryId + Send + 'query;
258+
259+
#[doc(hidden)]
86260
fn transaction_state(
87261
&mut self,
88262
) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;

src/mysql/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use self::error_helper::ErrorHelper;
2121
use self::row::MysqlRow;
2222
use self::serialize::ToSqlHelper;
2323

24+
/// A connection to a MySQL database. Connection URLs should be in the form
25+
/// `mysql://[user[:password]@]host/database_name`
2426
pub struct AsyncMysqlConnection {
2527
conn: mysql_async::Conn,
2628
stmt_cache: StmtCache<Mysql, Statement>,
@@ -138,6 +140,10 @@ impl PrepareCallback<Statement, MysqlType> for &'_ mut mysql_async::Conn {
138140
}
139141

140142
impl AsyncMysqlConnection {
143+
/// Wrap an existing [`mysql_async::Conn`] into a async diesel mysql connection
144+
///
145+
/// This function constructs a new `AsyncMysqlConnection` based on an existing
146+
/// [`mysql_async::Conn]`.
141147
pub async fn try_from(conn: mysql_async::Conn) -> ConnectionResult<Self> {
142148
use crate::run_query_dsl::RunQueryDsl;
143149
let mut conn = AsyncMysqlConnection {

0 commit comments

Comments
 (0)