Skip to content

Commit c73ded6

Browse files
Split AsyncConnection trait
1 parent cbcf68f commit c73ded6

File tree

9 files changed

+166
-130
lines changed

9 files changed

+166
-130
lines changed

src/lib.rs

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,8 @@ pub trait SimpleAsyncConnection {
125125
fn batch_execute(&mut self, query: &str) -> impl Future<Output = QueryResult<()>> + Send;
126126
}
127127

128-
/// An async connection to a database
129-
///
130-
/// This trait represents a n async database connection. It can be used to query the database through
131-
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
132-
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
133-
pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
128+
/// Core trait for an async database connection
129+
pub trait AsyncConnectionCore: SimpleAsyncConnection + Send {
134130
/// The future returned by `AsyncConnection::execute`
135131
type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
136132
/// The future returned by `AsyncConnection::load`
@@ -143,6 +139,37 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
143139
/// The backend this type connects to
144140
type Backend: Backend;
145141

142+
#[doc(hidden)]
143+
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
144+
where
145+
T: AsQuery + 'query,
146+
T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
147+
148+
#[doc(hidden)]
149+
fn execute_returning_count<'conn, 'query, T>(
150+
&'conn mut self,
151+
source: T,
152+
) -> Self::ExecuteFuture<'conn, 'query>
153+
where
154+
T: QueryFragment<Self::Backend> + QueryId + 'query;
155+
156+
// These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
157+
// compile without a `where Self: '_` clause. This is needed the because bound causes
158+
// lifetime issues when using `transaction()` with generic `AsyncConnection`s.
159+
//
160+
// See: https://github.com/rust-lang/rust/issues/87479
161+
#[doc(hidden)]
162+
fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
163+
#[doc(hidden)]
164+
fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
165+
}
166+
167+
/// An async connection to a database
168+
///
169+
/// This trait represents an async database connection. It can be used to query the database through
170+
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
171+
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
172+
pub trait AsyncConnection: AsyncConnectionCore + Sized {
146173
#[doc(hidden)]
147174
type TransactionManager: TransactionManager<Self>;
148175

@@ -336,35 +363,11 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
336363
})
337364
}
338365

339-
#[doc(hidden)]
340-
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
341-
where
342-
T: AsQuery + 'query,
343-
T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
344-
345-
#[doc(hidden)]
346-
fn execute_returning_count<'conn, 'query, T>(
347-
&'conn mut self,
348-
source: T,
349-
) -> Self::ExecuteFuture<'conn, 'query>
350-
where
351-
T: QueryFragment<Self::Backend> + QueryId + 'query;
352-
353366
#[doc(hidden)]
354367
fn transaction_state(
355368
&mut self,
356369
) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
357370

358-
// These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
359-
// compile without a `where Self: '_` clause. This is needed the because bound causes
360-
// lifetime issues when using `transaction()` with generic `AsyncConnection`s.
361-
//
362-
// See: https://github.com/rust-lang/rust/issues/87479
363-
#[doc(hidden)]
364-
fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
365-
#[doc(hidden)]
366-
fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
367-
368371
#[doc(hidden)]
369372
fn instrumentation(&mut self) -> &mut dyn Instrumentation;
370373

src/mysql/mod.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::stmt_cache::{CallbackHelper, QueryFragmentHelper};
2-
use crate::{AnsiTransactionManager, AsyncConnection, SimpleAsyncConnection};
2+
use crate::{AnsiTransactionManager, AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection};
33
use diesel::connection::statement_cache::{
44
MaybeCached, QueryFragmentForCachedStatement, StatementCache,
55
};
@@ -64,30 +64,13 @@ const CONNECTION_SETUP_QUERIES: &[&str] = &[
6464
"SET character_set_results = 'utf8mb4'",
6565
];
6666

