Skip to content

Commit 3eb72fb

Browse files
authored
Merge pull request #38 from sidechainme/main
add usage of scoped futures to transactions: ownership of this change transferred to @weiznich
2 parents 439a5f1 + 85b2059 commit 3eb72fb

File tree

8 files changed

+70
-45
lines changed

8 files changed

+70
-45
lines changed

Cargo.lock

Lines changed: 42 additions & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mysql_common = {version = "0.29.0", optional = true}
2424
bb8 = {version = "0.8", optional = true}
2525
deadpool = {version = "0.9", optional = true}
2626
mobc = {version = "0.7", optional = true}
27+
scoped-futures = {version = "0.1", features = ["std"]}
2728

2829
[dev-dependencies]
2930
tokio = {version = "1.12.0", features = ["rt", "macros", "rt-multi-thread"]}

src/lib.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,11 @@ use diesel::backend::Backend;
7070
use diesel::query_builder::{AsQuery, QueryFragment, QueryId};
7171
use diesel::row::Row;
7272
use diesel::{ConnectionResult, QueryResult};
73-
use futures::future::BoxFuture;
7473
use futures::{Future, Stream};
7574

75+
pub use scoped_futures;
76+
use scoped_futures::ScopedBoxFuture;
77+
7678
#[cfg(feature = "mysql")]
7779
mod mysql;
7880
#[cfg(feature = "postgres")]
@@ -180,7 +182,7 @@ where
180182
/// ```rust
181183
/// # include!("doctest_setup.rs");
182184
/// use diesel::result::Error;
183-
/// use futures::FutureExt;
185+
/// use scoped_futures::ScopedFutureExt;
184186
///
185187
/// # #[tokio::main(flavor = "current_thread")]
186188
/// # async fn main() {
@@ -200,7 +202,7 @@ where
200202
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
201203
///
202204
/// Ok(())
203-
/// }.boxed()).await?;
205+
/// }.scope_boxed()).await?;
204206
///
205207
/// conn.transaction::<(), _, _>(|conn| async move {
206208
/// diesel::insert_into(users)
@@ -214,18 +216,18 @@ where
214216
/// // If we want to roll back the transaction, but don't have an
215217
/// // actual error to return, we can return `RollbackTransaction`.
216218
/// Err(Error::RollbackTransaction)
217-
/// }.boxed()).await;
219+
/// }.scope_boxed()).await;
218220
///
219221
/// let all_names = users.select(name).load::<String>(conn).await?;
220222
/// assert_eq!(vec!["Sean", "Tess", "Ruby"], all_names);
221223
/// # Ok(())
222224
/// # }
223225
/// ```
224-
async fn transaction<R, E, F>(&mut self, callback: F) -> Result<R, E>
226+
async fn transaction<'a, R, E, F>(&mut self, callback: F) -> Result<R, E>
225227
where
226-
F: FnOnce(&mut Self) -> BoxFuture<Result<R, E>> + Send,
227-
E: From<diesel::result::Error> + Send,
228-
R: Send,
228+
F: for<'r> FnOnce(&'r mut Self) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
229+
E: From<diesel::result::Error> + Send + 'a,
230+
R: Send + 'a,
229231
{
230232
Self::TransactionManager::transaction(self, callback).await
231233
}

