Skip to content

Commit 6434959

Browse files
authored
Merge pull request #10 from weiznich/pipelining_support
Pipelining support
2 parents 800da86 + 7fb327b commit 6434959

File tree

7 files changed

+951
-344
lines changed

7 files changed

+951
-344
lines changed

src/lib.rs

Lines changed: 199 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,73 @@
1+
//! Diesel-async provides async variants of diesel releated query functionality
2+
//!
3+
//! diesel-async is an extension to diesel itself. It is designed to be used togehter
4+
//! with the main diesel crate. It only provides async variants of core diesel traits,
5+
//! that perform actual io-work.
6+
//! This includes async counterparts the following traits:
7+
//! * [`diesel::prelude::RunQueryDsl`](https://docs.diesel.rs/2.0.x/diesel/prelude/trait.RunQueryDsl.html)
8+
//! -> [`diesel_async::RunQueryDsl`](crate::RunQueryDsl)
9+
//! * [`diesel::connection::Connection`](https://docs.diesel.rs/2.0.x/diesel/connection/trait.Connection.html)
10+
//! -> [`diesel_async::AsyncConnection`](crate::AsyncConnection)
11+
//! * [`diesel::query_dsl::UpdateAndFetchResults`](https://docs.diesel.rs/2.0.x/diesel/query_dsl/trait.UpdateAndFetchResults.html)
12+
//! -> [`diesel_async::UpdateAndFetchResults`](crate::UpdateAndFetchResults)
13+
//!
14+
//! These traits closely mirror their diesel counter parts while providing async functionality.
15+
//!
16+
//! In addition to these core traits 2 fully async connection implementations are provided
17+
//! by diesel-async:
18+
//!
19+
//! * [`AsyncMysqlConnection`] (enabled by the `mysql` feature)
20+
//! * [`AsyncPgConnection`] (enabled by the `postgres` feature)
21+
//!
22+
//! Ordinary usage of `diesel-async` assumes that you just replace the corresponding sync trait
23+
//! method calls and connections with their async counterparts.
24+
//!
25+
//! ```rust
26+
//! # include!("./doctest_setup.rs");
27+
//! #
28+
//! diesel::table! {
29+
//! users(id) {
30+
//! id -> Integer,
31+
//! name -> Text,
32+
//! }
33+
//! }
34+
//! #
35+
//! # #[tokio::main(flavor = "current_thread")]
36+
//! # async fn main() {
37+
//! # run_test().await;
38+
//! # }
39+
//! #
40+
//! # async fn run_test() -> QueryResult<()> {
41+
//! # use diesel::insert_into;
42+
//! use crate::users::dsl::*;
43+
//! # let mut connection = establish_connection().await;
44+
//! # /*
45+
//! let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
46+
//! # */
47+
//! let data = users
48+
//! // use ordinary diesel query dsl here
49+
//! .filter(id.gt(0))
50+
//! // execute the query via the provided
51+
//! // async variant of `RunQueryDsl`
52+
//! .load::<(i32, String)>(&mut connection)
53+
//! .await?;
54+
//! let expected_data = vec![
55+
//! (1, String::from("Sean")),
56+
//! (2, String::from("Tess")),
57+
//! ];
58+
//! assert_eq!(expected_data, data);
59+
//! # Ok(())
60+
//! # }
61+
//! ```
62+
63+
#![warn(missing_docs)]
164
use diesel::backend::Backend;
265
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
366
use diesel::row::Row;
467
use diesel::{ConnectionResult, QueryResult};
568
use futures::future::BoxFuture;
6-
use futures::Stream;
69+
use futures::{Future, Stream};
70+
771
#[cfg(feature = "mysql")]
872
mod mysql;
973
#[cfg(feature = "postgres")]
@@ -17,42 +81,136 @@ pub use self::mysql::AsyncMysqlConnection;
1781
#[cfg(feature = "postgres")]
1882
pub use self::pg::AsyncPgConnection;
1983
pub use self::run_query_dsl::*;
20-
pub use self::stmt_cache::StmtCache;
21-
pub use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
2284

