
Commit d9e0bee

[Python/Blacksheep] use psqlpy (#9975)
* try use psqlpy
* fix
* fix
* try fix
* roll back
* fix
* success fix
1 parent 1ec0fb2 commit d9e0bee
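
The commit replaces the psycopg driver with psqlpy throughout the Blacksheep/socketify benchmark. Every handler in the diff below follows the same shape: build one pool at startup, acquire a connection per request, and read single values with fetch_val. A minimal standalone sketch of that shape, using only the psqlpy calls the diff itself relies on (ConnectionPoolBuilder, acquire, fetch_val) and the benchmark's own connection settings:

import asyncio
from psqlpy import ConnectionPoolBuilder

async def main():
    # Build the pool once; the commit never calls an explicit open(),
    # which suggests connections are established lazily on first use.
    pool = (
        ConnectionPoolBuilder()
        .max_pool_size(10)
        .user("benchmarkdbuser")
        .password("benchmarkdbpass")
        .dbname("hello_world")
        .host("tfb-database")
        .port(5432)
        .build()
    )
    # Acquire a connection for the scope of one request.
    async with pool.acquire() as connection:
        # fetch_val returns a single value; parameters bind to $1, $2, ...
        number = await connection.fetch_val(
            'SELECT "randomnumber" FROM "world" WHERE id = $1', [42]
        )
    print(number)

asyncio.run(main())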

File tree

2 files changed (+39, -51 lines)

frameworks/Python/blacksheep/app-socketify.py

Lines changed: 38 additions & 50 deletions
@@ -1,22 +1,22 @@
 import multiprocessing
 import os
-import psycopg
+# import psycopg
 import platform
 import random
 import asyncio
 import blacksheep as bs
 import jinja2
 from pathlib import Path
-from psycopg_pool import AsyncConnectionPool
-
-READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
-WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
+# from psycopg_pool import AsyncConnectionPool
+import psqlpy
+from psqlpy import ConnectionPoolBuilder
+READ_ROW_SQL = 'SELECT "randomnumber" FROM "world" WHERE id = $1'
+WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
 ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
 CORE_COUNT = multiprocessing.cpu_count()
 MAX_DB_CONNECTIONS = 2000
 
-MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 32)
-MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)
+MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35)
 db_pool = None
 
 async def setup_db(app):
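
Two knobs change in this hunk besides the driver swap: the placeholder style moves from psycopg's %s to PostgreSQL's native $1/$2 numbering, and MIN_POOL_SIZE disappears because the psqlpy builder is only given a maximum. The sizing formula itself is unchanged apart from the cap (32 to 35); since each worker process appears to build its own pool, the MAX_DB_CONNECTIONS // CORE_COUNT term bounds the per-process share. Worked examples with hypothetical core counts:

CORE_COUNT = 28
MAX_DB_CONNECTIONS = 2000
print(min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35))  # min(56, 71, 35) -> 35

CORE_COUNT = 4
print(min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35))  # min(8, 500, 35) -> 8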
@@ -25,15 +25,17 @@ async def setup_db(app):
         f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
         f"@tfb-database:5432/hello_world"
     )
-    db_pool = AsyncConnectionPool(
-        conninfo=conninfo,
-        min_size=MIN_POOL_SIZE,
-        max_size=MAX_POOL_SIZE,
-        open=False,
-        timeout=5.0,
-        max_lifetime=1800,
+    db_pool = (
+        ConnectionPoolBuilder()
+        .max_pool_size(MAX_POOL_SIZE)
+        .user(os.getenv('PGUSER', 'benchmarkdbuser'))
+        .password(os.getenv('PGPASS', 'benchmarkdbpass'))
+        .dbname("hello_world")
+        .host("tfb-database")
+        .port(5432)
+        .build()
     )
-    await db_pool.open()
+    # await db_pool.open()
 
 async def shutdown_db(app):
     global db_pool
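
The psycopg pool consumed a single conninfo DSN plus sizing and lifetime options; the psqlpy builder takes each field as a separate call, which leaves the conninfo string assembled just above this hunk apparently unused. A sketch of what the setup could shrink to once the dead pieces are pruned (not part of the commit, same values kept):

import os
from psqlpy import ConnectionPoolBuilder

MAX_POOL_SIZE = 35  # computed upstream in the real module
db_pool = None

async def setup_db(app):
    global db_pool
    # The builder receives each connection field directly, so no DSN
    # string is needed, and no explicit open() call remains.
    db_pool = (
        ConnectionPoolBuilder()
        .max_pool_size(MAX_POOL_SIZE)
        .user(os.getenv('PGUSER', 'benchmarkdbuser'))
        .password(os.getenv('PGPASS', 'benchmarkdbpass'))
        .dbname("hello_world")
        .host("tfb-database")
        .port(5432)
        .build()
    )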
@@ -70,31 +72,31 @@ async def json_test(request):
 @bs.get('/db')
 async def single_db_query_test(request):
     row_id = random.randint(1, 10000)
-    async with db_pool.connection() as db_conn:
-        async with db_conn.cursor() as cursor:
-            await cursor.execute(READ_ROW_SQL, (row_id,))
-            number = await cursor.fetchone()
-    return bs.json({'id': row_id, 'randomNumber': number[1]})
+    async with db_pool.acquire() as connection:
+        number = await connection.fetch_val(
+            READ_ROW_SQL, [row_id]
+        )
+    return bs.json({'id': row_id, 'randomNumber': number})
 
 @bs.get('/queries')
 async def multiple_db_queries_test(request):
     num_queries = get_num_queries(request)
     row_ids = random.sample(range(1, 10000), num_queries)
     worlds = []
-    async with db_pool.connection() as db_conn:
-        async with db_conn.cursor() as cursor:
-            for row_id in row_ids:
-                await cursor.execute(READ_ROW_SQL, (row_id,))
-                number = await cursor.fetchone()
-                worlds.append({"id": row_id, "randomNumber": number[1]})
+    async with db_pool.acquire() as connection:
+        for row_id in row_ids:
+            number = await connection.fetch_val(
+                READ_ROW_SQL, [row_id]
+            )
+            worlds.append({"id": row_id, "randomNumber": number})
     return bs.json(worlds)
 
 @bs.get('/fortunes')
 async def fortunes_test(request):
-    async with db_pool.connection() as db_conn:
-        async with db_conn.cursor() as cursor:
-            await cursor.execute("SELECT * FROM Fortune")
-            fortunes = await cursor.fetchall()
+    async with db_pool.acquire() as connection:
+        fortunes_fetch = await connection.fetch("SELECT * FROM Fortune")
+        # fortunes = fortunes_fetch.result()
+        fortunes = [list(item.values()) for item in fortunes_fetch.result()]
     fortunes.append(ADDITIONAL_ROW)
     fortunes.sort(key=lambda row: row[1])
     data = fortune_template.render(fortunes=fortunes)
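
The fortunes handler is the one place row shape matters downstream: the code after the hunk sorts by row[1] and appends ADDITIONAL_ROW, a plain list, so the mappings that fetch(...).result() yields are flattened into [id, message] sequences via list(item.values()). Note this quietly assumes the driver preserves SELECT column order (id before message). A tiny sketch of the conversion with made-up rows:

# Hypothetical rows in the shape .result() appears to return:
rows = [
    {"id": 2, "message": "zebra fortune"},
    {"id": 1, "message": "apple fortune"},
]
fortunes = [list(item.values()) for item in rows]  # [[2, '...'], [1, '...']]
fortunes.append([0, "Additional fortune added at request time."])
fortunes.sort(key=lambda row: row[1])              # sort by message text
print(fortunes[0][1])  # 'Additional fortune added at request time.'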
@@ -108,26 +110,12 @@ async def db_updates_test(request):
         random.sample(range(1, 10000), num_queries)
     ), key=lambda x: x[1])
     worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
-    for _ in range(5):
-        async with db_pool.connection() as db_conn:
-            try:
-                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
-                async with db_conn.cursor() as cursor:
-                    for row_id, number in updates:
-                        await cursor.execute(READ_ROW_SQL, (row_id,))
-                        await cursor.fetchone()
-                    for _ in range(5):
-                        try:
-                            await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
-                            return bs.json(worlds)
-                        except psycopg.errors.DeadlockDetected:
-                            await db_conn.rollback()
-                            continue
-                    # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
-            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
-                await db_conn.rollback()
-                continue
-    raise Exception("connect error")
+    async with db_pool.acquire() as connection:
+        for row_id, _ in updates:
+            await connection.fetch_val(READ_ROW_SQL, [row_id])
+        # await db_conn.executemany(WRITE_ROW_SQL, updates)
+        await connection.execute_many(WRITE_ROW_SQL, updates)
+    return bs.json(worlds)
 
 @bs.get('/plaintext')
 async def plaintext_test(request):
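
One detail worth a reviewer's double-check in db_updates_test: WRITE_ROW_SQL binds $1 to randomnumber and $2 to id, while updates holds (row_id, number) pairs, and the deleted psycopg code swapped each pair explicitly before executemany. Assuming psqlpy's execute_many also binds positionally, a faithful port would keep that swap; a sketch with a hypothetical helper:

WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'

async def write_updates(connection, updates):
    # Reorder each (row_id, number) pair so $1 gets the new randomnumber
    # and $2 the row id, matching what the deleted psycopg code did.
    await connection.execute_many(
        WRITE_ROW_SQL, [[number, row_id] for row_id, number in updates]
    )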
Lines changed: 1 addition & 1 deletion

@@ -1,3 +1,3 @@
-psycopg[pool]
+psqlpy
 git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify
 h11
