Skip to content

Commit 91cefe6

Browse files
authored
[xitca-web] add connection pool (#9252)
* [xitca-web] connection pool * fix plaintext. * update dep * simplify api
1 parent 8afa827 commit 91cefe6

File tree

11 files changed

+234
-209
lines changed

11 files changed

+234
-209
lines changed

frameworks/Rust/xitca-web/Cargo.lock

Lines changed: 34 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frameworks/Rust/xitca-web/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,5 +95,5 @@ codegen-units = 1
9595
panic = "abort"
9696

9797
[patch.crates-io]
98-
xitca-postgres = { git = "https://github.com/HFQR/xitca-web.git", rev = "6870448" }
98+
xitca-postgres = { git = "https://github.com/HFQR/xitca-web.git", rev = "9835c0b" }
9999
mio = { git = "https://github.com/fakeshadow/mio", rev = "9bae6012b7ecfc6083350785f71a5e8265358178" }
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
max_width = 120

frameworks/Rust/xitca-web/src/db.rs

Lines changed: 97 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,71 @@
11
use std::fmt::Write;
22

33
use xitca_io::bytes::BytesMut;
4-
use xitca_postgres::{
5-
pipeline::Pipeline, statement::Statement, AsyncLendingIterator, SharedClient,
6-
};
4+
use xitca_postgres::{pipeline::Pipeline, AsyncLendingIterator, Pool, Type};
75

86
use super::{
97
ser::{Fortune, Fortunes, World},
108
util::{HandleResult, Rand, DB_URL},
119
};
1210

1311
pub struct Client {
14-
client: SharedClient,
12+
pool: Pool,
1513
#[cfg(not(feature = "pg-sync"))]
1614
shared: std::cell::RefCell<Shared>,
1715
#[cfg(feature = "pg-sync")]
1816
shared: std::sync::Mutex<Shared>,
19-
fortune: Statement,
20-
world: Statement,
21-
updates: Box<[Statement]>,
17+
updates: Box<[Box<str>]>,
2218
}
2319

2420
type Shared = (Rand, BytesMut);
2521

26-
pub async fn create() -> HandleResult<Client> {
27-
let mut client = SharedClient::new(DB_URL.to_string()).await?;
22+
const FORTUNE_SQL: &str = "SELECT * FROM fortune";
2823

29-
let fortune = client.prepare_cached("SELECT * FROM fortune", &[]).await?;
24+
const FORTUNE_SQL_TYPES: &[Type] = &[];
3025

31-
let world = client
32-
.prepare_cached("SELECT * FROM world WHERE id=$1", &[])
33-
.await?;
26+
const WORLD_SQL: &str = "SELECT * FROM world WHERE id=$1";
3427

35-
let mut updates = Vec::new();
28+
const WORLD_SQL_TYPES: &[Type] = &[Type::INT4];
3629

37-
// a dummy statement as placeholder of 0 index.
38-
// avoid off by one calculation when using non zero u16 as slicing index.
39-
updates.push(Statement::default());
30+
fn update_query(num: usize) -> Box<str> {
31+
const PREFIX: &str = "UPDATE world SET randomNumber = w.r FROM (VALUES ";
32+
const SUFFIX: &str = ") AS w (i,r) WHERE world.id = w.i";
4033

41-
for num in 1..=500u16 {
42-
let mut pl = 1;
43-
let mut q = String::new();
44-
q.push_str("UPDATE world SET randomnumber = CASE id ");
45-
for _ in 1..=num {
46-
let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1);
47-
pl += 2;
48-
}
49-
q.push_str("ELSE randomnumber END WHERE id IN (");
50-
for _ in 1..=num {
51-
let _ = write!(&mut q, "${},", pl);
52-
pl += 1;
53-
}
54-
q.pop();
55-
q.push(')');
34+
let (_, mut query) = (1..=num).fold((1, String::from(PREFIX)), |(idx, mut query), _| {
35+
write!(query, "(${}::int,${}::int),", idx, idx + 1).unwrap();
36+
(idx + 2, query)
37+
});
5638

57-
let st = client.prepare_cached(&q, &[]).await?;
58-
updates.push(st);
59-
}
39+
query.pop();
40+
41+
query.push_str(SUFFIX);
42+
43+
query.into_boxed_str()
44+
}
45+
46+
pub async fn create() -> HandleResult<Client> {
47+
let pool = Pool::builder(DB_URL).capacity(1).build()?;
6048

6149
let shared = (Rand::default(), BytesMut::new());
6250

51+
let updates = core::iter::once(Box::from(""))
52+
.chain((1..=500).map(update_query))
53+
.collect::<Box<[Box<str>]>>();
54+
55+
{
56+
let mut conn = pool.get().await?;
57+
for update in updates.iter().skip(1) {
58+
conn.prepare(update, &[]).await?;
59+
}
60+
}
61+
6362
Ok(Client {
64-
client,
63+
pool,
6564
#[cfg(not(feature = "pg-sync"))]
6665
shared: std::cell::RefCell::new(shared),
6766
#[cfg(feature = "pg-sync")]
6867
shared: std::sync::Mutex::new(shared),
69-
fortune,
70-
world,
71-
updates: updates.into_boxed_slice(),
68+
updates,
7269
})
7370
}
7471

@@ -84,29 +81,26 @@ impl Client {
8481
}
8582

8683
pub async fn get_world(&self) -> HandleResult<World> {
84+
let mut conn = self.pool.get().await?;
85+
let stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
8786
let id = self.shared().0.gen_id();
88-
self.client
89-
.query_raw(&self.world, [id])
90-
.await?
91-
.try_next()
92-
.await?
93-
.map(|row| World::new(row.get_raw(0), row.get_raw(1)))
94-
.ok_or_else(|| "World does not exist".into())
87+
let mut res = conn.consume().query_raw(&stmt, [id])?;
88+
let row = res.try_next().await?.ok_or_else(|| "World does not exist")?;
89+
Ok(World::new(row.get_raw(0), row.get_raw(1)))
9590
}
9691

9792
pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
9893
let len = num as usize;
9994

95+
let mut conn = self.pool.get().await?;
96+
let stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
97+
10098
let mut res = {
10199
let (ref mut rng, ref mut buf) = *self.shared();
102-
103100
let mut pipe = Pipeline::with_capacity_from_buf(len, buf);
104-
105-
(0..num).try_for_each(|_| pipe.query_raw(&self.world, [rng.gen_id()]))?;
106-
107-
self.client.pipeline(pipe)
108-
}
109-
.await?;
101+
(0..num).try_for_each(|_| pipe.query_raw(&stmt, [rng.gen_id()]))?;
102+
conn.consume().pipeline(pipe)?
103+
};
110104

