diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index a825b1596d0..d75fe084db3 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -1,22 +1,22 @@ import multiprocessing import os -import psycopg +# import psycopg import platform import random import asyncio import blacksheep as bs import jinja2 from pathlib import Path -from psycopg_pool import AsyncConnectionPool - -READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s' -WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s' +# from psycopg_pool import AsyncConnectionPool +import psqlpy +from psqlpy import ConnectionPoolBuilder +READ_ROW_SQL = 'SELECT "randomnumber" FROM "world" WHERE id = $1' +WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2' ADDITIONAL_ROW = [0, "Additional fortune added at request time."] CORE_COUNT = multiprocessing.cpu_count() MAX_DB_CONNECTIONS = 2000 -MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 32) -MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2) +MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35) db_pool = None async def setup_db(app): @@ -25,15 +25,17 @@ async def setup_db(app): f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}" f"@tfb-database:5432/hello_world" ) - db_pool = AsyncConnectionPool( - conninfo=conninfo, - min_size=MIN_POOL_SIZE, - max_size=MAX_POOL_SIZE, - open=False, - timeout=5.0, - max_lifetime=1800, + db_pool = ( + ConnectionPoolBuilder() + .max_pool_size(MAX_POOL_SIZE) + .user(os.getenv('PGUSER', 'benchmarkdbuser')) + .password(os.getenv('PGPASS', 'benchmarkdbpass')) + .dbname("hello_world") + .host("tfb-database") + .port(5432) + .build() ) - await db_pool.open() + # await db_pool.open() async def shutdown_db(app): global db_pool @@ -70,31 +72,31 @@ async def json_test(request): @bs.get('/db') async def single_db_query_test(request): row_id = random.randint(1, 10000) - async with db_pool.connection() as db_conn: - async with db_conn.cursor() as cursor: - await cursor.execute(READ_ROW_SQL, (row_id,)) - number = await cursor.fetchone() - return bs.json({'id': row_id, 'randomNumber': number[1]}) + async with db_pool.acquire() as connection: + number = await connection.fetch_val( + READ_ROW_SQL, [row_id] + ) + return bs.json({'id': row_id, 'randomNumber': number}) @bs.get('/queries') async def multiple_db_queries_test(request): num_queries = get_num_queries(request) row_ids = random.sample(range(1, 10000), num_queries) worlds = [] - async with db_pool.connection() as db_conn: - async with db_conn.cursor() as cursor: - for row_id in row_ids: - await cursor.execute(READ_ROW_SQL, (row_id,)) - number = await cursor.fetchone() - worlds.append({"id": row_id, "randomNumber": number[1]}) + async with db_pool.acquire() as connection: + for row_id in row_ids: + number = await connection.fetch_val( + READ_ROW_SQL, [row_id] + ) + worlds.append({"id": row_id, "randomNumber": number}) return bs.json(worlds) @bs.get('/fortunes') async def fortunes_test(request): - async with db_pool.connection() as db_conn: - async with db_conn.cursor() as cursor: - await cursor.execute("SELECT * FROM Fortune") - fortunes = await cursor.fetchall() + async with db_pool.acquire() as connection: + fortunes_fetch = await connection.fetch("SELECT * FROM Fortune") + # fortunes = fortunes_fetch.result() + fortunes = [list(item.values()) for item in fortunes_fetch.result()] fortunes.append(ADDITIONAL_ROW) fortunes.sort(key=lambda row: row[1]) data = fortune_template.render(fortunes=fortunes) @@ -108,26 +110,12 @@ async def db_updates_test(request): random.sample(range(1, 10000), num_queries) ), key=lambda x: x[1]) worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates] - for _ in range(5): - async with db_pool.connection() as db_conn: - try: - await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED") - async with db_conn.cursor() as cursor: - for row_id, number in updates: - await cursor.execute(READ_ROW_SQL, (row_id,)) - await cursor.fetchone() - for _ in range(5): - try: - await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates]) - return bs.json(worlds) - except psycopg.errors.DeadlockDetected: - await db_conn.rollback() - continue - # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates]) - except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted): - await db_conn.rollback() - continue - raise Exception("connect error") + async with db_pool.acquire() as connection: + for row_id, _ in updates: + await connection.fetch_val(READ_ROW_SQL, [row_id]) + # await db_conn.executemany(WRITE_ROW_SQL, updates) + await connection.execute_many(WRITE_ROW_SQL, updates) + return bs.json(worlds) @bs.get('/plaintext') async def plaintext_test(request): diff --git a/frameworks/Python/blacksheep/requirements-pypy.txt b/frameworks/Python/blacksheep/requirements-pypy.txt index f2c3d4fe9b6..217009599a2 100644 --- a/frameworks/Python/blacksheep/requirements-pypy.txt +++ b/frameworks/Python/blacksheep/requirements-pypy.txt @@ -1,3 +1,3 @@ -psycopg[pool] +psqlpy git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify h11 \ No newline at end of file