Skip to content

Commit 8650b68

Browse files
authored
use async connection pool for db. (#6832)
1 parent 9b2c2e9 commit 8650b68

File tree

5 files changed

+133
-63
lines changed

5 files changed

+133
-63
lines changed

frameworks/Rust/xitca-web/Cargo.toml

100644100755
Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,21 @@ name = "xitca-web-diesel"
1212
path = "./src/main_diesel.rs"
1313

1414
[dependencies]
15-
xitca-http = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "7499e8a5aa4f7e1f17bbf6b6ee0816828dff3149" }
16-
xitca-web = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "7499e8a5aa4f7e1f17bbf6b6ee0816828dff3149" }
15+
xitca-http = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "a470092d8f1e1c3bb7a9831c175bf112b70f81e7" }
16+
xitca-web = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "a470092d8f1e1c3bb7a9831c175bf112b70f81e7" }
1717

1818
ahash = { version = "0.7.4", features = ["compile-time-rng"] }
1919
atoi = "0.4.0"
2020
bytes = "1"
2121
core_affinity = "0.5.10"
22-
diesel = { version = "1.4.7", features = ["postgres", "r2d2"] }
22+
diesel = { version = "1.4.7", features = ["postgres"] }
2323
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
2424
mimalloc = { version = "0.1.25", default-features = false }
2525
rand = { version = "0.8", default-features = false, features = ["min_const_gen", "small_rng"] }
2626
sailfish = "0.3.3"
2727
serde = "1"
2828
simd-json = "0.4.6"
29+
tang-rs = "0.2"
2930
tokio = { version = "1.7", features = ["macros", "rt"] }
3031
tokio-postgres = "0.7.2"
3132

@@ -36,6 +37,6 @@ codegen-units = 1
3637
panic = "abort"
3738

3839
[patch.crates-io]
39-
xitca-http = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "7499e8a5aa4f7e1f17bbf6b6ee0816828dff3149" }
40-
xitca-server = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "7499e8a5aa4f7e1f17bbf6b6ee0816828dff3149" }
41-
xitca-service = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "7499e8a5aa4f7e1f17bbf6b6ee0816828dff3149" }
40+
xitca-http = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "a470092d8f1e1c3bb7a9831c175bf112b70f81e7" }
41+
xitca-server = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "a470092d8f1e1c3bb7a9831c175bf112b70f81e7" }
42+
xitca-service = { git = "https://github.com/fakeshadow/xitca-web.git", rev = "a470092d8f1e1c3bb7a9831c175bf112b70f81e7" }

frameworks/Rust/xitca-web/src/db_diesel.rs

100644100755
Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,105 @@
1-
use std::{error::Error, io};
1+
use std::{error::Error, fmt, future::Future, io, time::Duration};
22

3-
use diesel::{prelude::*, r2d2};
3+
use diesel::prelude::*;
44
use rand::{rngs::SmallRng, Rng, SeedableRng};
5-
use tokio::task::spawn_blocking;
5+
use tang_rs::{Manager, ManagerFuture, ManagerTimeout, Pool};
6+
use tokio::{
7+
task::spawn_blocking,
8+
time::{sleep, Sleep},
9+
};
610

711
use super::ser::{Fortune, Fortunes, World};
812

913
type DbResult<T> = Result<T, Box<dyn Error + Send + Sync + 'static>>;
1014