67-
impl AsyncConnection for AsyncMysqlConnection {
67+
impl AsyncConnectionCore for AsyncMysqlConnection {
6868
type ExecuteFuture<'conn, 'query> = BoxFuture<'conn, QueryResult<usize>>;
6969
type LoadFuture<'conn, 'query> = BoxFuture<'conn, QueryResult<Self::Stream<'conn, 'query>>>;
7070
type Stream<'conn, 'query> = BoxStream<'conn, QueryResult<Self::Row<'conn, 'query>>>;
7171
type Row<'conn, 'query> = MysqlRow;
7272
type Backend = Mysql;
7373

74-
type TransactionManager = AnsiTransactionManager;
75-
76-
async fn establish(database_url: &str) -> diesel::ConnectionResult<Self> {
77-
let mut instrumentation = DynInstrumentation::default_instrumentation();
78-
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
79-
database_url,
80-
));
81-
let r = Self::establish_connection_inner(database_url).await;
82-
instrumentation.on_connection_event(InstrumentationEvent::finish_establish_connection(
83-
database_url,
84-
r.as_ref().err(),
85-
));
86-
let mut conn = r?;
87-
conn.instrumentation = instrumentation;
88-
Ok(conn)
89-
}
90-
9174
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
9275
where
9376
T: diesel::query_builder::AsQuery,
@@ -173,6 +156,25 @@ impl AsyncConnection for AsyncMysqlConnection {
173156
.map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
174157
})
175158
}
159+
}
160+
161+
impl AsyncConnection for AsyncMysqlConnection {
162+
type TransactionManager = AnsiTransactionManager;
163+
164+
async fn establish(database_url: &str) -> diesel::ConnectionResult<Self> {
165+
let mut instrumentation = DynInstrumentation::default_instrumentation();
166+
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
167+
database_url,
168+
));
169+
let r = Self::establish_connection_inner(database_url).await;
170+
instrumentation.on_connection_event(InstrumentationEvent::finish_establish_connection(
171+
database_url,
172+
r.as_ref().err(),
173+
));
174+
let mut conn = r?;
175+
conn.instrumentation = instrumentation;
176+
Ok(conn)
177+
}
176178

177179
fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
178180
&mut self.transaction_manager

src/pg/mod.rs

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use self::error_helper::ErrorHelper;
88
use self::row::PgRow;
99
use self::serialize::ToSqlHelper;
1010
use crate::stmt_cache::{CallbackHelper, QueryFragmentHelper};
11-
use crate::{AnsiTransactionManager, AsyncConnection, SimpleAsyncConnection};
11+
use crate::{AnsiTransactionManager, AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection};
1212
use diesel::connection::statement_cache::{
1313
PrepareForCache, QueryFragmentForCachedStatement, StatementCache,
1414
};
@@ -160,12 +160,37 @@ impl SimpleAsyncConnection for AsyncPgConnection {
160160
}
161161
}
162162

163-
impl AsyncConnection for AsyncPgConnection {
163+
impl AsyncConnectionCore for AsyncPgConnection {
164164
type LoadFuture<'conn, 'query> = BoxFuture<'query, QueryResult<Self::Stream<'conn, 'query>>>;
165165
type ExecuteFuture<'conn, 'query> = BoxFuture<'query, QueryResult<usize>>;
166166
type Stream<'conn, 'query> = BoxStream<'static, QueryResult<PgRow>>;
167167
type Row<'conn, 'query> = PgRow;
168168
type Backend = diesel::pg::Pg;
169+
170+
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
171+
where
172+
T: AsQuery + 'query,
173+
T::Query: QueryFragment<Self::Backend> + QueryId + 'query,
174+
{
175+
let query = source.as_query();
176+
let load_future = self.with_prepared_statement(query, load_prepared);
177+
178+
self.run_with_connection_future(load_future)
179+
}
180+
181+
fn execute_returning_count<'conn, 'query, T>(
182+
&'conn mut self,
183+
source: T,
184+
) -> Self::ExecuteFuture<'conn, 'query>
185+
where
186+
T: QueryFragment<Self::Backend> + QueryId + 'query,
187+
{
188+
let execute = self.with_prepared_statement(source, execute_prepared);
189+
self.run_with_connection_future(execute)
190+
}
191+
}
192+
193+
impl AsyncConnection for AsyncPgConnection {
169194
type TransactionManager = AnsiTransactionManager;
170195

171196
async fn establish(database_url: &str) -> ConnectionResult<Self> {
@@ -198,28 +223,6 @@ impl AsyncConnection for AsyncPgConnection {
198223
r
199224
}
200225

201-
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
202-
where
203-
T: AsQuery + 'query,
204-
T::Query: QueryFragment<Self::Backend> + QueryId + 'query,
205-
{
206-
let query = source.as_query();
207-
let load_future = self.with_prepared_statement(query, load_prepared);
208-
209-
self.run_with_connection_future(load_future)
210-
}
211-
212-
fn execute_returning_count<'conn, 'query, T>(
213-
&'conn mut self,
214-
source: T,
215-
) -> Self::ExecuteFuture<'conn, 'query>
216-
where
217-
T: QueryFragment<Self::Backend> + QueryId + 'query,
218-
{
219-
let execute = self.with_prepared_statement(source, execute_prepared);
220-
self.run_with_connection_future(execute)
221-
}
222-
223226
fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
224227
// there should be no other pending future when this is called
225228
// that means there is only one instance of this arc and
@@ -467,7 +470,7 @@ impl AsyncPgConnection {
467470
}
468471

469472
fn with_prepared_statement<'a, T, F, R>(
470-
&mut self,
473+
&self,
471474
query: T,
472475
callback: fn(Arc<tokio_postgres::Client>, Statement, Vec<ToSqlHelper>) -> F,
473476
) -> BoxFuture<'a, QueryResult<R>>
@@ -502,7 +505,7 @@ impl AsyncPgConnection {
502505
}
503506

504507
fn with_prepared_statement_after_sql_built<'a, F, R>(
505-
&mut self,
508+
&self,
506509
callback: fn(Arc<tokio_postgres::Client>, Statement, Vec<ToSqlHelper>) -> F,
507510
is_safe_to_cache_prepared: QueryResult<bool>,
508511
query_id: Option<std::any::TypeId>,

src/pooled_connection/mod.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! * [deadpool](self::deadpool)
66
//! * [bb8](self::bb8)
77
//! * [mobc](self::mobc)
8-
use crate::{AsyncConnection, SimpleAsyncConnection};
8+
use crate::{AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection};
99
use crate::{TransactionManager, UpdateAndFetchResults};
1010
use diesel::associations::HasTable;
1111
use diesel::connection::{CacheSize, Instrumentation};
@@ -176,27 +176,18 @@ where
176176
}
177177
}
178178