111105
let mut worlds = Vec::with_capacity(len);
112106

@@ -122,37 +116,34 @@ impl Client {
122116
pub async fn update(&self, num: u16) -> HandleResult<Vec<World>> {
123117
let len = num as usize;
124118

125-
let mut params = Vec::new();
126-
params.reserve(len * 3);
119+
let update = self.updates.get(len).ok_or_else(|| "num out of bound")?;
120+
121+
let mut conn = self.pool.get().await?;
122+
let world_stmt = conn.prepare(WORLD_SQL, WORLD_SQL_TYPES).await?;
123+
let update_stmt = conn.prepare(&update, &[]).await?;
124+
125+
let mut params = Vec::with_capacity(len);
127126

128127
let mut res = {
129128
let (ref mut rng, ref mut buf) = *self.shared();
130-
131129
let mut pipe = Pipeline::with_capacity_from_buf(len + 1, buf);
132-
133130
(0..num).try_for_each(|_| {
134131
let w_id = rng.gen_id();
135132
let r_id = rng.gen_id();
136-
params.extend([w_id, r_id]);
137-
pipe.query_raw(&self.world, [w_id])
133+
params.push([w_id, r_id]);
134+
pipe.query_raw(&world_stmt, [w_id])
138135
})?;
136+
pipe.query_raw(&update_stmt, sort_update_params(&params))?;
137+
conn.consume().pipeline(pipe)?
138+
};
139139

140-
params.extend_from_within(..len);
141-
142-
let st = self.updates.get(len).unwrap();
143-
pipe.query_raw(st, &params)?;
144-
145-
self.client.pipeline(pipe)
146-
}
147-
.await?;
140+
let mut worlds = Vec::with_capacity(len);
148141

149-
let mut worlds = Vec::new();
150-
worlds.reserve(len);
151-
let mut r_ids = params.into_iter().skip(1).step_by(2);
142+
let mut r_ids = params.into_iter();
152143

153144
while let Some(mut item) = res.try_next().await? {
154145
while let Some(row) = item.try_next().await? {
155-
let r_id = r_ids.next().unwrap();
146+
let r_id = r_ids.next().unwrap()[1];
156147
worlds.push(World::new(row.get_raw(0), r_id))
157148
}
158149
}
@@ -164,12 +155,46 @@ impl Client {
164155
let mut items = Vec::with_capacity(32);
165156
items.push(Fortune::new(0, "Additional fortune added at request time."));
166157

167-
let mut res = self.client.query_raw::<[i32; 0]>(&self.fortune, []).await?;
158+
let mut conn = self.pool.get().await?;
159+
let stmt = conn.prepare(FORTUNE_SQL, FORTUNE_SQL_TYPES).await?;
160+
let mut res = conn.consume().query_raw::<[i32; 0]>(&stmt, [])?;
161+
168162
while let Some(row) = res.try_next().await? {
169163
items.push(Fortune::new(row.get_raw(0), row.get_raw::<String>(1)));
170164
}
165+
171166
items.sort_by(|it, next| it.message.cmp(&next.message));
172167

173168
Ok(Fortunes::new(items))
174169
}
175170
}
171+
172+
fn sort_update_params(params: &Vec<[i32; 2]>) -> impl ExactSizeIterator<Item = i32> {
173+
let mut params = params.clone();
174+
params.sort_by(|a, b| a[0].cmp(&b[0]));
175+
176+
struct ParamIter<I>(I);
177+
178+
impl<I> Iterator for ParamIter<I>
179+
where
180+
I: Iterator,
181+
{
182+
type Item = I::Item;
183+
184+
#[inline]
185+
fn next(&mut self) -> Option<Self::Item> {
186+
self.0.next()
187+
}
188+
189+
#[inline]
190+
fn size_hint(&self) -> (usize, Option<usize>) {
191+
self.0.size_hint()
192+
}
193+
}
194+
195+
// impl depends on compiler optimization to flat Vec<[T]> to Vec<T> when inferring
196+
// it's size hint. possible to cause runtime panic.
197+
impl<I> ExactSizeIterator for ParamIter<I> where I: Iterator {}
198+
199+
ParamIter(params.into_iter().flatten())
200+
}

frameworks/Rust/xitca-web/src/db_diesel.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,7 @@ impl _Pool {
4343

4444
let w_id = self.rng.lock().unwrap().gen_id();
4545
let mut conn = self.pool.get()?;
46-
world
47-
.filter(id.eq(w_id))
48-
.load(&mut conn)?
49-
.pop()
50-
.ok_or_else(not_found)
46+
world.filter(id.eq(w_id)).load(&mut conn)?.pop().ok_or_else(not_found)
5147
}
5248

5349
pub fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {

0 commit comments

Comments
 (0)