15+
pub struct DieselPoolManager(String);
16+
17+
impl Manager for DieselPoolManager {
18+
type Connection = PgConnection;
19+
type Error = DieselPoolError;
20+
type Timeout = Sleep;
21+
type TimeoutError = ();
22+
23+
fn connect(&self) -> ManagerFuture<Result<Self::Connection, Self::Error>> {
24+
let conn = PgConnection::establish(self.0.as_str());
25+
Box::pin(async move { Ok(conn?) })
26+
}
27+
28+
fn is_valid<'a>(
29+
&'a self,
30+
_: &'a mut Self::Connection,
31+
) -> ManagerFuture<'a, Result<(), Self::Error>> {
32+
Box::pin(async { Ok(()) })
33+
}
34+
35+
fn is_closed(&self, _: &mut Self::Connection) -> bool {
36+
false
37+
}
38+
39+
fn spawn<Fut>(&self, fut: Fut)
40+
where
41+
Fut: Future<Output = ()> + 'static,
42+
{
43+
tokio::task::spawn_local(fut);
44+
}
45+
46+
fn timeout<Fut: Future>(&self, fut: Fut, dur: Duration) -> ManagerTimeout<Fut, Self::Timeout> {
47+
ManagerTimeout::new(fut, sleep(dur))
48+
}
49+
}
50+
51+
pub enum DieselPoolError {
52+
Inner(ConnectionError),
53+
TimeOut,
54+
}
55+
56+
impl fmt::Debug for DieselPoolError {
57+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
58+
match self {
59+
DieselPoolError::Inner(e) => e.fmt(f),
60+
DieselPoolError::TimeOut => f
61+
.debug_struct("DieselPoolError")
62+
.field("source", &"Connection Timeout")
63+
.finish(),
64+
}
65+
}
66+
}
67+
68+
impl fmt::Display for DieselPoolError {
69+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70+
write!(f, "{:?}", self)
71+
}
72+
}
73+
74+
impl Error for DieselPoolError {}
75+
76+
impl From<ConnectionError> for DieselPoolError {
77+
fn from(e: ConnectionError) -> Self {
78+
Self::Inner(e)
79+
}
80+
}
81+
82+
impl From<()> for DieselPoolError {
83+
fn from(_: ()) -> Self {
84+
Self::TimeOut
85+
}
86+
}
87+
1188
#[derive(Clone)]
1289
pub struct DieselPool {
13-
pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
90+
pool: Pool<DieselPoolManager>,
1491
rng: SmallRng,
1592
}
1693