85+
use self::transaction_manager::{AnsiTransactionManager, TransactionManager};
86+
87+
/// Perform simple operations on a backend.
88+
///
89+
/// You should likely use [`AsyncConnection`] instead.
2390
#[async_trait::async_trait]
2491
pub trait SimpleAsyncConnection {
92+
/// Execute multiple SQL statements within the same string.
93+
///
94+
/// This function is used to execute migrations,
95+
/// which may contain more than one SQL statement.
2596
async fn batch_execute(&mut self, query: &str) -> QueryResult<()>;
2697
}
2798

28-
pub trait AsyncConnectionGatWorkaround<'a, DB: Backend> {
29-
type Stream: Stream<Item = QueryResult<Self::Row>> + Send + 'a;
30-
type Row: Row<'a, DB> + 'a;
99+
/// This trait is a workaround to emulate GAT on stable rust
100+
///
101+
/// It is used to specify the return type of `AsyncConnection::load`
102+
/// and `AsyncConnection::execute` which may contain lifetimes
103+
pub trait AsyncConnectionGatWorkaround<'conn, 'query, DB: Backend> {
104+
/// The future returned by `AsyncConnection::execute`
105+
type ExecuteFuture: Future<Output = QueryResult<usize>> + Send;
106+
/// The future returned by `AsyncConnection::load`
107+
type LoadFuture: Future<Output = QueryResult<Self::Stream>> + Send;
108+
/// The inner stream returned by `AsyncConnection::load`
109+
type Stream: Stream<Item = QueryResult<Self::Row>> + Send;
110+
/// The row type used by the stream returned by `AsyncConnection::load`
111+
type Row: Row<'conn, DB>;
31112
}
32113

