33import asyncpg
44import random
55import asyncio
6- from operator import itemgetter
76import blacksheep as bs
87import jinja2
98import msgspec
109from pathlib import Path
10+ try :
11+ import uvloop
12+ asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
13+ except Exception :
14+ ...
1115
1216READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
1317WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
1418ADDITIONAL_ROW = [0 , "Additional fortune added at request time." ]
15- MAX_POOL_SIZE = 1000 // multiprocessing .cpu_count ()
16- MIN_POOL_SIZE = max (int (MAX_POOL_SIZE / 2 ), 1 )
19+ MAX_CONNECTIONS = 1900
20+ CORE_COUNT = multiprocessing .cpu_count ()
21+ WORKER_PROCESSES = CORE_COUNT
22+ MAX_POOL_SIZE = max (1 , MAX_CONNECTIONS // WORKER_PROCESSES )
23+ MIN_POOL_SIZE = max (1 , MAX_POOL_SIZE // 2 )
1724db_pool = None
1825key = itemgetter (1 )
1926
20- try :
21- import uvloop
22- asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
23- except Exception :
24- ...
25-
2627async def setup_db (app ):
2728 global db_pool
2829 db_pool = await asyncpg .create_pool (
@@ -33,8 +34,17 @@ async def setup_db(app):
3334 port = 5432 ,
3435 min_size = MIN_POOL_SIZE ,
3536 max_size = MAX_POOL_SIZE ,
37+ command_timeout = 5 ,
38+ max_inactive_connection_lifetime = 60 ,
39+ server_settings = {'jit' : 'off' },
3640 )
3741
42+ async def shutdown_db (app ):
43+ """Close asyncpg connection pool for the current process."""
44+ global db_pool
45+ if db_pool is not None :
46+ await db_pool .close ()
47+ db_pool = None
3848
3949def load_fortunes_template ():
4050 with Path ("templates/fortune.html" ).open ("r" ) as f :
@@ -45,7 +55,7 @@ def load_fortunes_template():
4555
4656app = bs .Application ()
4757app .on_start += setup_db
48-
58+ app . on_stop += shutdown_db
4959
5060def get_num_queries (request ):
5161 try :
@@ -55,14 +65,9 @@ def get_num_queries(request):
5565 query_count = int (value [0 ])
5666 except (KeyError , IndexError , ValueError ):
5767 return 1
58- if query_count < 1 :
59- return 1
60- if query_count > 500 :
61- return 500
62- return query_count
68+ return min (max (query_count , 1 ), 500 )
6369
6470ENCODER = msgspec .json .Encoder ()
65- DECODER = msgspec .json .Decoder ()
6671JSON_CONTENT_TYPE = b"application/json"
6772def jsonify (
6873 data ,
@@ -122,26 +127,22 @@ async def fortunes_test(request):
122127 fortunes = await db_conn .fetch ("SELECT * FROM Fortune" )
123128
124129 fortunes .append (ADDITIONAL_ROW )
125- fortunes .sort (key = key )
130+ fortunes .sort (key = lambda row : row [ 1 ] )
126131 data = fortune_template .render (fortunes = fortunes )
127132 return bs .html (data )
128133
129134
130135@bs .get ('/updates' )
131136async def db_updates_test (request ):
132137 num_queries = get_num_queries (request )
133- ids = sorted (random .sample (range (1 , 10000 + 1 ), num_queries ))
134- numbers = sorted (random .sample (range (1 , 10000 ), num_queries ))
135- updates = list (zip (ids , numbers ))
136-
137- # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
138+ updates = [(row_id , random .randint (1 , 10000 )) for row_id in random .sample (range (1 , 10000 ), num_queries )]
138139 worlds = [Result (id = row_id , randomNumber = number ) for row_id , number in updates ]
140+ # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
139141 async with db_pool .acquire () as db_conn :
140142 statement = await db_conn .prepare (READ_ROW_SQL )
141143 for row_id , _ in updates :
142144 await statement .fetchval (row_id )
143145 await db_conn .executemany (WRITE_ROW_SQL , updates )
144-
145146 return jsonify (worlds )
146147
147148
0 commit comments