179-
impl<C> AsyncConnection for C
179+
impl<C> AsyncConnectionCore for C
180180
where
181181
C: DerefMut + Send,
182-
C::Target: AsyncConnection,
182+
C::Target: AsyncConnectionCore,
183183
{
184184
type ExecuteFuture<'conn, 'query> =
185-
<C::Target as AsyncConnection>::ExecuteFuture<'conn, 'query>;
186-
type LoadFuture<'conn, 'query> = <C::Target as AsyncConnection>::LoadFuture<'conn, 'query>;
187-
type Stream<'conn, 'query> = <C::Target as AsyncConnection>::Stream<'conn, 'query>;
188-
type Row<'conn, 'query> = <C::Target as AsyncConnection>::Row<'conn, 'query>;
185+
<C::Target as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>;
186+
type LoadFuture<'conn, 'query> = <C::Target as AsyncConnectionCore>::LoadFuture<'conn, 'query>;
187+
type Stream<'conn, 'query> = <C::Target as AsyncConnectionCore>::Stream<'conn, 'query>;
188+
type Row<'conn, 'query> = <C::Target as AsyncConnectionCore>::Row<'conn, 'query>;
189189

190-
type Backend = <C::Target as AsyncConnection>::Backend;
191-
192-
type TransactionManager =
193-
PoolTransactionManager<<C::Target as AsyncConnection>::TransactionManager>;
194-
195-
async fn establish(_database_url: &str) -> diesel::ConnectionResult<Self> {
196-
Err(diesel::result::ConnectionError::BadConnection(
197-
String::from("Cannot directly establish a pooled connection"),
198-
))
199-
}
190+
type Backend = <C::Target as AsyncConnectionCore>::Backend;
200191

201192
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
202193
where
@@ -221,6 +212,21 @@ where
221212
let conn = self.deref_mut();
222213
conn.execute_returning_count(source)
223214
}
215+
}
216+
217+
impl<C> AsyncConnection for C
218+
where
219+
C: DerefMut + Send,
220+
C::Target: AsyncConnection,
221+
{
222+
type TransactionManager =
223+
PoolTransactionManager<<C::Target as AsyncConnection>::TransactionManager>;
224+
225+
async fn establish(_database_url: &str) -> diesel::ConnectionResult<Self> {
226+
Err(diesel::result::ConnectionError::BadConnection(
227+
String::from("Cannot directly establish a pooled connection"),
228+
))
229+
}
224230

225231
fn transaction_state(
226232
&mut self,

0 commit comments

Comments
 (0)