114+
/// An async connection to a database
115+
///
116+
/// This trait represents a n async database connection. It can be used to query the database through
117+
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
118+
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
33119
#[async_trait::async_trait]
34120
pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send
35121
where
36-
for<'a> Self: AsyncConnectionGatWorkaround<'a, Self::Backend>,
122+
for<'a, 'b> Self: AsyncConnectionGatWorkaround<'a, 'b, Self::Backend>,
37123
{
124+
/// The backend this type connects to
38125
type Backend: Backend;
126+
127+
#[doc(hidden)]
39128
type TransactionManager: TransactionManager<Self>;
40129

130+
/// Establishes a new connection to the database
131+
///
132+
/// The argument to this method and the method's behavior varies by backend.
133+
/// See the documentation for that backend's connection class
134+
/// for details about what it accepts and how it behaves.
41135
async fn establish(database_url: &str) -> ConnectionResult<Self>;
42136

43-
async fn load<'a, T>(
44-
&'a mut self,
45-
source: T,
46-
) -> QueryResult<<Self as AsyncConnectionGatWorkaround<'a, Self::Backend>>::Stream>
47-
where
48-
T: AsQuery + Send,
49-
T::Query: QueryFragment<Self::Backend> + QueryId + Send;
50-
51-
async fn execute_returning_count<T>(&mut self, source: T) -> QueryResult<usize>
52-
where
53-
T: QueryFragment<Self::Backend> + QueryId + Send;
54-
55-
async fn transaction<F, R, E>(&mut self, callback: F) -> Result<R, E>
137+
/// Executes the given function inside of a database transaction
138+
///
139+
/// This function executes the provided closure `f` inside a database
140+
/// transaction. If there is already an open transaction for the current
141+
/// connection savepoints will be used instead. The connection is commited if
142+
/// the closure returns `Ok(_)`, it will be rolled back if it returns `Err(_)`.
143+
/// For both cases the original result value will be returned from this function.
144+
///
145+
/// If the transaction fails to commit due to a `SerializationFailure` or a
146+
/// `ReadOnlyTransaction` a rollback will be attempted. In this case a
147+
/// [`Error::CommitTransactionFailed`](crate::result::Error::CommitTransactionFailed)
148+
/// error is returned, which contains details about the original error and
149+
/// the success of the rollback attempt.
150+
/// If the rollback failed the connection should be considered broken
151+
/// as it contains a uncommitted unabortable open transaction. Any further
152+
/// interaction with the transaction system will result in an returned error
153+
/// in this cases.
154+
///
155+
/// If the closure returns an `Err(_)` and the rollback fails the function
156+
/// will return a [`Error::RollbackError`](crate::result::Error::RollbackError)
157+
/// wrapping the error generated by the rollback operation instead.
158+
/// In this case the connection should be considered broken as it contains
159+
/// an unabortable open transaction.
160+
///
161+
/// If a nested transaction fails to release the corresponding savepoint
162+
/// a rollback will be attempted. In this case a
163+
/// [`Error::CommitTransactionFailed`](crate::result::Error::CommitTransactionFailed)
164+
/// error is returned, which contains the original error and
165+
/// details about the success of the rollback attempt.
166+
///
167+
/// # Example
168+
///
169+
/// ```rust
170+
/// # include!("doctest_setup.rs");
171+
/// use diesel::result::Error;
172+
/// use futures::FutureExt;
173+
///
174+
/// # #[tokio::main(flavor = "current_thread")]
175+
/// # async fn main() {
176+
/// # run_test().await.unwrap();
177+
/// # }
178+
/// #
179+
/// # async fn run_test() -> QueryResult<()> {
180+
/// # use schema::users::dsl::*;
181+
/// # let conn = &mut establish_connection().await;
182+
/// conn.transaction::<_, Error, _>(|conn| async move {
183+
/// diesel::insert_into(users)
184+
/// .values(name.eq("Ruby"))
185+
/// .execute(conn)
186+
/// .await?;
187+
///
188+
/// let all_names = users.select(name).load::<String>(conn).await?;
189+
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
190+
///
191+
/// Ok(())
192+
/// }.boxed()).await?;
193+
///
194+
/// conn.transaction::<(), _, _>(|conn| async move {
195+
/// diesel::insert_into(users)
196+
/// .values(name.eq("Pascal"))
197+
/// .execute(conn)
198+
/// .await?;
199+
///
200+
/// let all_names = users.select(name).load::<String>(conn).await?;
201+
/// assert_eq!(vec!["Sean", "Tess", "Ruby", "Pascal"], all_names);
202+
///
203+
/// // If we want to roll back the transaction, but don't have an
204+
/// // actual error to return, we can return `RollbackTransaction`.
205+
/// Err(Error::RollbackTransaction)
206+
/// }.boxed()).await;
207+
///
208+
/// let all_names = users.select(name).load::<String>(conn).await?;
209+
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
210+
/// # Ok(())
211+
/// # }
212+
/// ```
213+
async fn transaction<R, E, F>(&mut self, callback: F) -> Result<R, E>
56214
where
57215
F: FnOnce(&mut Self) -> BoxFuture<Result<R, E>> + Send,
58216
E: From<diesel::result::Error> + Send,
@@ -73,11 +231,32 @@ where
73231
}
74232
}
75233

234+
/// Creates a transaction that will never be committed. This is useful for
235+
/// tests. Panics if called while inside of a transaction or
236+
/// if called with a connection containing a broken transaction
76237
async fn begin_test_transaction(&mut self) -> QueryResult<()> {
77238
assert_eq!(Self::TransactionManager::get_transaction_depth(self), 0);
78239
Self::TransactionManager::begin_transaction(self).await
79240
}
80241

242+
#[doc(hidden)]
243+
fn load<'conn, 'query, T>(
244+
&'conn mut self,
245+
source: T,
246+
) -> <Self as AsyncConnectionGatWorkaround<'conn, 'query, Self::Backend>>::LoadFuture
247+
where
248+
T: AsQuery + Send + 'query,
249+
T::Query: QueryFragment<Self::Backend> + QueryId + Send + 'query;
250+
251+
#[doc(hidden)]
252+
fn execute_returning_count<'conn, 'query, T>(
253+
&'conn mut self,
254+
source: T,
255+
) -> <Self as AsyncConnectionGatWorkaround<'conn, 'query, Self::Backend>>::ExecuteFuture
256+
where
257+
T: QueryFragment<Self::Backend> + QueryId + Send + 'query;
258+
259+
#[doc(hidden)]
81260
fn transaction_state(
82261
&mut self,
83262
) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;

0 commit comments

Comments
 (0)