11use crate :: stmt_cache:: { PrepareCallback , StmtCache } ;
22use crate :: { AnsiTransactionManager , AsyncConnection , SimpleAsyncConnection } ;
33use diesel:: connection:: statement_cache:: { MaybeCached , StatementCacheKey } ;
4+ use diesel:: connection:: Instrumentation ;
5+ use diesel:: connection:: InstrumentationEvent ;
6+ use diesel:: connection:: StrQueryHelper ;
47use diesel:: mysql:: { Mysql , MysqlQueryBuilder , MysqlType } ;
58use diesel:: query_builder:: QueryBuilder ;
69use diesel:: query_builder:: { bind_collector:: RawBytesBindCollector , QueryFragment , QueryId } ;
@@ -26,12 +29,28 @@ pub struct AsyncMysqlConnection {
2629 conn : mysql_async:: Conn ,
2730 stmt_cache : StmtCache < Mysql , Statement > ,
2831 transaction_manager : AnsiTransactionManager ,
32+ instrumentation : std:: sync:: Mutex < Option < Box < dyn Instrumentation > > > ,
2933}
3034
3135#[ async_trait:: async_trait]
3236impl SimpleAsyncConnection for AsyncMysqlConnection {
3337 async fn batch_execute ( & mut self , query : & str ) -> diesel:: QueryResult < ( ) > {
34- Ok ( self . conn . query_drop ( query) . await . map_err ( ErrorHelper ) ?)
38+ self . instrumentation ( )
39+ . on_connection_event ( InstrumentationEvent :: start_query ( & StrQueryHelper :: new (
40+ query,
41+ ) ) ) ;
42+ let result = self
43+ . conn
44+ . query_drop ( query)
45+ . await
46+ . map_err ( ErrorHelper )
47+ . map_err ( Into :: into) ;
48+ self . instrumentation ( )
49+ . on_connection_event ( InstrumentationEvent :: finish_query (
50+ & StrQueryHelper :: new ( query) ,
51+ result. as_ref ( ) . err ( ) ,
52+ ) ) ;
53+ result
3554 }
3655}
3756
@@ -53,20 +72,18 @@ impl AsyncConnection for AsyncMysqlConnection {
5372 type TransactionManager = AnsiTransactionManager ;
5473
5574 async fn establish ( database_url : & str ) -> diesel:: ConnectionResult < Self > {
56- let opts = Opts :: from_url ( database_url)
57- . map_err ( |e| diesel:: result:: ConnectionError :: InvalidConnectionUrl ( e. to_string ( ) ) ) ?;
58- let builder = OptsBuilder :: from_opts ( opts)
59- . init ( CONNECTION_SETUP_QUERIES . to_vec ( ) )
60- . stmt_cache_size ( 0 ) // We have our own cache
61- . client_found_rows ( true ) ; // This allows a consistent behavior between MariaDB/MySQL and PostgreSQL (and is already set in `diesel`)
62-
63- let conn = mysql_async:: Conn :: new ( builder) . await . map_err ( ErrorHelper ) ?;
64-
65- Ok ( AsyncMysqlConnection {
66- conn,
67- stmt_cache : StmtCache :: new ( ) ,
68- transaction_manager : AnsiTransactionManager :: default ( ) ,
69- } )
75+ let mut instrumentation = diesel:: connection:: get_default_instrumentation ( ) ;
76+ instrumentation. on_connection_event ( InstrumentationEvent :: start_establish_connection (
77+ database_url,
78+ ) ) ;
79+ let r = Self :: establish_connection_inner ( database_url) . await ;
80+ instrumentation. on_connection_event ( InstrumentationEvent :: finish_establish_connection (
81+ database_url,
82+ r. as_ref ( ) . err ( ) ,
83+ ) ) ;
84+ let mut conn = r?;
85+ conn. instrumentation = std:: sync:: Mutex :: new ( instrumentation) ;
86+ Ok ( conn)
7087 }
7188
7289 fn load < ' conn , ' query , T > ( & ' conn mut self , source : T ) -> Self :: LoadFuture < ' conn , ' query >
@@ -80,7 +97,10 @@ impl AsyncConnection for AsyncMysqlConnection {
8097 let stmt_for_exec = match stmt {
8198 MaybeCached :: Cached ( ref s) => ( * s) . clone ( ) ,
8299 MaybeCached :: CannotCache ( ref s) => s. clone ( ) ,
83- _ => todo ! ( ) ,
100+ _ => unreachable ! (
101+ "Diesel has only two variants here at the time of writing.\n \
102+ If you ever see this error message please open in issue in the diesel-async issue tracker"
103+ ) ,
84104 } ;
85105
86106 let ( tx, rx) = futures_channel:: mpsc:: channel ( 0 ) ;
@@ -152,6 +172,19 @@ impl AsyncConnection for AsyncMysqlConnection {
152172 fn transaction_state ( & mut self ) -> & mut AnsiTransactionManager {
153173 & mut self . transaction_manager
154174 }
175+
176+ fn instrumentation ( & mut self ) -> & mut dyn Instrumentation {
177+ self . instrumentation
178+ . get_mut ( )
179+ . unwrap_or_else ( |p| p. into_inner ( ) )
180+ }
181+
182+ fn set_instrumentation ( & mut self , instrumentation : impl Instrumentation ) {
183+ * self
184+ . instrumentation
185+ . get_mut ( )
186+ . unwrap_or_else ( |p| p. into_inner ( ) ) = Some ( Box :: new ( instrumentation) ) ;
187+ }
155188}
156189
157190#[ inline( always) ]
@@ -195,6 +228,9 @@ impl AsyncMysqlConnection {
195228 conn,
196229 stmt_cache : StmtCache :: new ( ) ,
197230 transaction_manager : AnsiTransactionManager :: default ( ) ,
231+ instrumentation : std:: sync:: Mutex :: new (
232+ diesel:: connection:: get_default_instrumentation ( ) ,
233+ ) ,
198234 } ;
199235
200236 for stmt in CONNECTION_SETUP_QUERIES {
@@ -219,6 +255,10 @@ impl AsyncMysqlConnection {
219255 T : QueryFragment < Mysql > + QueryId ,
220256 F : Future < Output = QueryResult < R > > + Send ,
221257 {
258+ self . instrumentation ( )
259+ . on_connection_event ( InstrumentationEvent :: start_query ( & diesel:: debug_query (
260+ & query,
261+ ) ) ) ;
222262 let mut bind_collector = RawBytesBindCollector :: < Mysql > :: new ( ) ;
223263 let bind_collector = query
224264 . collect_binds ( & mut bind_collector, & mut ( ) , & Mysql )
@@ -228,6 +268,7 @@ impl AsyncMysqlConnection {
228268 ref mut conn,
229269 ref mut stmt_cache,
230270 ref mut transaction_manager,
271+ ref mut instrumentation,
231272 ..
232273 } = self ;
233274
@@ -242,28 +283,37 @@ impl AsyncMysqlConnection {
242283 } = bind_collector?;
243284 let is_safe_to_cache_prepared = is_safe_to_cache_prepared?;
244285 let sql = sql?;
245- let cache_key = if let Some ( query_id) = query_id {
246- StatementCacheKey :: Type ( query_id)
247- } else {
248- StatementCacheKey :: Sql {
249- sql : sql. clone ( ) ,
250- bind_types : metadata. clone ( ) ,
251- }
286+ let inner = async {
287+ let cache_key = if let Some ( query_id) = query_id {
288+ StatementCacheKey :: Type ( query_id)
289+ } else {
290+ StatementCacheKey :: Sql {
291+ sql : sql. clone ( ) ,
292+ bind_types : metadata. clone ( ) ,
293+ }
294+ } ;
295+
296+ let ( stmt, conn) = stmt_cache
297+ . cached_prepared_statement (
298+ cache_key,
299+ sql. clone ( ) ,
300+ is_safe_to_cache_prepared,
301+ & metadata,
302+ conn,
303+ instrumentation,
304+ )
305+ . await ?;
306+ callback ( conn, stmt, ToSqlHelper { metadata, binds } ) . await
252307 } ;
253-
254- let ( stmt, conn) = stmt_cache
255- . cached_prepared_statement (
256- cache_key,
257- sql,
258- is_safe_to_cache_prepared,
259- & metadata,
260- conn,
261- )
262- . await ?;
263- update_transaction_manager_status (
264- callback ( conn, stmt, ToSqlHelper { metadata, binds } ) . await ,
265- transaction_manager,
266- )
308+ let r = update_transaction_manager_status ( inner. await , transaction_manager) ;
309+ instrumentation
310+ . get_mut ( )
311+ . unwrap_or_else ( |p| p. into_inner ( ) )
312+ . on_connection_event ( InstrumentationEvent :: finish_query (
313+ & StrQueryHelper :: new ( & sql) ,
314+ r. as_ref ( ) . err ( ) ,
315+ ) ) ;
316+ r
267317 }
268318 . boxed ( )
269319 }
@@ -300,6 +350,26 @@ impl AsyncMysqlConnection {
300350
301351 Ok ( ( ) )
302352 }
353+
354+ async fn establish_connection_inner (
355+ database_url : & str ,
356+ ) -> Result < AsyncMysqlConnection , ConnectionError > {
357+ let opts = Opts :: from_url ( database_url)
358+ . map_err ( |e| diesel:: result:: ConnectionError :: InvalidConnectionUrl ( e. to_string ( ) ) ) ?;
359+ let builder = OptsBuilder :: from_opts ( opts)
360+ . init ( CONNECTION_SETUP_QUERIES . to_vec ( ) )
361+ . stmt_cache_size ( 0 ) // We have our own cache
362+ . client_found_rows ( true ) ; // This allows a consistent behavior between MariaDB/MySQL and PostgreSQL (and is already set in `diesel`)
363+
364+ let conn = mysql_async:: Conn :: new ( builder) . await . map_err ( ErrorHelper ) ?;
365+
366+ Ok ( AsyncMysqlConnection {
367+ conn,
368+ stmt_cache : StmtCache :: new ( ) ,
369+ transaction_manager : AnsiTransactionManager :: default ( ) ,
370+ instrumentation : std:: sync:: Mutex :: new ( None ) ,
371+ } )
372+ }
303373}
304374
305375#[ cfg( any(
0 commit comments