Skip to content

Commit 58b7f2e

Browse files
committed
Improve the postgres instrumentation code
1 parent a06702c commit 58b7f2e

File tree

1 file changed

+31
-3
lines changed

1 file changed

+31
-3
lines changed

src/pg/mod.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ impl AsyncConnection for AsyncPgConnection {
147147
type TransactionManager = AnsiTransactionManager;
148148

149149
async fn establish(database_url: &str) -> ConnectionResult<Self> {
150+
let mut instrumentation = diesel::connection::get_default_instrumentation();
151+
instrumentation.on_connection_event(InstrumentationEvent::start_establish_connection(
152+
database_url,
153+
));
154+
let instrumentation = Arc::new(std::sync::Mutex::new(instrumentation));
150155
let (client, connection) = tokio_postgres::connect(database_url, tokio_postgres::NoTls)
151156
.await
152157
.map_err(ErrorHelper)?;
@@ -161,7 +166,21 @@ impl AsyncConnection for AsyncPgConnection {
161166
}
162167
});
163168

164-
Self::setup(client, Some(rx), Some(shutdown_tx)).await
169+
let r = Self::setup(
170+
client,
171+
Some(rx),
172+
Some(shutdown_tx),
173+
Arc::clone(&instrumentation),
174+
)
175+
.await;
176+
instrumentation
177+
.lock()
178+
.unwrap_or_else(|e| e.into_inner())
179+
.on_connection_event(InstrumentationEvent::finish_establish_connection(
180+
database_url,
181+
r.as_ref().err(),
182+
));
183+
r
165184
}
166185

167186
fn load<'conn, 'query, T>(&'conn mut self, source: T) -> Self::LoadFuture<'conn, 'query>
@@ -336,13 +355,22 @@ impl AsyncPgConnection {
336355

337356
/// Construct a new `AsyncPgConnection` instance from an existing [`tokio_postgres::Client`]
338357
pub async fn try_from(conn: tokio_postgres::Client) -> ConnectionResult<Self> {
339-
Self::setup(conn, None, None).await
358+
Self::setup(
359+
conn,
360+
None,
361+
None,
362+
Arc::new(std::sync::Mutex::new(
363+
diesel::connection::get_default_instrumentation(),
364+
)),
365+
)
366+
.await
340367
}
341368

342369
async fn setup(
343370
conn: tokio_postgres::Client,
344371
connection_future: Option<broadcast::Receiver<Arc<tokio_postgres::Error>>>,
345372
shutdown_channel: Option<oneshot::Sender<()>>,
373+
instrumentation: Arc<std::sync::Mutex<Option<Box<dyn Instrumentation>>>>,
346374
) -> ConnectionResult<Self> {
347375
let mut conn = Self {
348376
conn: Arc::new(conn),
@@ -351,7 +379,7 @@ impl AsyncPgConnection {
351379
metadata_cache: Arc::new(Mutex::new(PgMetadataCache::new())),
352380
connection_future,
353381
shutdown_channel,
354-
instrumentation: Arc::new(std::sync::Mutex::new(None)),
382+
instrumentation,
355383
};
356384
conn.set_config_options()
357385
.await

0 commit comments

Comments
 (0)