1- // clippy is dumb and have no idea what should be lazy or not
2- #! [ allow ( clippy :: unnecessary_lazy_evaluations ) ]
1+ # [ path = "./db_util.rs" ]
2+ mod db_util ;
33
4- use xitca_io:: bytes:: BytesMut ;
5- use xitca_postgres:: { pipeline:: Pipeline , pool:: Pool , AsyncLendingIterator , Type } ;
4+ use std:: cell:: RefCell ;
5+
6+ use xitca_postgres:: {
7+ iter:: AsyncLendingIterator , pipeline:: Pipeline , pool:: Pool , statement:: Statement , Execute , ExecuteMut ,
8+ } ;
69
710use super :: {
811 ser:: { Fortune , Fortunes , World } ,
9- util:: { bulk_update_gen , HandleResult , Rand , DB_URL } ,
12+ util:: { HandleResult , DB_URL } ,
1013} ;
1114
15+ use db_util:: { sort_update_params, update_query, Shared , FORTUNE_STMT , WORLD_STMT } ;
16+
1217pub struct Client {
1318 pool : Pool ,
14- #[ cfg( not( feature = "pg-sync" ) ) ]
15- shared : std:: cell:: RefCell < Shared > ,
16- #[ cfg( feature = "pg-sync" ) ]
17- shared : std:: sync:: Mutex < Shared > ,
19+ shared : RefCell < Shared > ,
1820 updates : Box < [ Box < str > ] > ,
1921}
2022
21- type Shared = ( Rand , BytesMut ) ;
22-
23- const FORTUNE_SQL : & str = "SELECT * FROM fortune" ;
24-
25- const FORTUNE_SQL_TYPES : & [ Type ] = & [ ] ;
26-
27- const WORLD_SQL : & str = "SELECT * FROM world WHERE id=$1" ;
28-
29- const WORLD_SQL_TYPES : & [ Type ] = & [ Type :: INT4 ] ;
30-
31- fn update_query ( num : usize ) -> Box < str > {
32- bulk_update_gen ( |query| {
33- use std:: fmt:: Write ;
34- ( 1 ..=num) . fold ( ( 1 , query) , |( idx, query) , _| {
35- write ! ( query, "(${}::int,${}::int)," , idx, idx + 1 ) . unwrap ( ) ;
36- ( idx + 2 , query)
37- } ) ;
38- } )
39- . into_boxed_str ( )
40- }
41-
4223pub async fn create ( ) -> HandleResult < Client > {
43- let pool = Pool :: builder ( DB_URL ) . capacity ( 1 ) . build ( ) ?;
44-
45- let shared = ( Rand :: default ( ) , BytesMut :: new ( ) ) ;
46-
47- let updates = core:: iter:: once ( Box :: from ( "" ) )
48- . chain ( ( 1 ..=500 ) . map ( update_query) )
49- . collect ( ) ;
50-
5124 Ok ( Client {
52- pool,
53- #[ cfg( not( feature = "pg-sync" ) ) ]
54- shared : std:: cell:: RefCell :: new ( shared) ,
55- #[ cfg( feature = "pg-sync" ) ]
56- shared : std:: sync:: Mutex :: new ( shared) ,
57- updates,
25+ pool : Pool :: builder ( DB_URL ) . capacity ( 1 ) . build ( ) ?,
26+ shared : Default :: default ( ) ,
27+ updates : core:: iter:: once ( Box :: from ( "" ) )
28+ . chain ( ( 1 ..=500 ) . map ( update_query) )
29+ . collect ( ) ,
5830 } )
5931}
6032
6133impl Client {
62- #[ cfg( not( feature = "pg-sync" ) ) ]
63- fn shared ( & self ) -> std:: cell:: RefMut < ' _ , Shared > {
64- self . shared . borrow_mut ( )
65- }
66-
67- #[ cfg( feature = "pg-sync" ) ]
68- fn shared ( & self ) -> std:: sync:: MutexGuard < ' _ , Shared > {
69- self . shared . lock ( ) . unwrap ( )
70- }
71-
7234 pub async fn get_world ( & self ) -> HandleResult < World > {
7335 let mut conn = self . pool . get ( ) . await ?;
74- let stmt = conn . prepare ( WORLD_SQL , WORLD_SQL_TYPES ) . await ?;
75- let id = self . shared ( ) . 0 . gen_id ( ) ;
76- let mut res = conn . consume ( ) . query_raw ( & stmt , [ id ] ) ?;
77- let row = res. try_next ( ) . await ?. ok_or_else ( || " World does not exist") ?;
78- Ok ( World :: new ( row. get_raw ( 0 ) , row. get_raw ( 1 ) ) )
36+ let stmt = WORLD_STMT . execute_mut ( & mut conn ) . await ?;
37+ let id = self . shared . borrow_mut ( ) . 0 . gen_id ( ) ;
38+ let mut res = stmt . bind ( [ id ] ) . query ( & conn . consume ( ) ) . await ?;
39+ let row = res. try_next ( ) . await ?. ok_or ( "request World does not exist") ?;
40+ Ok ( World :: new ( row. get ( 0 ) , row. get ( 1 ) ) )
7941 }
8042
8143 pub async fn get_worlds ( & self , num : u16 ) -> HandleResult < Vec < World > > {
8244 let len = num as usize ;
8345
8446 let mut conn = self . pool . get ( ) . await ?;
85- let stmt = conn . prepare ( WORLD_SQL , WORLD_SQL_TYPES ) . await ?;
47+ let stmt = WORLD_STMT . execute_mut ( & mut conn ) . await ?;
8648
8749 let mut res = {
88- let ( ref mut rng, ref mut buf) = * self . shared ( ) ;
50+ let ( ref mut rng, ref mut buf) = * self . shared . borrow_mut ( ) ;
8951 let mut pipe = Pipeline :: with_capacity_from_buf ( len, buf) ;
90- ( 0 ..num) . try_for_each ( |_| pipe . query_raw ( & stmt , [ rng. gen_id ( ) ] ) ) ?;
91- conn. consume ( ) . pipeline ( pipe ) ?
52+ ( 0 ..num) . try_for_each ( |_| stmt . bind ( [ rng. gen_id ( ) ] ) . query_mut ( & mut pipe ) ) ?;
53+ pipe . query ( & conn. consume ( ) ) ?
9254 } ;
9355
9456 let mut worlds = Vec :: with_capacity ( len) ;
9557
9658 while let Some ( mut item) = res. try_next ( ) . await ? {
9759 while let Some ( row) = item. try_next ( ) . await ? {
98- worlds. push ( World :: new ( row. get_raw ( 0 ) , row. get_raw ( 1 ) ) )
60+ worlds. push ( World :: new ( row. get ( 0 ) , row. get ( 1 ) ) )
9961 }
10062 }
10163
@@ -105,25 +67,24 @@ impl Client {
10567 pub async fn update ( & self , num : u16 ) -> HandleResult < Vec < World > > {
10668 let len = num as usize ;
10769
108- let update = self . updates . get ( len) . ok_or_else ( || "num out of bound" ) ?;
109-
70+ let update = self . updates . get ( len) . ok_or ( "request num is out of range" ) ?;
11071 let mut conn = self . pool . get ( ) . await ?;
111- let world_stmt = conn . prepare ( WORLD_SQL , WORLD_SQL_TYPES ) . await ?;
112- let update_stmt = conn . prepare ( update, & [ ] ) . await ?;
72+ let world_stmt = WORLD_STMT . execute_mut ( & mut conn ) . await ?;
73+ let update_stmt = Statement :: named ( update, & [ ] ) . execute_mut ( & mut conn ) . await ?;
11374
11475 let mut params = Vec :: with_capacity ( len) ;
11576
11677 let mut res = {
117- let ( ref mut rng, ref mut buf) = * self . shared ( ) ;
78+ let ( ref mut rng, ref mut buf) = * self . shared . borrow_mut ( ) ;
11879 let mut pipe = Pipeline :: with_capacity_from_buf ( len + 1 , buf) ;
11980 ( 0 ..num) . try_for_each ( |_| {
12081 let w_id = rng. gen_id ( ) ;
12182 let r_id = rng. gen_id ( ) ;
12283 params. push ( [ w_id, r_id] ) ;
123- pipe . query_raw ( & world_stmt , [ w_id] )
84+ world_stmt . bind ( [ w_id] ) . query_mut ( & mut pipe )
12485 } ) ?;
125- pipe . query_raw ( & update_stmt , sort_update_params ( & params) ) ?;
126- conn. consume ( ) . pipeline ( pipe ) ?
86+ update_stmt . bind ( sort_update_params ( & params) ) . query_mut ( & mut pipe ) ?;
87+ pipe . query ( & conn. consume ( ) ) ?
12788 } ;
12889
12990 let mut worlds = Vec :: with_capacity ( len) ;
@@ -133,7 +94,7 @@ impl Client {
13394 while let Some ( mut item) = res. try_next ( ) . await ? {
13495 while let Some ( row) = item. try_next ( ) . await ? {
13596 let r_id = r_ids. next ( ) . unwrap ( ) [ 1 ] ;
136- worlds. push ( World :: new ( row. get_raw ( 0 ) , r_id) )
97+ worlds. push ( World :: new ( row. get ( 0 ) , r_id) )
13798 }
13899 }
139100
@@ -145,45 +106,15 @@ impl Client {
145106 items. push ( Fortune :: new ( 0 , "Additional fortune added at request time." ) ) ;
146107
147108 let mut conn = self . pool . get ( ) . await ?;
148- let stmt = conn . prepare ( FORTUNE_SQL , FORTUNE_SQL_TYPES ) . await ?;
149- let mut res = conn. consume ( ) . query_raw :: < [ i32 ; 0 ] > ( & stmt , [ ] ) ?;
109+ let stmt = FORTUNE_STMT . execute_mut ( & mut conn ) . await ?;
110+ let mut res = stmt . query ( & conn. consume ( ) ) . await ?;
150111
151112 while let Some ( row) = res. try_next ( ) . await ? {
152- items. push ( Fortune :: new ( row. get_raw ( 0 ) , row. get_raw :: < String > ( 1 ) ) ) ;
113+ items. push ( Fortune :: new ( row. get ( 0 ) , row. get :: < String > ( 1 ) ) ) ;
153114 }
154115
155116 items. sort_by ( |it, next| it. message . cmp ( & next. message ) ) ;
156117
157118 Ok ( Fortunes :: new ( items) )
158119 }
159120}
160-
161- fn sort_update_params ( params : & [ [ i32 ; 2 ] ] ) -> impl ExactSizeIterator < Item = i32 > {
162- let mut params = params. to_owned ( ) ;
163- params. sort_by ( |a, b| a[ 0 ] . cmp ( & b[ 0 ] ) ) ;
164-
165- struct ParamIter < I > ( I ) ;
166-
167- impl < I > Iterator for ParamIter < I >
168- where
169- I : Iterator ,
170- {
171- type Item = I :: Item ;
172-
173- #[ inline]
174- fn next ( & mut self ) -> Option < Self :: Item > {
175- self . 0 . next ( )
176- }
177-
178- #[ inline]
179- fn size_hint ( & self ) -> ( usize , Option < usize > ) {
180- self . 0 . size_hint ( )
181- }
182- }
183-
184- // impl depends on compiler optimization to flat Vec<[T]> to Vec<T> when inferring
185- // it's size hint. possible to cause runtime panic.
186- impl < I > ExactSizeIterator for ParamIter < I > where I : Iterator { }
187-
188- ParamIter ( params. into_iter ( ) . flatten ( ) )
189- }
0 commit comments