Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 38 additions & 50 deletions frameworks/Python/blacksheep/app-socketify.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import multiprocessing
import os
import psycopg
# import psycopg
import platform
import random
import asyncio
import blacksheep as bs
import jinja2
from pathlib import Path
from psycopg_pool import AsyncConnectionPool

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
# from psycopg_pool import AsyncConnectionPool
import psqlpy
from psqlpy import ConnectionPoolBuilder
READ_ROW_SQL = 'SELECT "randomnumber" FROM "world" WHERE id = $1'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
CORE_COUNT = multiprocessing.cpu_count()
MAX_DB_CONNECTIONS = 2000

MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 32)
MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)
MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35)
db_pool = None

async def setup_db(app):
Expand All @@ -25,15 +25,17 @@ async def setup_db(app):
f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
f"@tfb-database:5432/hello_world"
)
db_pool = AsyncConnectionPool(
conninfo=conninfo,
min_size=MIN_POOL_SIZE,
max_size=MAX_POOL_SIZE,
open=False,
timeout=5.0,
max_lifetime=1800,
db_pool = (
ConnectionPoolBuilder()
.max_pool_size(MAX_POOL_SIZE)
.user(os.getenv('PGUSER', 'benchmarkdbuser'))
.password(os.getenv('PGPASS', 'benchmarkdbpass'))
.dbname("hello_world")
.host("tfb-database")
.port(5432)
.build()
)
await db_pool.open()
# await db_pool.open()

async def shutdown_db(app):
global db_pool
Expand Down Expand Up @@ -70,31 +72,31 @@ async def json_test(request):
@bs.get('/db')
async def single_db_query_test(request):
row_id = random.randint(1, 10000)
async with db_pool.connection() as db_conn:
async with db_conn.cursor() as cursor:
await cursor.execute(READ_ROW_SQL, (row_id,))
number = await cursor.fetchone()
return bs.json({'id': row_id, 'randomNumber': number[1]})
async with db_pool.acquire() as connection:
number = await connection.fetch_val(
READ_ROW_SQL, [row_id]
)
return bs.json({'id': row_id, 'randomNumber': number})

@bs.get('/queries')
async def multiple_db_queries_test(request):
num_queries = get_num_queries(request)
row_ids = random.sample(range(1, 10000), num_queries)
worlds = []
async with db_pool.connection() as db_conn:
async with db_conn.cursor() as cursor:
for row_id in row_ids:
await cursor.execute(READ_ROW_SQL, (row_id,))
number = await cursor.fetchone()
worlds.append({"id": row_id, "randomNumber": number[1]})
async with db_pool.acquire() as connection:
for row_id in row_ids:
number = await connection.fetch_val(
READ_ROW_SQL, [row_id]
)
worlds.append({"id": row_id, "randomNumber": number})
return bs.json(worlds)

@bs.get('/fortunes')
async def fortunes_test(request):
async with db_pool.connection() as db_conn:
async with db_conn.cursor() as cursor:
await cursor.execute("SELECT * FROM Fortune")
fortunes = await cursor.fetchall()
async with db_pool.acquire() as connection:
fortunes_fetch = await connection.fetch("SELECT * FROM Fortune")
# fortunes = fortunes_fetch.result()
fortunes = [list(item.values()) for item in fortunes_fetch.result()]
fortunes.append(ADDITIONAL_ROW)
fortunes.sort(key=lambda row: row[1])
data = fortune_template.render(fortunes=fortunes)
Expand All @@ -108,26 +110,12 @@ async def db_updates_test(request):
random.sample(range(1, 10000), num_queries)
), key=lambda x: x[1])
worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
for _ in range(5):
async with db_pool.connection() as db_conn:
try:
await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
async with db_conn.cursor() as cursor:
for row_id, number in updates:
await cursor.execute(READ_ROW_SQL, (row_id,))
await cursor.fetchone()
for _ in range(5):
try:
await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
return bs.json(worlds)
except psycopg.errors.DeadlockDetected:
await db_conn.rollback()
continue
# await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
await db_conn.rollback()
continue
raise Exception("connect error")
async with db_pool.acquire() as connection:
for row_id, _ in updates:
await connection.fetch_val(READ_ROW_SQL, [row_id])
# await db_conn.executemany(WRITE_ROW_SQL, updates)
await connection.execute_many(WRITE_ROW_SQL, updates)
return bs.json(worlds)

@bs.get('/plaintext')
async def plaintext_test(request):
Expand Down
2 changes: 1 addition & 1 deletion frameworks/Python/blacksheep/requirements-pypy.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
psycopg[pool]
psqlpy
git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify
h11
Loading