11import multiprocessing
22import os
3- import psycopg
3+ # import psycopg
44import platform
55import random
66import asyncio
77import blacksheep as bs
88import jinja2
99from pathlib import Path
10- from psycopg_pool import AsyncConnectionPool
11-
12- READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
13- WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
10+ # from psycopg_pool import AsyncConnectionPool
11+ import psqlpy
12+ from psqlpy import ConnectionPoolBuilder
# Parameterized queries for the World table; psqlpy uses PostgreSQL-native
# $N placeholders (not psycopg's %s style).
READ_ROW_SQL = 'SELECT "randomnumber" FROM "world" WHERE id = $1'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'

# Extra fortune row the /fortunes test must inject at request time.
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
CORE_COUNT = multiprocessing.cpu_count()
MAX_DB_CONNECTIONS = 2000

# Per-process pool cap: scale with cores but never exceed the per-worker
# share of the database's connection budget, hard-capped at 35.
MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35)

# Global psqlpy connection pool, created in setup_db() at app startup.
db_pool = None
2121
async def setup_db(app):
    """Create the global psqlpy connection pool at application startup.

    Connection parameters come from PGUSER/PGPASS env vars with the
    benchmark defaults; host/port/dbname are fixed by the TFB environment.
    The psqlpy pool opens connections lazily, so no explicit open() call
    is needed (the old psycopg_pool `await db_pool.open()` is gone).
    """
    global db_pool
    # NOTE(review): the old DSN-style `conninfo` string was dead code once
    # the builder took discrete parameters, so it has been removed.
    db_pool = (
        ConnectionPoolBuilder()
        .max_pool_size(MAX_POOL_SIZE)
        .user(os.getenv('PGUSER', 'benchmarkdbuser'))
        .password(os.getenv('PGPASS', 'benchmarkdbpass'))
        .dbname("hello_world")
        .host("tfb-database")
        .port(5432)
        .build()
    )
3739
3840async def shutdown_db (app ):
3941 global db_pool
@@ -70,31 +72,30 @@ async def json_test(request):
@bs.get('/db')
async def single_db_query_test(request):
    """Fetch one random World row and return it as JSON (TFB /db test)."""
    row_id = random.randint(1, 10000)
    async with db_pool.acquire() as connection:
        result = await connection.fetch_row(READ_ROW_SQL, [row_id])
    # fetch_row returns a result object, not a scalar; unwrap it to the
    # actual randomnumber value so the JSON body holds a plain number.
    number = result.result()["randomnumber"]
    return bs.json({'id': row_id, 'randomNumber': number})
7880
@bs.get('/queries')
async def multiple_db_queries_test(request):
    """Fetch N distinct random World rows and return them as a JSON list.

    N comes from get_num_queries(request) (clamped query-string param).
    Fixes the duplicated `number = await number = await` assignment, which
    was a syntax error, and unwraps the fetch_row result object so the JSON
    contains plain numbers.
    """
    num_queries = get_num_queries(request)
    row_ids = random.sample(range(1, 10000), num_queries)
    worlds = []
    async with db_pool.acquire() as connection:
        for row_id in row_ids:
            result = await connection.fetch_row(READ_ROW_SQL, [row_id])
            worlds.append(
                {"id": row_id, "randomNumber": result.result()["randomnumber"]}
            )
    return bs.json(worlds)
9193
@bs.get('/fortunes')
async def fortunes_test(request):
    """Render all Fortune rows plus one injected row, sorted by message.

    Fixes the NameError where `results.result()` referenced an undefined
    name (the fetch was bound to `fortunes_fetch`).
    """
    async with db_pool.acquire() as connection:
        fortunes_fetch = await connection.fetch("SELECT * FROM Fortune")
    # psqlpy rows come back as dicts; convert to (id, message) pairs so the
    # positional row[1] sort key and the template's indexing keep working.
    # TODO(review): confirm Fortune column names (id, message) vs schema.
    fortunes = [(row["id"], row["message"]) for row in fortunes_fetch.result()]
    fortunes.append(ADDITIONAL_ROW)
    fortunes.sort(key=lambda row: row[1])
    data = fortune_template.render(fortunes=fortunes)
@@ -108,25 +109,11 @@ async def db_updates_test(request):
108109 random .sample (range (1 , 10000 ), num_queries )
109110 ), key = lambda x : x [1 ])
110111 worlds = [{"id" : row_id , "randomNumber" : number } for row_id , number in updates ]
111- for _ in range (5 ):
112- async with db_pool .connection () as db_conn :
113- try :
114- await db_conn .execute ("SET TRANSACTION ISOLATION LEVEL READ COMMITTED" )
115- async with db_conn .cursor () as cursor :
116- for row_id , number in updates :
117- await cursor .execute (READ_ROW_SQL , (row_id ,))
118- await cursor .fetchone ()
119- for _ in range (5 ):
120- try :
121- await cursor .executemany (WRITE_ROW_SQL , [(number , row_id ) for row_id , number in updates ])
122- return bs .json (worlds )
123- except psycopg .errors .DeadlockDetected :
124- await db_conn .rollback ()
125- continue
126- # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
127- except (psycopg .errors .OperationalError , psycopg .errors .PipelineAborted ):
128- await db_conn .rollback ()
129- continue
112+ async with db_pool .acquire () as connection :
113+ for row_id , _ in updates :
114+ await connection .fetch_row (READ_ROW_SQL , [row_id ])
115+ # await db_conn.executemany(WRITE_ROW_SQL, updates)
116+ await connection .execute_many (WRITE_ROW_SQL , updates )
130117 raise Exception ("connect error" )
131118
132119@bs .get ('/plaintext' )
0 commit comments