Skip to content

Commit f83abc2

Browse files
Allow pipelining with composed futures for Postgres
1 parent c73ded6 commit f83abc2

File tree

1 file changed

+113
-0
lines changed

1 file changed

+113
-0
lines changed

src/pg/mod.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,48 @@ const FAKE_OID: u32 = 0;
114114
/// # }
115115
/// ```
116116
///
117+
/// For more complex cases, an immutable reference to the connection need to be used:
118+
/// ```rust
119+
/// # include!("../doctest_setup.rs");
120+
/// use diesel_async::RunQueryDsl;
121+
///
122+
/// #
123+
/// # #[tokio::main(flavor = "current_thread")]
124+
/// # async fn main() {
125+
/// # run_test().await.unwrap();
126+
/// # }
127+
/// #
128+
/// # async fn run_test() -> QueryResult<()> {
129+
/// # use diesel::sql_types::{Text, Integer};
130+
/// # let conn = &mut establish_connection().await;
131+
/// #
132+
/// async fn fn12(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
133+
/// let f1 = diesel::select(1_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
134+
/// let f2 = diesel::select(2_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
135+
///
136+
/// futures_util::try_join!(f1, f2)
137+
/// }
138+
///
139+
/// async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
140+
/// let f3 = diesel::select(3_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
141+
/// let f4 = diesel::select(4_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
142+
///
143+
/// futures_util::try_join!(f3, f4)
144+
/// }
145+
///
146+
/// let f12 = fn12(&conn);
147+
/// let f34 = fn34(&conn);
148+
///
149+
/// let ((r1, r2), (r3, r4)) = futures_util::try_join!(f12, f34).unwrap();
150+
///
151+
/// assert_eq!(r1, 1);
152+
/// assert_eq!(r2, 2);
153+
/// assert_eq!(r3, 3);
154+
/// assert_eq!(r4, 4);
155+
/// # Ok(())
156+
/// # }
157+
/// ```
158+
///
117159
/// ## TLS
118160
///
119161
/// Connections created by [`AsyncPgConnection::establish`] do not support TLS.
@@ -136,6 +178,12 @@ pub struct AsyncPgConnection {
136178
}
137179

138180
impl SimpleAsyncConnection for AsyncPgConnection {
181+
async fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
182+
SimpleAsyncConnection::batch_execute(&mut &*self, query).await
183+
}
184+
}
185+
186+
impl SimpleAsyncConnection for &AsyncPgConnection {
139187
async fn batch_execute(&mut self, query: &str) -> QueryResult<()> {
140188
self.record_instrumentation(InstrumentationEvent::start_query(&StrQueryHelper::new(
141189
query,
@@ -167,6 +215,38 @@ impl AsyncConnectionCore for AsyncPgConnection {
167215
type Row<'conn, 'query> = PgRow;
168216
type Backend = diesel::pg::Pg;
169217

218+
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
219+
where
220+
T: AsQuery + 'query,
221+
T::Query: QueryFragment<Self::Backend> + QueryId + 'query,
222+
{
223+
AsyncConnectionCore::load(&mut &*self, source)
224+
}
225+
226+
fn execute_returning_count<'conn, 'query, T>(
227+
&'conn mut self,
228+
source: T,
229+
) -> Self::ExecuteFuture<'conn, 'query>
230+
where
231+
T: QueryFragment<Self::Backend> + QueryId + 'query,
232+
{
233+
AsyncConnectionCore::execute_returning_count(&mut &*self, source)
234+
}
235+
}
236+
237+
impl AsyncConnectionCore for &AsyncPgConnection {
238+
type LoadFuture<'conn, 'query> =
239+
<AsyncPgConnection as AsyncConnectionCore>::LoadFuture<'conn, 'query>;
240+
241+
type ExecuteFuture<'conn, 'query> =
242+
<AsyncPgConnection as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>;
243+
244+
type Stream<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::Stream<'conn, 'query>;
245+
246+
type Row<'conn, 'query> = <AsyncPgConnection as AsyncConnectionCore>::Row<'conn, 'query>;
247+
248+
type Backend = <AsyncPgConnection as AsyncConnectionCore>::Backend;
249+
170250
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
171251
where
172252
T: AsQuery + 'query,
@@ -962,4 +1042,37 @@ mod tests {
9621042
assert_eq!(r1, 1);
9631043
assert_eq!(r2, 2);
9641044
}
1045+
1046+
#[tokio::test]
1047+
async fn pipelining_with_composed_futures() {
1048+
let database_url =
1049+
std::env::var("DATABASE_URL").expect("DATABASE_URL must be set in order to run tests");
1050+
let conn = crate::AsyncPgConnection::establish(&database_url)
1051+
.await
1052+
.unwrap();
1053+
1054+
async fn fn12(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
1055+
let f1 = diesel::select(1_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1056+
let f2 = diesel::select(2_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1057+
1058+
futures_util::try_join!(f1, f2)
1059+
}
1060+
1061+
async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> {
1062+
let f3 = diesel::select(3_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1063+
let f4 = diesel::select(4_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn);
1064+
1065+
futures_util::try_join!(f3, f4)
1066+
}
1067+
1068+
let f12 = fn12(&conn);
1069+
let f34 = fn34(&conn);
1070+
1071+
let ((r1, r2), (r3, r4)) = futures_util::try_join!(f12, f34).unwrap();
1072+
1073+
assert_eq!(r1, 1);
1074+
assert_eq!(r2, 2);
1075+
assert_eq!(r3, 3);
1076+
assert_eq!(r4, 4);
1077+
}
9651078
}

0 commit comments

Comments
 (0)