src/pg/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ impl AsyncPgConnection {
271271
///
272272
/// ```rust
273273
/// # include!("../doctest_setup.rs");
274-
/// # use futures::FutureExt;
274+
/// # use scoped_futures::ScopedFutureExt;
275275
/// #
276276
/// # #[tokio::main(flavor = "current_thread")]
277277
/// # async fn main() {
@@ -285,7 +285,7 @@ impl AsyncPgConnection {
285285
/// .read_only()
286286
/// .serializable()
287287
/// .deferrable()
288-
/// .run(|conn| async move { Ok(()) }.boxed())
288+
/// .run(|conn| async move { Ok(()) }.scope_boxed())
289289
/// .await
290290
/// # }
291291
/// ```

src/pg/transaction_builder.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use diesel::backend::Backend;
33
use diesel::pg::Pg;
44
use diesel::query_builder::{AstPass, QueryBuilder, QueryFragment};
55
use diesel::QueryResult;
6-
use futures::future::BoxFuture;
6+
use scoped_futures::ScopedBoxFuture;
77

88
/// Used to build a transaction, specifying additional details.
99
///
@@ -283,10 +283,11 @@ where
283283
/// the original error will be returned, otherwise the error generated by the rollback
284284
/// will be returned. In the second case the connection should be considered broken
285285
/// as it contains a uncommitted unabortable open transaction.
286-
pub async fn run<T, E, F>(&mut self, f: F) -> Result<T, E>
286+
pub async fn run<'b, T, E, F>(&mut self, f: F) -> Result<T, E>
287287
where
288-
F: FnOnce(&mut C) -> BoxFuture<Result<T, E>> + Send,
289-
E: From<diesel::result::Error>,
288+
F: for<'r> FnOnce(&'r mut C) -> ScopedBoxFuture<'b, 'r, Result<T, E>> + Send + 'a,
289+
T: 'b,
290+
E: From<diesel::result::Error> + 'b,
290291
{
291292
let mut query_builder = <Pg as Backend>::QueryBuilder::default();
292293
self.to_sql(&mut query_builder, &Pg)?;

src/run_query_dsl/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pub mod methods {
9696
U: Send,
9797
T: AsQuery + Send,
9898
T::SqlType: CompatibleType<U, DB, SqlType = ST>,
99-
U: FromSqlRow<ST, DB> + Send + 'static,
99+
U: FromSqlRow<ST, DB> + Send,
100100
DB: QueryMetadata<T::SqlType>,
101101
{
102102
type LoadFuture = futures::future::MapOk<
@@ -115,13 +115,12 @@ pub mod methods {
115115
where
116116
Conn: AsyncConnection<Backend = DB>,
117117
U: Send,
118-
DB: Backend + 'static,
118+
DB: Backend,
119119
T: AsQuery + Send + 'query,
120120
T::Query: QueryFragment<DB> + QueryId + Send + 'query,
121121
T::SqlType: CompatibleType<U, DB, SqlType = ST>,
122-
U: FromSqlRow<ST, DB> + Send + 'static,
122+
U: FromSqlRow<ST, DB> + Send,
123123
DB: QueryMetadata<T::SqlType>,
124-
ST: 'static,
125124
{
126125
fn internal_load<'conn>(
127126
self,
@@ -142,9 +141,8 @@ pub mod methods {
142141
where
143142
S: Stream<Item = QueryResult<R>> + Send + 's,
144143
R: diesel::row::Row<'a, DB> + 's,
145-
DB: Backend + 'static,
146-
U: FromSqlRow<ST, DB> + 'static,
147-
ST: 'static,
144+
DB: Backend,
145+
U: FromSqlRow<ST, DB>,
148146
{
149147
stream.map(map_row_helper::<_, DB, U, ST>)
150148
}

src/transaction_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use diesel::result::Error;
22
use diesel::QueryResult;
3-
use futures::future::BoxFuture;
3+
use scoped_futures::ScopedBoxFuture;
44
use std::borrow::Cow;
55
use std::num::NonZeroU32;
66

@@ -50,9 +50,9 @@ pub trait TransactionManager<Conn: AsyncConnection>: Send {
5050
///
5151
/// Each implementation of this function needs to fulfill the documented
5252
/// behaviour of [`AsyncConnection::transaction`]
53-
async fn transaction<F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
53+
async fn transaction<'a, F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
5454
where
55-
F: FnOnce(&mut Conn) -> BoxFuture<Result<R, E>> + Send,
55+
F: for<'r> FnOnce(&'r mut Conn) -> ScopedBoxFuture<'a, 'r, Result<R, E>> + Send + 'a,
5656
E: From<Error> + Send,
5757
R: Send,
5858
{

tests/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use diesel::prelude::{ExpressionMethods, OptionalExtension, QueryDsl};
22
use diesel::QueryResult;
33
use diesel_async::*;
4-
use futures::FutureExt;
4+
use scoped_futures::ScopedFutureExt;
55
use std::fmt::Debug;
66
use std::pin::Pin;
77

@@ -31,7 +31,7 @@ async fn transaction_test(conn: &mut TestConnection) -> QueryResult<()> {
3131
assert_eq!(count, 3);
3232
Ok(())
3333
}
34-
.boxed()
34+
.scope_boxed()
3535
})
3636
.await;
3737
assert!(res.is_ok());

0 commit comments

Comments
 (0)