Skip to content

Commit f1fecde

Browse files
committed
Add support for various async connection pool implementations
1 parent 7fb327b commit f1fecde

File tree

9 files changed

+586
-28
lines changed

9 files changed

+586
-28
lines changed

Cargo.lock

Lines changed: 136 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@ tokio = { version = "1.12.0", features = ["rt"], optional = true}
1717
mysql_async = { version = "0.29.0", optional = true}
1818
mysql_common = {version = "0.28.0", optional = true}
1919

20+
bb8 = {version = "0.8", optional = true}
21+
deadpool = {version = "0.9", optional = true}
22+
mobc = {version = "0.7", optional = true}
23+
2024
[dev-dependencies]
2125
tokio = {version = "1.12.0", features = ["rt", "macros"]}
2226
cfg-if = "1"
2327
chrono = "0.4"
2428
diesel = { version = "2.0.0-rc.0", default-features = false, features = ["chrono"]}
2529

2630
[features]
27-
default = ["postgres", "mysql"]
31+
default = ["postgres", "mysql", "deadpool", "bb8", "mobc"]
2832
mysql = ["diesel/mysql_backend", "mysql_async", "mysql_common"]
2933
postgres = ["diesel/postgres_backend", "tokio-postgres", "tokio", "tokio/rt-multi-thread"]
3034

31-
3235
[[test]]
3336
name = "integration_tests"
3437
path = "tests/lib.rs"

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ mod pg;
7575
mod run_query_dsl;
7676
mod stmt_cache;
7777
mod transaction_manager;
78+
#[cfg(any(feature = "deadpool", feature = "bb8", feature = "mobc"))]
79+
pub mod pooled_connection;
7880

7981
#[cfg(feature = "mysql")]
8082
pub use self::mysql::AsyncMysqlConnection;

src/mysql/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,3 +213,6 @@ impl AsyncMysqlConnection {
213213
}).boxed()
214214
}
215215
}
216+
217+
#[cfg(any(feature = "deadpool", feature = "bb8", feature = "mobc"))]
218+
impl crate::pooled_connection::PoolableConnection for AsyncMysqlConnection {}

src/pg/mod.rs

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -482,26 +482,8 @@ async fn lookup_type(
482482
Ok((r.get(0), r.get(1)))
483483
}
484484

485-
diesel::table! {
486-
pg_type (oid) {
487-
oid -> Oid,
488-
typname -> Text,
489-
typarray -> Oid,
490-
typnamespace -> Oid,
491-
}
492-
}
493-
494-
diesel::table! {
495-
pg_namespace (oid) {
496-
oid -> Oid,
497-
nspname -> Text,
498-
}
499-
}
500-
501-
diesel::joinable!(pg_type -> pg_namespace(typnamespace));
502-
diesel::allow_tables_to_appear_in_same_query!(pg_type, pg_namespace);
503-
504-
diesel::sql_function! { fn pg_my_temp_schema() -> Oid; }
485+
#[cfg(any(feature = "deadpool", feature = "bb8", feature = "mobc"))]
486+
impl crate::pooled_connection::PoolableConnection for AsyncPgConnection {}
505487

506488
#[cfg(test)]
507489
pub mod tests {

src/pooled_connection/bb8.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//! A pool implementation for `diesel-async` based on [`bb8`]
2+
//!
3+
//! ```rust
4+
//! # include!("../doctest_setup.rs");
5+
//! use diesel::result::Error;
6+
//! use futures::FutureExt;
7+
//! use diesel_async::pooled_connection::AsyncDieselConnectionManager;
8+
//! use diesel_async::pooled_connection::bb8::Pool;
9+
//!
10+
//! # #[tokio::main(flavor = "current_thread")]
11+
//! # async fn main() {
12+
//! # run_test().await.unwrap();
13+
//! # }
14+
//! #
15+
//! # #[cfg(feature = "postgres")]
16+
//! # fn get_config() -> AsyncDieselConnectionManager<diesel_async::AsyncPgConnection> {
17+
//! # let db_url = database_url_from_env("PG_DATABASE_URL");
18+
//! let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(db_url);
19+
//! # config
20+
//! # }
21+
//! #
22+
//! # #[cfg(feature = "mysql")]
23+
//! # fn get_config() -> AsyncDieselConnectionManager<diesel_async::AsyncMysqlConnection> {
24+
//! # let db_url = database_url_from_env("MYSQL_DATABASE_URL");
25+
//! # let config = AsyncDieselConnectionManager::<diesel_async::AsyncMysqlConnection>::new(db_url);
26+
//! # config
27+
//! # }
28+
//! #
29+
//! # async fn run_test() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
30+
//! # use schema::users::dsl::*;
31+
//! # let config = get_config();
32+
//! let pool = Pool::builder().build(config).await?;
33+
//! let mut conn = pool.get().await?;
34+
//! let res = users.load::<(i32, String)>(&mut conn).await?;
35+
//! # Ok(())
36+
//! # }
37+
//! ```
38+
39+
use super::{AsyncDieselConnectionManager, PoolError, PoolableConnection};
40+
use bb8::ManageConnection;
41+
42+
/// Type alias for using [`bb8::Pool`] with [`diesel-async`]
43+
pub type Pool<C> = bb8::Pool<AsyncDieselConnectionManager<C>>;
44+
45+
#[async_trait::async_trait]
46+
impl<C> ManageConnection for AsyncDieselConnectionManager<C>
47+
where
48+
C: PoolableConnection + 'static,
49+
{
50+
type Connection = C;
51+
52+
type Error = PoolError;
53+
54+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
55+
C::establish(&self.connection_url)
56+
.await
57+
.map_err(PoolError::ConnectionError)
58+
}
59+
60+
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
61+
conn.ping().await.map_err(PoolError::QueryError)
62+
}
63+
64+
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
65+
std::thread::panicking() || conn.is_broken()
66+
}
67+
}

0 commit comments

Comments
 (0)