From cfc35d266d09c23f1abf401a5e78eaf9571baf5a Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 02:09:41 +0800 Subject: [PATCH 1/7] try use psqlpy --- frameworks/Python/blacksheep/app-socketify.py | 85 ++++++++----------- .../Python/blacksheep/requirements-pypy.txt | 2 +- 2 files changed, 37 insertions(+), 50 deletions(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index a825b1596d0..9b751f367ac 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -1,22 +1,22 @@ import multiprocessing import os -import psycopg +# import psycopg import platform import random import asyncio import blacksheep as bs import jinja2 from pathlib import Path -from psycopg_pool import AsyncConnectionPool - -READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s' -WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s' +# from psycopg_pool import AsyncConnectionPool +import psqlpy +from psqlpy import ConnectionPoolBuilder +READ_ROW_SQL = 'SELECT "randomnumber" FROM "world" WHERE id = $1' +WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2' ADDITIONAL_ROW = [0, "Additional fortune added at request time."] CORE_COUNT = multiprocessing.cpu_count() MAX_DB_CONNECTIONS = 2000 -MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 32) -MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2) +MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35) db_pool = None async def setup_db(app): @@ -25,15 +25,17 @@ async def setup_db(app): f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}" f"@tfb-database:5432/hello_world" ) - db_pool = AsyncConnectionPool( - conninfo=conninfo, - min_size=MIN_POOL_SIZE, - max_size=MAX_POOL_SIZE, - open=False, - timeout=5.0, - max_lifetime=1800, + db_pool = ( + ConnectionPoolBuilder() + .max_pool_size(MAX_POOL_SIZE) + .user(os.getenv('PGUSER', 'benchmarkdbuser')) + .password(os.getenv('PGPASS', 'benchmarkdbpass')) + .dbname("hello_world") + .host("tfb-database") + .port(5432) + .build() ) - await db_pool.open() + # await db_pool.open() async def shutdown_db(app): global db_pool @@ -70,31 +72,30 @@ async def json_test(request): @bs.get('/db') async def single_db_query_test(request): row_id = random.randint(1, 10000) - async with db_pool.connection() as db_conn: - async with db_conn.cursor() as cursor: - await cursor.execute(READ_ROW_SQL, (row_id,)) - number = await cursor.fetchone() - return bs.json({'id': row_id, 'randomNumber': number[1]}) + async with db_pool.acquire() as connection: + number = await connection.fetch_row( + READ_ROW_SQL, [row_id] + ) + return bs.json({'id': row_id, 'randomNumber': number}) @bs.get('/queries') async def multiple_db_queries_test(request): num_queries = get_num_queries(request) row_ids = random.sample(range(1, 10000), num_queries) worlds = [] - async with db_pool.connection() as db_conn: - async with db_conn.cursor() as cursor: - for row_id in row_ids: - await cursor.execute(READ_ROW_SQL, (row_id,)) - number = await cursor.fetchone() - worlds.append({"id": row_id, "randomNumber": number[1]}) + async with db_pool.acquire() as connection: + for row_id in row_ids: + number = await number = await connection.fetch_row( + READ_ROW_SQL, [row_id] + ) + worlds.append({"id": row_id, "randomNumber": number}) return bs.json(worlds) @bs.get('/fortunes') async def fortunes_test(request): - async with db_pool.connection() as db_conn: - async with db_conn.cursor() as cursor: - await cursor.execute("SELECT * FROM Fortune") - fortunes = await cursor.fetchall() + async with db_pool.acquire() as connection: + fortunes_fetch = await connection.fetch("SELECT * FROM Fortune") + fortunes = results.result() fortunes.append(ADDITIONAL_ROW) fortunes.sort(key=lambda row: row[1]) data = fortune_template.render(fortunes=fortunes) @@ -108,25 +109,11 @@ async def db_updates_test(request): random.sample(range(1, 10000), num_queries) ), key=lambda x: x[1]) worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates] - for _ in range(5): - async with db_pool.connection() as db_conn: - try: - await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED") - async with db_conn.cursor() as cursor: - for row_id, number in updates: - await cursor.execute(READ_ROW_SQL, (row_id,)) - await cursor.fetchone() - for _ in range(5): - try: - await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates]) - return bs.json(worlds) - except psycopg.errors.DeadlockDetected: - await db_conn.rollback() - continue - # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates]) - except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted): - await db_conn.rollback() - continue + async with db_pool.acquire() as connection: + for row_id, _ in updates: + await connection.fetch_row(READ_ROW_SQL, [row_id]) + # await db_conn.executemany(WRITE_ROW_SQL, updates) + await connection.execute_many(WRITE_ROW_SQL, updates) raise Exception("connect error") @bs.get('/plaintext') diff --git a/frameworks/Python/blacksheep/requirements-pypy.txt b/frameworks/Python/blacksheep/requirements-pypy.txt index f2c3d4fe9b6..217009599a2 100644 --- a/frameworks/Python/blacksheep/requirements-pypy.txt +++ b/frameworks/Python/blacksheep/requirements-pypy.txt @@ -1,3 +1,3 @@ -psycopg[pool] +psqlpy git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify h11 \ No newline at end of file From 8b85c1feb5f1d8e6e990bfa8fce6d6fcdc97c3d8 Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 02:18:35 +0800 Subject: [PATCH 2/7] fix --- frameworks/Python/blacksheep/app-socketify.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index 9b751f367ac..aacde92f4fb 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -85,7 +85,7 @@ async def multiple_db_queries_test(request): worlds = [] async with db_pool.acquire() as connection: for row_id in row_ids: - number = await number = await connection.fetch_row( + number = await connection.fetch_row( READ_ROW_SQL, [row_id] ) worlds.append({"id": row_id, "randomNumber": number}) From 9fc56a70cfd0efe603c678fd4aa8f8acdb0dc6e3 Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 02:28:15 +0800 Subject: [PATCH 3/7] fix --- frameworks/Python/blacksheep/app-socketify.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index aacde92f4fb..4b246e61c55 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -73,7 +73,7 @@ async def json_test(request): async def single_db_query_test(request): row_id = random.randint(1, 10000) async with db_pool.acquire() as connection: - number = await connection.fetch_row( + number = await connection.fetch_val( READ_ROW_SQL, [row_id] ) return bs.json({'id': row_id, 'randomNumber': number}) @@ -85,7 +85,7 @@ async def multiple_db_queries_test(request): worlds = [] async with db_pool.acquire() as connection: for row_id in row_ids: - number = await connection.fetch_row( + number = await connection.fetch_val( READ_ROW_SQL, [row_id] ) worlds.append({"id": row_id, "randomNumber": number}) @@ -111,7 +111,7 @@ async def db_updates_test(request): worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates] async with db_pool.acquire() as connection: for row_id, _ in updates: - await connection.fetch_row(READ_ROW_SQL, [row_id]) + await connection.fetch_val(READ_ROW_SQL, [row_id]) # await db_conn.executemany(WRITE_ROW_SQL, updates) await connection.execute_many(WRITE_ROW_SQL, updates) raise Exception("connect error") From 9129cab096c5ec936848ad1d40e269dbadc1e5f2 Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 02:49:19 +0800 Subject: [PATCH 4/7] try fix --- frameworks/Python/blacksheep/app-socketify.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index 4b246e61c55..60ac2a66cff 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -95,7 +95,8 @@ async def multiple_db_queries_test(request): async def fortunes_test(request): async with db_pool.acquire() as connection: fortunes_fetch = await connection.fetch("SELECT * FROM Fortune") - fortunes = results.result() + # fortunes = fortunes_fetch.result() + fortunes = [list(item.values()) for item in fortunes_fetch.result()] fortunes.append(ADDITIONAL_ROW) fortunes.sort(key=lambda row: row[1]) data = fortune_template.render(fortunes=fortunes) @@ -110,8 +111,9 @@ async def db_updates_test(request): ), key=lambda x: x[1]) worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates] async with db_pool.acquire() as connection: + statement = await connection.prepare(READ_ROW_SQL) for row_id, _ in updates: - await connection.fetch_val(READ_ROW_SQL, [row_id]) + await statement.fetch_val([row_id]) # await db_conn.executemany(WRITE_ROW_SQL, updates) await connection.execute_many(WRITE_ROW_SQL, updates) raise Exception("connect error") From 1530b5a9da19c78b400da6996792f974b6ec60a1 Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 14:11:20 +0800 Subject: [PATCH 5/7] roll back --- frameworks/Python/blacksheep/app-socketify.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index 60ac2a66cff..9e03573eb0f 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -111,9 +111,8 @@ async def db_updates_test(request): ), key=lambda x: x[1]) worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates] async with db_pool.acquire() as connection: - statement = await connection.prepare(READ_ROW_SQL) for row_id, _ in updates: - await statement.fetch_val([row_id]) + await connection.fetch_val(READ_ROW_SQL, [row_id]) # await db_conn.executemany(WRITE_ROW_SQL, updates) await connection.execute_many(WRITE_ROW_SQL, updates) raise Exception("connect error") From 697be2f0110772cb87a1f369a8b2ab6f12706cdc Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 14:38:58 +0800 Subject: [PATCH 6/7] fix --- frameworks/Python/blacksheep/app-socketify.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index 9e03573eb0f..72f4b76740c 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -115,7 +115,7 @@ async def db_updates_test(request): await connection.fetch_val(READ_ROW_SQL, [row_id]) # await db_conn.executemany(WRITE_ROW_SQL, updates) await connection.execute_many(WRITE_ROW_SQL, updates) - raise Exception("connect error") + return jsonify(worlds) @bs.get('/plaintext') async def plaintext_test(request): From 3f4b333e9db4045f8ba17c7897b0efb7f7657c7c Mon Sep 17 00:00:00 2001 From: nazo Date: Sat, 28 Jun 2025 15:07:54 +0800 Subject: [PATCH 7/7] success fix --- frameworks/Python/blacksheep/app-socketify.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py index 72f4b76740c..d75fe084db3 100644 --- a/frameworks/Python/blacksheep/app-socketify.py +++ b/frameworks/Python/blacksheep/app-socketify.py @@ -115,7 +115,7 @@ async def db_updates_test(request): await connection.fetch_val(READ_ROW_SQL, [row_id]) # await db_conn.executemany(WRITE_ROW_SQL, updates) await connection.execute_many(WRITE_ROW_SQL, updates) - return jsonify(worlds) + return bs.json(worlds) @bs.get('/plaintext') async def plaintext_test(request):