11import multiprocessing
22import os
3- import asyncpg
3+ import psycopg
44import platform
55import random
66import asyncio
77import blacksheep as bs
88import jinja2
9- # import json
109from pathlib import Path
11- try :
12- import uvloop
13- asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
14- except Exception :
15- ...
16-
17- READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
18- WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
10+ from psycopg_pool import AsyncConnectionPool
11+
12+ READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
13+ WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
1914ADDITIONAL_ROW = [0 , "Additional fortune added at request time." ]
2015MAX_CONNECTIONS = 1900
2116CORE_COUNT = multiprocessing .cpu_count ()
2217PROCESSES = CORE_COUNT
23- MAX_POOL_SIZE = max (1 ,int (os .getenv ('MAX_POOL_SIZE' , MAX_CONNECTIONS // PROCESSES )))
24- MIN_POOL_SIZE = max (1 ,int (os .getenv ('MIN_POOL_SIZE' , MAX_POOL_SIZE // 2 )))
18+ MAX_POOL_SIZE = max (1 , int (os .getenv ('MAX_POOL_SIZE' , MAX_CONNECTIONS // PROCESSES )))
19+ MIN_POOL_SIZE = max (1 , int (os .getenv ('MIN_POOL_SIZE' , MAX_POOL_SIZE // 2 )))
2520
2621WORKER_PROCESSES = CORE_COUNT
2722MAX_POOL_SIZE = max (1 , MAX_CONNECTIONS // WORKER_PROCESSES )
3025
3126async def setup_db (app ):
3227 global db_pool
33- db_pool = await asyncpg . create_pool (
34- user = os .getenv ('PGUSER' , " benchmarkdbuser" ),
35- password = os . getenv ( 'PGPASS' , "benchmarkdbpass" ),
36- database = 'hello_world' ,
37- host = "tfb-database" ,
38- port = 5432 ,
28+ conninfo = (
29+ f"postgresql:// { os .getenv ('PGUSER' , ' benchmarkdbuser' ) } : { os . getenv ( 'PGPASS' , 'benchmarkdbpass' ) } "
30+ f"@tfb-database:5432/hello_world"
31+ )
32+ db_pool = AsyncConnectionPool (
33+ conninfo = conninfo ,
3934 min_size = MIN_POOL_SIZE ,
4035 max_size = MAX_POOL_SIZE ,
36+ open = False ,
4137 )
38+ await db_pool .open ()
4239
4340async def shutdown_db (app ):
44- """Close asyncpg connection pool for the current process."""
4541 global db_pool
4642 if db_pool is not None :
4743 await db_pool .close ()
@@ -51,7 +47,6 @@ def load_fortunes_template():
5147 with Path ("templates/fortune.html" ).open ("r" ) as f :
5248 return jinja2 .Template (f .read ())
5349
54-
5550fortune_template = load_fortunes_template ()
5651
5752app = bs .Application ()
@@ -70,76 +65,62 @@ def get_num_queries(request):
7065
7166JSON_CONTENT_TYPE = b"application/json"
7267
73-
74- # ------------------------------------------------------------------------------------------
75-
7668@bs .get ('/json' )
7769async def json_test (request ):
78- return bs .json ( {'message' : 'Hello, world!' } )
70+ return bs .json ({'message' : 'Hello, world!' })
7971
8072@bs .get ('/db' )
8173async def single_db_query_test (request ):
8274 row_id = random .randint (1 , 10000 )
83-
84- async with db_pool .acquire () as db_conn :
85- number = await db_conn .fetchval (READ_ROW_SQL , row_id )
86-
87- # return jsonify(Result(id=row_id, randomNumber=number))
88- # return ({'id': row_id, 'randomNumber': number})
89- return bs .json ({'id' : row_id , 'randomNumber' : number })
90-
75+ async with db_pool .connection () as db_conn :
76+ async with db_conn .cursor () as cursor :
77+ await cursor .execute (READ_ROW_SQL , (row_id ,))
78+ number = await cursor .fetchone ()
79+ return bs .json ({'id' : row_id , 'randomNumber' : number [1 ]})
9180
9281@bs .get ('/queries' )
9382async def multiple_db_queries_test (request ):
9483 num_queries = get_num_queries (request )
9584 row_ids = random .sample (range (1 , 10000 ), num_queries )
9685 worlds = []
97-
98- async with db_pool .acquire () as db_conn :
99- statement = await db_conn .prepare (READ_ROW_SQL )
100- for row_id in row_ids :
101- number = await statement .fetchval (row_id )
102- worlds .append ( {"id" : row_id , "randomNumber" : number } )
103- # worlds.append(Result(id=row_id, randomNumber=number))
104-
86+ async with db_pool .connection () as db_conn :
87+ async with db_conn .cursor () as cursor :
88+ for row_id in row_ids :
89+ await cursor .execute (READ_ROW_SQL , (row_id ,))
90+ number = await cursor .fetchone ()
91+ worlds .append ({"id" : row_id , "randomNumber" : number [1 ]})
10592 return bs .json (worlds )
106- # return jsonify(worlds)
107-
10893
10994@bs .get ('/fortunes' )
11095async def fortunes_test (request ):
111- async with db_pool .acquire () as db_conn :
112- fortunes = await db_conn .fetch ("SELECT * FROM Fortune" )
113-
96+ async with db_pool .connection () as db_conn :
97+ async with db_conn .cursor () as cursor :
98+ await cursor .execute ("SELECT * FROM Fortune" )
99+ fortunes = await cursor .fetchall ()
114100 fortunes .append (ADDITIONAL_ROW )
115101 fortunes .sort (key = lambda row : row [1 ])
116102 data = fortune_template .render (fortunes = fortunes )
117103 return bs .html (data )
118104
119-
120105@bs .get ('/updates' )
121106async def db_updates_test (request ):
122107 num_queries = get_num_queries (request )
123108 updates = list (zip (
124109 random .sample (range (1 , 10000 ), num_queries ),
125110 sorted (random .sample (range (1 , 10000 ), num_queries ))
126111 ))
127- # worlds = [Result(id= row_id, randomNumber= number) for row_id, number in updates]
128- worlds = [ { "id" : row_id , "randomNumber" : number } for row_id , number in updates ]
129- async with db_pool . acquire () as db_conn :
130- statement = await db_conn . prepare ( READ_ROW_SQL )
131- for row_id , _ in updates :
132- await statement . fetchval ( row_id )
133- await db_conn .executemany (WRITE_ROW_SQL , updates )
112+ worlds = [{ "id" : row_id , " randomNumber" : number } for row_id , number in updates ]
113+ async with db_pool . connection () as db_conn :
114+ async with db_conn . cursor () as cursor :
115+ for row_id , number in updates :
116+ await cursor . execute ( READ_ROW_SQL , ( row_id ,))
117+ await cursor . fetchone ( )
118+ await cursor .executemany (WRITE_ROW_SQL , [( number , row_id ) for row_id , number in updates ] )
134119 return bs .json (worlds )
135- # return jsonify(worlds)
136-
137120
138121@bs .get ('/plaintext' )
139122async def plaintext_test (request ):
140123 return bs .Response (200 , content = bs .Content (b"text/plain" , b'Hello, World!' ))
141- #return bs.text('Hello, World!')
142-
143124
144125if platform .python_implementation () == 'PyPy' :
145126 from socketify import ASGI
@@ -150,16 +131,11 @@ async def plaintext_test(request):
150131 def run_app ():
151132 ASGI (app ).listen (8080 , lambda config : logging .info (f"Listening on port http://localhost:{ config .port } now\n " )).run ()
152133
153-
154134 def create_fork ():
155135 n = os .fork ()
156- # n greater than 0 means parent process
157136 if not n > 0 :
158137 run_app ()
159138
160-
161- # fork limiting the cpu count - 1
162139 for i in range (1 , workers ):
163140 create_fork ()
164-
165141 run_app ()
0 commit comments