Skip to content

Commit 0e703ba

Browse files
authored
Merge pull request #248 from stormshield-kg/pg_ref_run_query
Improve pipelining for Postgres
2 parents 09c91a7 + 682e032 commit 0e703ba

File tree

9 files changed

+342
-131
lines changed

9 files changed

+342
-131
lines changed

src/lib.rs

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,8 @@ pub trait SimpleAsyncConnection {
125125
fn batch_execute(&mut self, query: &str) -> impl Future<Output = QueryResult<()>> + Send;
126126
}
127127

128-
/// An async connection to a database
129-
///
130-
/// This trait represents a n async database connection. It can be used to query the database through
131-
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
132-
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
133-
pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
128+
/// Core trait for an async database connection
129+
pub trait AsyncConnectionCore: SimpleAsyncConnection + Send {
134130
/// The future returned by `AsyncConnection::execute`
135131
type ExecuteFuture<'conn, 'query>: Future<Output = QueryResult<usize>> + Send;
136132
/// The future returned by `AsyncConnection::load`
@@ -143,6 +139,37 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
143139
/// The backend this type connects to
144140
type Backend: Backend;
145141

142+
#[doc(hidden)]
143+
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
144+
where
145+
T: AsQuery + 'query,
146+
T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
147+
148+
#[doc(hidden)]
149+
fn execute_returning_count<'conn, 'query, T>(
150+
&'conn mut self,
151+
source: T,
152+
) -> Self::ExecuteFuture<'conn, 'query>
153+
where
154+
T: QueryFragment<Self::Backend> + QueryId + 'query;
155+
156+
// These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
157+
// compile without a `where Self: '_` clause. This is needed the because bound causes
158+
// lifetime issues when using `transaction()` with generic `AsyncConnection`s.
159+
//
160+
// See: https://github.com/rust-lang/rust/issues/87479
161+
#[doc(hidden)]
162+
fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
163+
#[doc(hidden)]
164+
fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
165+
}
166+
167+
/// An async connection to a database
168+
///
169+
/// This trait represents an async database connection. It can be used to query the database through
170+
/// the query dsl provided by diesel, custom extensions or raw sql queries. It essentially mirrors
171+
/// the sync diesel [`Connection`](diesel::connection::Connection) implementation
172+
pub trait AsyncConnection: AsyncConnectionCore + Sized {
146173
#[doc(hidden)]
147174
type TransactionManager: TransactionManager<Self>;
148175

@@ -336,35 +363,11 @@ pub trait AsyncConnection: SimpleAsyncConnection + Sized + Send {
336363
})
337364
}
338365

339-
#[doc(hidden)]
340-
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
341-
where
342-
T: AsQuery + 'query,
343-
T::Query: QueryFragment<Self::Backend> + QueryId + 'query;
344-
345-
#[doc(hidden)]
346-
fn execute_returning_count<'conn, 'query, T>(
347-
&'conn mut self,
348-
source: T,
349-
) -> Self::ExecuteFuture<'conn, 'query>
350-
where
351-
T: QueryFragment<Self::Backend> + QueryId + 'query;
352-
353366
#[doc(hidden)]
354367
fn transaction_state(
355368
&mut self,
356369
) -> &mut <Self::TransactionManager as TransactionManager<Self>>::TransactionStateData;
357370

358-
// These functions allow the associated types (`ExecuteFuture`, `LoadFuture`, etc.) to
359-
// compile without a `where Self: '_` clause. This is needed the because bound causes
360-
// lifetime issues when using `transaction()` with generic `AsyncConnection`s.
361-
//
362-
// See: https://github.com/rust-lang/rust/issues/87479
363-
#[doc(hidden)]
364-
fn _silence_lint_on_execute_future(_: Self::ExecuteFuture<'_, '_>) {}
365-
#[doc(hidden)]
366-
fn _silence_lint_on_load_future(_: Self::LoadFuture<'_, '_>) {}
367-
368371
#[doc(hidden)]
369372
fn instrumentation(&mut self) -> &mut dyn Instrumentation;
370373

src/mysql/mod.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::stmt_cache::{CallbackHelper, QueryFragmentHelper};
2-
use crate::{AnsiTransactionManager, AsyncConnection, SimpleAsyncConnection};
2+
use crate::{AnsiTransactionManager, AsyncConnection, AsyncConnectionCore, SimpleAsyncConnection};
33
use diesel::connection::statement_cache::{
44
MaybeCached, QueryFragmentForCachedStatement, StatementCache,
55
};
@@ -64,30 +64,13 @@ const CONNECTION_SETUP_QUERIES: &[&str] = &[
6464
"SET character_set_results = 'utf8mb4'",
6565
];
6666

67-
impl AsyncConnection for AsyncMysqlConnection {
67+
impl AsyncConnectionCore for AsyncMysqlConnection {
6868
type ExecuteFuture<'conn, 'query> = BoxFuture<'conn, QueryResult<usize>>;
6969
type LoadFuture<'conn, 'query> = BoxFuture<'conn, QueryResult<Self::Stream<'conn, 'query>>>;
7070
type Stream<'conn, 'query> = BoxStream<'conn, QueryResult<Self::Row<'conn, 'query>>>;
7171
type Row<'conn, 'query> = MysqlRow;
7272
type Backend = Mysql;
7373

74-
type TransactionManager = AnsiTransactionManager;
75-
76-
async fn establish(database_url: &str) -> diesel::ConnectionResult<Self> {
77-
let mut instrumentation = DynInstrumentation::default_instrumentation();
78-
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
79-
database_url,
80-
));
81-
let r = Self::establish_connection_inner(database_url).await;
82-
instrumentation.on_connection_event(InstrumentationEvent::finish_establish_connection(
83-
database_url,
84-
r.as_ref().err(),
85-
));
86-
let mut conn = r?;
87-
conn.instrumentation = instrumentation;
88-
Ok(conn)
89-
}
90-
9174
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
9275
where
9376
T: diesel::query_builder::AsQuery,
@@ -173,6 +156,25 @@ impl AsyncConnection for AsyncMysqlConnection {
173156
.map_err(|e| diesel::result::Error::DeserializationError(Box::new(e)))
174157
})
175158
}
159+
}
160+
161+
impl AsyncConnection for AsyncMysqlConnection {
162+
type TransactionManager = AnsiTransactionManager;
163+
164+
async fn establish(database_url: &str) -> diesel::ConnectionResult<Self> {
165+
let mut instrumentation = DynInstrumentation::default_instrumentation();
166+
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
167+
database_url,
168+
));
169+
let r = Self::establish_connection_inner(database_url).await;
170+
instrumentation.on_connection_event(InstrumentationEvent::finish_establish_connection(
171+
database_url,
172+
r.as_ref().err(),
173+
));
174+
let mut conn = r?;
175+
conn.instrumentation = instrumentation;
176+
Ok(conn)
177+
}
176178

177179
fn transaction_state(&mut self) -> &mut AnsiTransactionManager {
178180
&mut self.transaction_manager

0 commit comments

Comments
 (0)