11use crate :: * ;
22
3- pub async fn get_db_connection ( ) -> DbPoolConnection {
4- if let Some ( db_pool) = DB . get ( ) {
5- return db_pool. clone ( ) ;
6- } ;
7- let db_pool: DbPoolConnection = connection_db ( ) . await ;
8- DB . set ( db_pool. clone ( ) )
9- . expect ( "Failed to initialize DB_POOL" ) ;
10- db_pool
3+ pub fn get_db_connection ( ) -> & ' static DbPoolConnection {
4+ & DB
115}
126
137#[ cfg( feature = "dev" ) ]
148pub async fn create_database ( ) {
15- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
9+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
1610 let _ = query ( & format ! ( "CREATE DATABASE {};" , DATABASE_NAME ) )
1711 . execute ( & db_pool)
1812 . await ;
1913}
2014
2115#[ cfg( feature = "dev" ) ]
2216pub async fn create_table ( ) {
23- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
17+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
2418 let _ = query ( & format ! (
2519 "CREATE TABLE IF NOT EXISTS {} (
2620 id SERIAL PRIMARY KEY, randomNumber INT NOT NULL
@@ -41,7 +35,7 @@ pub async fn create_table() {
4135
4236#[ cfg( feature = "dev" ) ]
4337pub async fn insert_records ( ) {
44- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
38+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
4539 let row: PgRow = query ( & format ! ( "SELECT COUNT(*) FROM {}" , TABLE_NAME_WORLD ) )
4640 . fetch_one ( & db_pool)
4741 . await
@@ -76,21 +70,21 @@ pub async fn insert_records() {
7670 let _ = query ( & sql) . execute ( & db_pool) . await ;
7771}
7872
79- pub async fn init_cache ( ) {
73+ pub async fn init_cache ( ) -> Vec < QueryRow > {
8074 let mut res: Vec < QueryRow > = Vec :: with_capacity ( RANDOM_MAX as usize ) ;
81- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
75+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
8276 let sql: String = format ! (
8377 "SELECT id, randomNumber FROM {} LIMIT {}" ,
8478 TABLE_NAME_WORLD , RANDOM_MAX
8579 ) ;
86- if let Ok ( rows) = query ( & sql) . fetch_all ( & db_pool) . await {
80+ if let Ok ( rows) = query ( & sql) . fetch_all ( db_pool) . await {
8781 for row in rows {
8882 let id: i32 = row. get ( KEY_ID ) ;
8983 let random_number: i32 = row. get ( KEY_RANDOM_NUMBER ) ;
9084 res. push ( QueryRow :: new ( id, random_number) ) ;
9185 }
9286 }
93- let _ = CACHE . set ( res) ;
87+ res
9488}
9589
9690pub async fn connection_db ( ) -> DbPoolConnection {
@@ -106,18 +100,10 @@ pub async fn connection_db() -> DbPoolConnection {
106100 DATABASE_NAME
107101 ) ,
108102 } ;
109- let pool_size: u32 = ( get_thread_count ( ) << 2 ) . max ( 10 ) . min ( 100 ) as u32 ;
110- let max_pool_size: u32 = option_env ! ( "POSTGRES_MAX_POOL_SIZE" )
111- . unwrap_or ( & pool_size. to_string ( ) )
112- . parse :: < u32 > ( )
113- . unwrap_or ( pool_size) ;
114- let min_pool_size: u32 = option_env ! ( "POSTGRES_MIN_POOL_SIZE" )
115- . unwrap_or ( & pool_size. to_string ( ) )
116- . parse :: < u32 > ( )
117- . unwrap_or ( pool_size) ;
103+ let pool_size: u32 = num_cpus:: get ( ) as u32 ;
118104 let pool: DbPoolConnection = PgPoolOptions :: new ( )
119- . max_connections ( max_pool_size )
120- . min_connections ( min_pool_size )
105+ . max_connections ( 100 )
106+ . min_connections ( pool_size )
121107 . max_lifetime ( None )
122108 . test_before_acquire ( false )
123109 . idle_timeout ( None )
@@ -130,10 +116,10 @@ pub async fn connection_db() -> DbPoolConnection {
130116pub async fn get_update_data (
131117 limit : Queries ,
132118) -> ( String , Vec < QueryRow > , Vec < Queries > , Vec < Queries > ) {
133- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
119+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
134120 let mut query_res_list: Vec < QueryRow > = Vec :: with_capacity ( limit as usize ) ;
135- let rows: Vec < QueryRow > = get_some_row_id ( limit, & db_pool) . await ;
136- let mut sql = format ! ( "UPDATE {} SET randomNumber = CASE id " , TABLE_NAME_WORLD ) ;
121+ let rows: Vec < QueryRow > = get_some_row_id ( limit, db_pool) . await ;
122+ let mut sql: String = format ! ( "UPDATE {} SET randomNumber = CASE id " , TABLE_NAME_WORLD ) ;
137123 let mut id_list: Vec < i32 > = Vec :: with_capacity ( rows. len ( ) ) ;
138124 let mut value_list: Vec < String > = Vec :: with_capacity ( rows. len ( ) * 2 ) ;
139125 let mut random_numbers: Vec < i32 > = Vec :: with_capacity ( rows. len ( ) ) ;
@@ -159,14 +145,13 @@ pub async fn get_update_data(
159145}
160146
161147pub async fn init_db ( ) {
162- get_db_connection ( ) . await ;
163148 #[ cfg( feature = "dev" ) ]
164149 {
165150 create_database ( ) . await ;
166151 create_table ( ) . await ;
167152 insert_records ( ) . await ;
168153 }
169- init_cache ( ) . await ;
154+ black_box ( init_cache ( ) . await ) ;
170155}
171156
172157pub async fn random_world_row ( db_pool : & DbPoolConnection ) -> QueryRow {
@@ -176,7 +161,7 @@ pub async fn random_world_row(db_pool: &DbPoolConnection) -> QueryRow {
176161
177162pub async fn query_world_row ( db_pool : & DbPoolConnection , id : Queries ) -> QueryRow {
178163 let sql: String = format ! (
179- "SELECT id, randomNumber FROM {} WHERE id = {} LIMIT 1 " ,
164+ "SELECT id, randomNumber FROM {} WHERE id = {}" ,
180165 TABLE_NAME_WORLD , id
181166 ) ;
182167 if let Ok ( rows) = query ( & sql) . fetch_one ( db_pool) . await {
@@ -187,7 +172,7 @@ pub async fn query_world_row(db_pool: &DbPoolConnection, id: Queries) -> QueryRo
187172}
188173
189174pub async fn update_world_rows ( limit : Queries ) -> Vec < QueryRow > {
190- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
175+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
191176 let ( sql, data, id_list, random_numbers) = get_update_data ( limit) . await ;
192177 let mut query_builder: query:: Query < ' _ , Postgres , postgres:: PgArguments > = query ( & sql) ;
193178 for ( id, random_number) in id_list. iter ( ) . zip ( random_numbers. iter ( ) ) {
@@ -196,23 +181,34 @@ pub async fn update_world_rows(limit: Queries) -> Vec<QueryRow> {
196181 for id in & id_list {
197182 query_builder = query_builder. bind ( id) ;
198183 }
199- let _ = query_builder. execute ( & db_pool) . await ;
184+ let _ = query_builder. execute ( db_pool) . await ;
200185 data
201186}
202187
203188pub async fn all_world_row ( ) -> Vec < PgRow > {
204- let db_pool: DbPoolConnection = get_db_connection ( ) . await ;
189+ let db_pool: & DbPoolConnection = get_db_connection ( ) ;
205190 let sql: String = format ! ( "SELECT id, message FROM {}" , TABLE_NAME_FORTUNE ) ;
206- let res: Vec < PgRow > = query ( & sql) . fetch_all ( & db_pool) . await . unwrap_or_default ( ) ;
191+ let res: Vec < PgRow > = query ( & sql) . fetch_all ( db_pool) . await . unwrap_or_default ( ) ;
207192 return res;
208193}
209194
210195pub async fn get_some_row_id ( limit : Queries , db_pool : & DbPoolConnection ) -> Vec < QueryRow > {
211- let futures: Vec < _ > = ( 0 ..limit)
212- . map ( |_| async {
213- let id: i32 = get_random_id ( ) ;
214- query_world_row ( db_pool, id) . await
215- } )
216- . collect ( ) ;
217- join_all ( futures) . await
196+ let semaphore: Arc < Semaphore > = Arc :: new ( Semaphore :: new ( 32 ) ) ;
197+ let mut tasks: Vec < JoinHandle < QueryRow > > = Vec :: with_capacity ( limit as usize ) ;
198+ for _ in 0 ..limit {
199+ let _ = semaphore. clone ( ) . acquire_owned ( ) . await . map ( |permit| {
200+ let db_pool: DbPoolConnection = db_pool. clone ( ) ;
201+ tasks. push ( spawn ( async move {
202+ let id: i32 = get_random_id ( ) ;
203+ let res: QueryRow = query_world_row ( & db_pool, id) . await ;
204+ drop ( permit) ;
205+ res
206+ } ) ) ;
207+ } ) ;
208+ }
209+ join_all ( tasks)
210+ . await
211+ . into_iter ( )
212+ . filter_map ( Result :: ok)
213+ . collect ( )
218214}
0 commit comments