Skip to content

Commit 3748ff9

Browse files
committed
clean up module layout
1 parent aae1066 commit 3748ff9

File tree

9 files changed

+136
-146
lines changed

9 files changed

+136
-146
lines changed

frameworks/Rust/xitca-web/Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frameworks/Rust/xitca-web/src/db.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use super::{
1010
util::{DB_URL, HandleResult},
1111
};
1212

13-
use db_util::{FORTUNE_STMT, Shared, UPDATE_BATCH_STMT, WORLD_STMT, not_found};
13+
use db_util::{FORTUNE_STMT, Shared, UPDATE_STMT, WORLD_STMT, not_found};
1414

1515
pub struct Client {
1616
pool: Pool,
@@ -60,7 +60,7 @@ impl Client {
6060
pub async fn update(&self, num: u16) -> HandleResult<Vec<World>> {
6161
let mut conn = self.pool.get().await?;
6262
let world_stmt = WORLD_STMT.execute(&mut conn).await?;
63-
let update_stmt = UPDATE_BATCH_STMT.execute(&mut conn).await?;
63+
let update_stmt = UPDATE_STMT.execute(&mut conn).await?;
6464

6565
let (mut res, worlds) = {
6666
let (ref mut rng, ref mut buf) = *self.shared.borrow_mut();
Lines changed: 78 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
#[path = "./db_util.rs"]
2-
mod db_util;
3-
4-
use core::cell::RefCell;
5-
6-
use std::io;
7-
81
use diesel::prelude::*;
92
use diesel_async::{
103
RunQueryDsl,
@@ -18,31 +11,29 @@ use crate::{
1811
util::{DB_URL, HandleResult, Rand},
1912
};
2013

21-
use db_util::update_query_from_ids;
22-
2314
pub struct Pool {
2415
pool: bb8::Pool<AsyncPgConnection>,
25-
rng: RefCell<Rand>,
16+
rng: core::cell::RefCell<Rand>,
2617
}
2718

28-
pub async fn create() -> io::Result<Pool> {
29-
bb8::Pool::builder()
30-
.max_size(1)
31-
.min_idle(Some(1))
32-
.test_on_check_out(false)
33-
.build(AsyncDieselConnectionManager::new(DB_URL))
34-
.await
35-
.map_err(io::Error::other)
36-
.map(|pool| Pool {
19+
impl Pool {
20+
pub async fn create() -> HandleResult<Self> {
21+
let pool = bb8::Pool::builder()
22+
.max_size(1)
23+
.min_idle(Some(1))
24+
.test_on_check_out(false)
25+
.build(AsyncDieselConnectionManager::new(DB_URL))
26+
.await?;
27+
28+
Ok(Self {
3729
pool,
38-
rng: RefCell::new(Rand::default()),
30+
rng: Default::default(),
3931
})
40-
}
32+
}
4133

42-
impl Pool {
4334
pub async fn get_world(&self) -> HandleResult<World> {
4435
{
45-
use crate::schema::world::dsl::*;
36+
use schema::world::dsl::*;
4637

4738
let w_id = self.rng.borrow_mut().gen_id();
4839
let mut conn = self.pool.get().await?;
@@ -53,45 +44,44 @@ impl Pool {
5344

5445
pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
5546
{
56-
use crate::schema::world::dsl::*;
47+
use schema::world::dsl::*;
5748

5849
let mut conn = self.pool.get().await?;
59-
let mut rng = self.rng.borrow_mut();
60-
61-
core::iter::repeat_with(move || {
62-
let w_id = rng.gen_id();
63-
world.filter(id.eq(w_id)).first(&mut conn).map_err(Into::into)
64-
})
65-
.take(num as _)
66-
.collect::<TryJoinAll<_>>()
50+
self.rng
51+
.borrow_mut()
52+
.gen_multi()
53+
.take(num as _)
54+
.map(|w_id| world.filter(id.eq(w_id)).first(&mut conn).map_err(Into::into))
55+
.collect::<TryJoinAll<_>>()
6756
}
6857
.await
6958
}
7059

7160
pub async fn update(&self, num: u16) -> HandleResult<Vec<World>> {
7261
{
73-
use crate::schema::world::dsl::*;
62+
use schema::world::dsl::*;
7463

7564
let mut conn = self.pool.get().await?;
7665
let mut rng = self.rng.borrow_mut();
7766
let mut params = Vec::with_capacity(num as _);
7867

79-
let get = core::iter::repeat_with(|| {
80-
let w_id = rng.gen_id();
81-
let rng = rng.gen_id();
68+
let get = rng
69+
.clone()
70+
.gen_multi()
71+
.take(num as _)
72+
.zip(rng.gen_multi())
73+
.map(|(w_id, rng)| {
74+
let get = world.filter(id.eq(w_id)).first::<World>(&mut conn);
8275

83-
let get = world.filter(id.eq(w_id)).first::<World>(&mut conn);
76+
params.push((w_id, rng));
8477

85-
params.push((w_id, rng));
86-
87-
async move {
88-
let mut w = get.await?;
89-
w.randomnumber = rng;
90-
HandleResult::Ok(w)
91-
}
92-
})
93-
.take(num as _)
94-
.collect::<TryJoinAll<_>>();
78+
async move {
79+
let mut w = get.await?;
80+
w.randomnumber = rng;
81+
HandleResult::Ok(w)
82+
}
83+
})
84+
.collect::<TryJoinAll<_>>();
9585

9686
let sql = update_query_from_ids(params);
9787
let update = diesel::sql_query(sql).execute(&mut conn).map_err(Into::into);
@@ -104,7 +94,7 @@ impl Pool {
10494

10595
pub async fn tell_fortune(&self) -> HandleResult<Fortunes> {
10696
{
107-
use crate::schema::fortune::dsl::*;
97+
use schema::fortune::dsl::*;
10898

10999
let mut conn = self.pool.get().await?;
110100
fortune.load(&mut conn).map_err(Into::into)
@@ -113,3 +103,43 @@ impl Pool {
113103
.map(Fortunes::new)
114104
}
115105
}
106+
107+
mod schema {
108+
diesel::table! {
109+
world (id) {
110+
id -> Integer,
111+
randomnumber -> Integer,
112+
}
113+
}
114+
115+
diesel::table! {
116+
fortune (id) {
117+
id -> Integer,
118+
message -> Text,
119+
}
120+
}
121+
}
122+
123+
// diesel does not support high level bulk update api. use raw sql to bypass the limitation.
124+
// relate discussion: https://github.com/diesel-rs/diesel/discussions/2879
125+
fn update_query_from_ids(mut rngs: Vec<(i32, i32)>) -> String {
126+
rngs.sort_by(|(a, _), (b, _)| a.cmp(b));
127+
128+
const PREFIX: &str = "UPDATE world SET randomNumber=w.r FROM (VALUES ";
129+
const SUFFIX: &str = ") AS w (i,r) WHERE world.id=w.i";
130+
131+
let mut query = String::from(PREFIX);
132+
133+
use core::fmt::Write;
134+
rngs.iter().for_each(|(w_id, num)| {
135+
write!(query, "({}::int,{}::int),", w_id, num).unwrap();
136+
});
137+
138+
if query.ends_with(',') {
139+
query.pop();
140+
}
141+
142+
query.push_str(SUFFIX);
143+
144+
query
145+
}

frameworks/Rust/xitca-web/src/db_toasty.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,22 @@ pub struct Pool {
1212
rng: core::cell::RefCell<Rand>,
1313
}
1414

15-
pub async fn create() -> HandleResult<Pool> {
16-
let conn = xitca_postgres_toasty::PostgreSQL::connect(DB_URL).await?;
17-
18-
let db = Db::builder()
19-
.register::<World>()
20-
.register::<Fortune>()
21-
.build(conn)
22-
.await?;
23-
24-
Ok(Pool {
25-
db,
26-
rng: Default::default(),
27-
})
28-
}
29-
3015
impl Pool {
16+
pub async fn create() -> HandleResult<Self> {
17+
let conn = xitca_postgres_toasty::PostgreSQL::connect(DB_URL).await?;
18+
19+
let db = Db::builder()
20+
.register::<World>()
21+
.register::<Fortune>()
22+
.build(conn)
23+
.await?;
24+
25+
Ok(Self {
26+
db,
27+
rng: Default::default(),
28+
})
29+
}
30+
3131
pub async fn get_world(&self) -> HandleResult<World> {
3232
let id = self.rng.borrow_mut().gen_id();
3333
World::get_by_id(&self.db, id).await.map_err(Into::into)

frameworks/Rust/xitca-web/src/db_unrealistic.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ pub struct Client {
2626
pub async fn create() -> HandleResult<Client> {
2727
let (cli, mut drv) = xitca_postgres::Postgres::new(DB_URL).connect().await?;
2828

29-
tokio::task::spawn(tokio::task::unconstrained(async move {
29+
tokio::task::spawn(async move {
3030
while drv.try_next().await?.is_some() {}
3131
HandleResult::Ok(())
32-
}));
32+
});
3333

3434
let world = WORLD_STMT.execute(&cli).await?.leak();
3535
let fortune = FORTUNE_STMT.execute(&cli).await?.leak();
@@ -53,16 +53,16 @@ impl Client {
5353
}
5454

5555
pub async fn get_worlds(&self, num: u16) -> HandleResult<Vec<World>> {
56-
let len = num as usize;
57-
5856
let mut res = {
5957
let (ref mut rng, ref mut buf) = *self.shared.borrow_mut();
60-
let mut pipe = Pipeline::with_capacity_from_buf(len, buf);
61-
(0..num).try_for_each(|_| self.world.bind([rng.gen_id()]).query(&mut pipe))?;
58+
let mut pipe = Pipeline::with_capacity_from_buf(num as _, buf);
59+
rng.gen_multi()
60+
.take(num as _)
61+
.try_for_each(|id| self.world.bind([id]).query(&mut pipe))?;
6262
pipe.query(&self.cli)?
6363
};
6464

65-
let mut worlds = Vec::with_capacity(len);
65+
let mut worlds = Vec::with_capacity(num as _);
6666

6767
while let Some(mut item) = res.try_next().await? {
6868
while let Some(row) = item.try_next().await? {
@@ -78,23 +78,21 @@ impl Client {
7878

7979
let (mut res, worlds) = {
8080
let (ref mut rng, ref mut buf) = *self.shared.borrow_mut();
81-
// unrealistic as all queries are sent with only one sync point.
82-
let mut pipe = Pipeline::unsync_with_capacity_from_buf(len + 1, buf);
83-
84-
let (mut params, worlds) = core::iter::repeat_with(|| {
85-
let id = rng.gen_id();
86-
let rand = rng.gen_id();
87-
self.world.bind([id]).query(&mut pipe)?;
88-
HandleResult::Ok(((id, rand), World::new(id, rand)))
89-
})
90-
.take(len)
91-
.collect::<HandleResult<(Vec<_>, Vec<_>)>>()?;
92-
93-
params.sort();
94-
95-
params
96-
.into_iter()
97-
.try_for_each(|(id, rng)| self.update.bind([rng, id]).query(&mut pipe))?;
81+
let mut pipe = Pipeline::with_capacity_from_buf(len + 1, buf);
82+
83+
let mut ids = rng.gen_multi().take(num as _).collect::<Vec<_>>();
84+
ids.sort();
85+
86+
let (rngs, worlds) = ids
87+
.iter()
88+
.cloned()
89+
.zip(rng.gen_multi())
90+
.map(|(id, rand)| {
91+
self.world.bind([id]).query(&mut pipe)?;
92+
HandleResult::Ok((rand, World::new(id, rand)))
93+
})
94+
.collect::<HandleResult<(Vec<_>, Vec<_>)>>()?;
95+
self.update.bind([&ids, &rngs]).query(&mut pipe)?;
9896

9997
(pipe.query(&self.cli)?, worlds)
10098
};

frameworks/Rust/xitca-web/src/db_util.rs

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,3 @@
1-
#[cfg(feature = "diesel")]
2-
// diesel does not support high level bulk update api. use raw sql to bypass the limitation.
3-
// relate discussion: https://github.com/diesel-rs/diesel/discussions/2879
4-
pub fn update_query_from_ids(mut rngs: Vec<(i32, i32)>) -> String {
5-
rngs.sort_by(|(a, _), (b, _)| a.cmp(b));
6-
7-
const PREFIX: &str = "UPDATE world SET randomNumber=w.r FROM (VALUES ";
8-
const SUFFIX: &str = ") AS w (i,r) WHERE world.id=w.i";
9-
10-
let mut query = String::from(PREFIX);
11-
12-
use core::fmt::Write;
13-
rngs.iter().for_each(|(w_id, num)| {
14-
write!(query, "({}::int,{}::int),", w_id, num).unwrap();
15-
});
16-
17-
if query.ends_with(',') {
18-
query.pop();
19-
}
20-
21-
query.push_str(SUFFIX);
22-
23-
query
24-
}
25-
261
#[cfg(feature = "pg")]
272
pub use pg::*;
283

@@ -44,10 +19,6 @@ pub mod pg {
4419
pub const WORLD_STMT: StatementNamed =
4520
Statement::named("SELECT id,randomnumber FROM world WHERE id=$1", &[Type::INT4]);
4621
pub const UPDATE_STMT: StatementNamed = Statement::named(
47-
"UPDATE world SET randomnumber=$1 WHERE id=$2",
48-
&[Type::INT4, Type::INT4],
49-
);
50-
pub const UPDATE_BATCH_STMT: StatementNamed = Statement::named(
5122
"UPDATE world SET randomnumber=w.r FROM (SELECT unnest($1) as i,unnest($2) as r) w WHERE world.id=w.i",
5223
&[Type::INT4_ARRAY, Type::INT4_ARRAY],
5324
);

0 commit comments

Comments
 (0)