17-
pub fn connect(config: &str) -> io::Result<DieselPool> {
18-
let manager = r2d2::ConnectionManager::new(config);
19-
let pool = r2d2::Builder::new()
94+
pub async fn create(config: &str) -> io::Result<DieselPool> {
95+
let pool = tang_rs::Builder::new()
2096
.max_size(5)
21-
.min_idle(Some(5))
22-
.test_on_check_out(false)
97+
.min_idle(5)
98+
.always_check(false)
2399
.idle_timeout(None)
24100
.max_lifetime(None)
25-
.build(manager)
101+
.build(DieselPoolManager(String::from(config)))
102+
.await
26103
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
27104

28105
Ok(DieselPool {
@@ -33,17 +110,16 @@ pub fn connect(config: &str) -> io::Result<DieselPool> {
33110

34111
impl DieselPool {
35112
pub async fn get_world(&self) -> DbResult<World> {
36-
let mut this = self.clone();
113+
let mut rng = self.rng.clone();
114+
let conn = self.pool.get_owned().await?;
37115

38116
spawn_blocking(move || {
39117
use crate::schema::world::dsl::*;
40118

41-
let conn = this.pool.get()?;
42-
43-
let random_id = this.rng.gen_range(1..10_001);
119+
let random_id = rng.gen_range(1..10_001);
44120
let w = world
45121
.filter(id.eq(random_id))
46-
.load::<World>(&conn)?
122+
.load::<World>(&*conn)?
47123
.pop()
48124
.unwrap();
49125

@@ -53,19 +129,18 @@ impl DieselPool {
53129
}
54130

55131
pub async fn get_worlds(&self, num: u16) -> DbResult<Vec<World>> {
56-
let mut this = self.clone();
132+
let mut rng = self.rng.clone();
133+
let conn = self.pool.get_owned().await?;
57134

58135
spawn_blocking(move || {
59136
use crate::schema::world::dsl::*;
60137

61-
let conn = this.pool.get()?;
62-
63138
(0..num)
64139
.map(|_| {
65-
let w_id = this.rng.gen_range(1..10_001);
140+
let w_id = rng.gen_range(1..10_001);
66141
let w = world
67142
.filter(id.eq(w_id))
68-
.load::<World>(&conn)?
143+
.load::<World>(&*conn)?
69144
.pop()
70145
.unwrap();
71146
Ok(w)
@@ -76,22 +151,21 @@ impl DieselPool {
76151
}
77152

78153
pub async fn update(&self, num: u16) -> DbResult<Vec<World>> {
79-
let mut this = self.clone();
154+
let mut rng = self.rng.clone();
155+
let conn = self.pool.get_owned().await?;
80156

81157
spawn_blocking(move || {
82158
use crate::schema::world::dsl::*;
83159

84-
let conn = this.pool.get()?;
85-
86160
let mut worlds = (0..num)
87161
.map(|_| {
88-
let w_id: i32 = this.rng.gen_range(1..10_001);
162+
let w_id: i32 = rng.gen_range(1..10_001);
89163
let mut w = world
90164
.filter(id.eq(w_id))
91-
.load::<World>(&conn)?
165+
.load::<World>(&*conn)?
92166
.pop()
93167
.unwrap();
94-
w.randomnumber = this.rng.gen_range(1..10_001);
168+
w.randomnumber = rng.gen_range(1..10_001);
95169
Ok(w)
96170
})
97171
.collect::<DbResult<Vec<_>>>()?;
@@ -103,7 +177,7 @@ impl DieselPool {
103177
diesel::update(world)
104178
.filter(id.eq(w.id))
105179
.set(randomnumber.eq(w.randomnumber))
106-
.execute(&conn)?;
180+
.execute(&*conn)?;
107181
}
108182
Ok(())
109183
})?;
@@ -114,14 +188,12 @@ impl DieselPool {
114188
}
115189

116190
pub async fn tell_fortune(&self) -> DbResult<Fortunes> {
117-
let this = self.clone();
191+
let conn = self.pool.get_owned().await?;
118192

119193
spawn_blocking(move || {
120194
use crate::schema::fortune::dsl::*;
121195

122-
let conn = this.pool.get()?;
123-
124-
let mut items = fortune.load::<Fortune>(&conn)?;
196+
let mut items = fortune.load::<Fortune>(&*conn)?;
125197

126198
items.push(Fortune::new(0, "Additional fortune added at request time."));
127199
items.sort_by(|it, next| it.message.cmp(&next.message));

frameworks/Rust/xitca-web/src/main.rs

100644100755
Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414

1515
use bytes::Bytes;
1616
use xitca_http::http::{
17-
header::{HeaderValue, CONTENT_TYPE, SERVER},
17+
header::{CONTENT_TYPE, SERVER},
1818
Method,
1919
};
2020
use xitca_web::{dev::fn_service, request::WebRequest, App, HttpServer};
@@ -79,12 +79,9 @@ async fn fortunes(req: &mut WebRequest<'_, State>) -> HandleResult {
7979
Ok(body) => {
8080
let mut res = req.as_response(body);
8181

82+
res.headers_mut().append(SERVER, util::SERVER_HEADER_VALUE);
8283
res.headers_mut()
83-
.append(SERVER, HeaderValue::from_static("TFB"));
84-
res.headers_mut().append(
85-
CONTENT_TYPE,
86-
HeaderValue::from_static("text/html; charset=utf-8"),
87-
);
84+
.append(CONTENT_TYPE, util::HTML_HEADER_VALUE);
8885

8986
Ok(res)
9087
}

frameworks/Rust/xitca-web/src/main_diesel.rs

100644100755
Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ use std::{error::Error, io};
1313

1414
use bytes::Bytes;
1515
use xitca_http::http::{
16-
header::{HeaderValue, CONTENT_TYPE, SERVER},
16+
header::{CONTENT_TYPE, SERVER},
1717
Method,
1818
};
1919
use xitca_web::{dev::fn_service, request::WebRequest, App, HttpServer};
2020

21-
use self::db_diesel::{connect, DieselPool};
21+
use self::db_diesel::{create, DieselPool};
2222
use self::util::{
2323
internal, json, json_response, not_found, plain_text, AppState, HandleResult, QueryParse,
2424
};
@@ -30,10 +30,9 @@ async fn main() -> io::Result<()> {
3030
let config = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world";
3131

3232
HttpServer::new(move || {
33-
let pool = connect(config).unwrap();
34-
App::with_async_state(move || {
35-
let pool = pool.clone();
36-
async move { AppState::new(pool.clone()) }
33+
App::with_async_state(move || async move {
34+
let pool = create(config).await.unwrap();
35+
AppState::new(pool)
3736
})
3837
.service(fn_service(handle))
3938
})
@@ -70,12 +69,9 @@ async fn fortunes(req: &mut WebRequest<'_, State>) -> HandleResult {
7069
Ok(body) => {
7170
let mut res = req.as_response(body);
7271

72+
res.headers_mut().append(SERVER, util::SERVER_HEADER_VALUE);
7373
res.headers_mut()
74-
.append(SERVER, HeaderValue::from_static("TFB"));
75-
res.headers_mut().append(
76-
CONTENT_TYPE,
77-
HeaderValue::from_static("text/html; charset=utf-8"),
78-
);
74+
.append(CONTENT_TYPE, util::HTML_HEADER_VALUE);
7975

8076
Ok(res)
8177
}

frameworks/Rust/xitca-web/src/util.rs

100644100755
Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,19 @@ impl<C> AppState<C> {
8383
}
8484
}
8585

86+
pub const SERVER_HEADER_VALUE: HeaderValue = HeaderValue::from_static("TFB");
87+
88+
pub const HTML_HEADER_VALUE: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8");
89+
90+
const TEXT_HEADER_VALUE: HeaderValue = HeaderValue::from_static("text/plain");
91+
92+
const JSON_HEADER_VALUE: HeaderValue = HeaderValue::from_static("application/json");
93+
8694
pub(super) fn plain_text<D>(req: &mut WebRequest<'_, D>) -> HandleResult {
8795
let mut res = req.as_response(Bytes::from_static(b"Hello, World!"));
8896

89-
res.headers_mut()
90-
.append(SERVER, HeaderValue::from_static("TFB"));
91-
res.headers_mut()
92-
.append(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
97+
res.headers_mut().append(SERVER, SERVER_HEADER_VALUE);
98+
res.headers_mut().append(CONTENT_TYPE, TEXT_HEADER_VALUE);
9399

94100
Ok(res)
95101
}
@@ -109,10 +115,8 @@ where
109115
let body = writer.take();
110116

111117
let mut res = req.as_response(body);
112-
res.headers_mut()
113-
.append(SERVER, HeaderValue::from_static("TFB"));
114-
res.headers_mut()
115-
.append(CONTENT_TYPE, HeaderValue::from_static("application/json"));
118+
res.headers_mut().append(SERVER, SERVER_HEADER_VALUE);
119+
res.headers_mut().append(CONTENT_TYPE, JSON_HEADER_VALUE);
116120

117121
Ok(res)
118122
}
@@ -124,7 +128,7 @@ macro_rules! error {
124128
pub(super) fn $error() -> HandleResult {
125129
Ok(WebResponseBuilder::new()
126130
.status($code)
127-
.header(SERVER, HeaderValue::from_static("TFB"))
131+
.header(SERVER, SERVER_HEADER_VALUE)
128132
.body(Bytes::new().into())
129133
.unwrap())
130134
}

0 commit comments

Comments
 (0)