1
1
use crate :: stmt_cache:: { PrepareCallback , StmtCache } ;
2
2
use crate :: { AnsiTransactionManager , AsyncConnection , SimpleAsyncConnection } ;
3
3
use diesel:: connection:: statement_cache:: { MaybeCached , StatementCacheKey } ;
4
+ use diesel:: connection:: Instrumentation ;
5
+ use diesel:: connection:: InstrumentationEvent ;
6
+ use diesel:: connection:: StrQueryHelper ;
4
7
use diesel:: mysql:: { Mysql , MysqlQueryBuilder , MysqlType } ;
5
8
use diesel:: query_builder:: QueryBuilder ;
6
9
use diesel:: query_builder:: { bind_collector:: RawBytesBindCollector , QueryFragment , QueryId } ;
@@ -26,12 +29,32 @@ pub struct AsyncMysqlConnection {
26
29
conn : mysql_async:: Conn ,
27
30
stmt_cache : StmtCache < Mysql , Statement > ,
28
31
transaction_manager : AnsiTransactionManager ,
32
+ instrumentation : std:: sync:: Mutex < Option < Box < dyn Instrumentation > > > ,
29
33
}
30
34
31
35
#[ async_trait:: async_trait]
32
36
impl SimpleAsyncConnection for AsyncMysqlConnection {
33
37
async fn batch_execute ( & mut self , query : & str ) -> diesel:: QueryResult < ( ) > {
34
- Ok ( self . conn . query_drop ( query) . await . map_err ( ErrorHelper ) ?)
38
+ self . instrumentation
39
+ . lock ( )
40
+ . unwrap_or_else ( |p| p. into_inner ( ) )
41
+ . on_connection_event ( InstrumentationEvent :: start_query ( & StrQueryHelper :: new (
42
+ query,
43
+ ) ) ) ;
44
+ let result = self
45
+ . conn
46
+ . query_drop ( query)
47
+ . await
48
+ . map_err ( ErrorHelper )
49
+ . map_err ( Into :: into) ;
50
+ self . instrumentation
51
+ . lock ( )
52
+ . unwrap_or_else ( |p| p. into_inner ( ) )
53
+ . on_connection_event ( InstrumentationEvent :: finish_query (
54
+ & StrQueryHelper :: new ( query) ,
55
+ result. as_ref ( ) . err ( ) ,
56
+ ) ) ;
57
+ result
35
58
}
36
59
}
37
60
@@ -53,20 +76,18 @@ impl AsyncConnection for AsyncMysqlConnection {
53
76
type TransactionManager = AnsiTransactionManager ;
54
77
55
78
async fn establish ( database_url : & str ) -> diesel:: ConnectionResult < Self > {
56
- let opts = Opts :: from_url ( database_url)
57
- . map_err ( |e| diesel:: result:: ConnectionError :: InvalidConnectionUrl ( e. to_string ( ) ) ) ?;
58
- let builder = OptsBuilder :: from_opts ( opts)
59
- . init ( CONNECTION_SETUP_QUERIES . to_vec ( ) )
60
- . stmt_cache_size ( 0 ) // We have our own cache
61
- . client_found_rows ( true ) ; // This allows a consistent behavior between MariaDB/MySQL and PostgreSQL (and is already set in `diesel`)
62
-
63
- let conn = mysql_async:: Conn :: new ( builder) . await . map_err ( ErrorHelper ) ?;
64
-
65
- Ok ( AsyncMysqlConnection {
66
- conn,
67
- stmt_cache : StmtCache :: new ( ) ,
68
- transaction_manager : AnsiTransactionManager :: default ( ) ,
69
- } )
79
+ let mut instrumentation = diesel:: connection:: get_default_instrumentation ( ) ;
80
+ instrumentation. on_connection_event ( InstrumentationEvent :: start_establish_connection (
81
+ database_url,
82
+ ) ) ;
83
+ let r = Self :: establish_connection_inner ( database_url) . await ;
84
+ instrumentation. on_connection_event ( InstrumentationEvent :: finish_establish_connection (
85
+ database_url,
86
+ r. as_ref ( ) . err ( ) ,
87
+ ) ) ;
88
+ let mut conn = r?;
89
+ conn. instrumentation = std:: sync:: Mutex :: new ( instrumentation) ;
90
+ Ok ( conn)
70
91
}
71
92
72
93
fn load < ' conn , ' query , T > ( & ' conn mut self , source : T ) -> Self :: LoadFuture < ' conn , ' query >
@@ -80,7 +101,10 @@ impl AsyncConnection for AsyncMysqlConnection {
80
101
let stmt_for_exec = match stmt {
81
102
MaybeCached :: Cached ( ref s) => ( * s) . clone ( ) ,
82
103
MaybeCached :: CannotCache ( ref s) => s. clone ( ) ,
83
- _ => todo ! ( ) ,
104
+ _ => unreachable ! (
105
+ "Diesel has only two variants here at the time of writing.\n \
106
+ If you ever see this error message please open in issue in the diesel-async issue tracker"
107
+ ) ,
84
108
} ;
85
109
86
110
let ( tx, rx) = futures_channel:: mpsc:: channel ( 0 ) ;
@@ -152,6 +176,19 @@ impl AsyncConnection for AsyncMysqlConnection {
152
176
fn transaction_state ( & mut self ) -> & mut AnsiTransactionManager {
153
177
& mut self . transaction_manager
154
178
}
179
+
180
+ fn instrumentation ( & mut self ) -> & mut dyn Instrumentation {
181
+ self . instrumentation
182
+ . get_mut ( )
183
+ . unwrap_or_else ( |p| p. into_inner ( ) )
184
+ }
185
+
186
+ fn set_instrumentation ( & mut self , instrumentation : impl Instrumentation ) {
187
+ * self
188
+ . instrumentation
189
+ . get_mut ( )
190
+ . unwrap_or_else ( |p| p. into_inner ( ) ) = Some ( Box :: new ( instrumentation) ) ;
191
+ }
155
192
}
156
193
157
194
#[ inline( always) ]
@@ -195,6 +232,7 @@ impl AsyncMysqlConnection {
195
232
conn,
196
233
stmt_cache : StmtCache :: new ( ) ,
197
234
transaction_manager : AnsiTransactionManager :: default ( ) ,
235
+ instrumentation : std:: sync:: Mutex :: new ( None ) ,
198
236
} ;
199
237
200
238
for stmt in CONNECTION_SETUP_QUERIES {
@@ -219,6 +257,12 @@ impl AsyncMysqlConnection {
219
257
T : QueryFragment < Mysql > + QueryId ,
220
258
F : Future < Output = QueryResult < R > > + Send ,
221
259
{
260
+ self . instrumentation
261
+ . lock ( )
262
+ . unwrap_or_else ( |p| p. into_inner ( ) )
263
+ . on_connection_event ( InstrumentationEvent :: start_query ( & diesel:: debug_query (
264
+ & query,
265
+ ) ) ) ;
222
266
let mut bind_collector = RawBytesBindCollector :: < Mysql > :: new ( ) ;
223
267
let bind_collector = query
224
268
. collect_binds ( & mut bind_collector, & mut ( ) , & Mysql )
@@ -228,6 +272,7 @@ impl AsyncMysqlConnection {
228
272
ref mut conn,
229
273
ref mut stmt_cache,
230
274
ref mut transaction_manager,
275
+ ref instrumentation,
231
276
..
232
277
} = self ;
233
278
@@ -242,28 +287,37 @@ impl AsyncMysqlConnection {
242
287
} = bind_collector?;
243
288
let is_safe_to_cache_prepared = is_safe_to_cache_prepared?;
244
289
let sql = sql?;
245
- let cache_key = if let Some ( query_id) = query_id {
246
- StatementCacheKey :: Type ( query_id)
247
- } else {
248
- StatementCacheKey :: Sql {
249
- sql : sql. clone ( ) ,
250
- bind_types : metadata. clone ( ) ,
251
- }
290
+ let inner = async {
291
+ let cache_key = if let Some ( query_id) = query_id {
292
+ StatementCacheKey :: Type ( query_id)
293
+ } else {
294
+ StatementCacheKey :: Sql {
295
+ sql : sql. clone ( ) ,
296
+ bind_types : metadata. clone ( ) ,
297
+ }
298
+ } ;
299
+
300
+ let ( stmt, conn) = stmt_cache
301
+ . cached_prepared_statement (
302
+ cache_key,
303
+ sql. clone ( ) ,
304
+ is_safe_to_cache_prepared,
305
+ & metadata,
306
+ conn,
307
+ instrumentation,
308
+ )
309
+ . await ?;
310
+ callback ( conn, stmt, ToSqlHelper { metadata, binds } ) . await
252
311
} ;
253
-
254
- let ( stmt, conn) = stmt_cache
255
- . cached_prepared_statement (
256
- cache_key,
257
- sql,
258
- is_safe_to_cache_prepared,
259
- & metadata,
260
- conn,
261
- )
262
- . await ?;
263
- update_transaction_manager_status (
264
- callback ( conn, stmt, ToSqlHelper { metadata, binds } ) . await ,
265
- transaction_manager,
266
- )
312
+ let r = update_transaction_manager_status ( inner. await , transaction_manager) ;
313
+ instrumentation
314
+ . lock ( )
315
+ . unwrap_or_else ( |p| p. into_inner ( ) )
316
+ . on_connection_event ( InstrumentationEvent :: finish_query (
317
+ & StrQueryHelper :: new ( & sql) ,
318
+ r. as_ref ( ) . err ( ) ,
319
+ ) ) ;
320
+ r
267
321
}
268
322
. boxed ( )
269
323
}
@@ -300,6 +354,26 @@ impl AsyncMysqlConnection {
300
354
301
355
Ok ( ( ) )
302
356
}
357
+
358
+ async fn establish_connection_inner (
359
+ database_url : & str ,
360
+ ) -> Result < AsyncMysqlConnection , ConnectionError > {
361
+ let opts = Opts :: from_url ( database_url)
362
+ . map_err ( |e| diesel:: result:: ConnectionError :: InvalidConnectionUrl ( e. to_string ( ) ) ) ?;
363
+ let builder = OptsBuilder :: from_opts ( opts)
364
+ . init ( CONNECTION_SETUP_QUERIES . to_vec ( ) )
365
+ . stmt_cache_size ( 0 ) // We have our own cache
366
+ . client_found_rows ( true ) ; // This allows a consistent behavior between MariaDB/MySQL and PostgreSQL (and is already set in `diesel`)
367
+
368
+ let conn = mysql_async:: Conn :: new ( builder) . await . map_err ( ErrorHelper ) ?;
369
+
370
+ Ok ( AsyncMysqlConnection {
371
+ conn,
372
+ stmt_cache : StmtCache :: new ( ) ,
373
+ transaction_manager : AnsiTransactionManager :: default ( ) ,
374
+ instrumentation : std:: sync:: Mutex :: new ( None ) ,
375
+ } )
376
+ }
303
377
}
304
378
305
379
#[ cfg( any(
0 commit comments