diff --git a/frameworks/Python/blacksheep/app-socketify.py b/frameworks/Python/blacksheep/app-socketify.py
new file mode 100644
index 00000000000..47ed2663804
--- /dev/null
+++ b/frameworks/Python/blacksheep/app-socketify.py
@@ -0,0 +1,152 @@
+import multiprocessing
+import os
+import psycopg
+import platform
+import random
+import asyncio
+import blacksheep as bs
+import jinja2
+from pathlib import Path
+from psycopg_pool import AsyncConnectionPool
+
+READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
+WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
+ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
+CORE_COUNT = multiprocessing.cpu_count()
+
+MAX_POOL_SIZE = CORE_COUNT * 2
+MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)
+db_pool = None
+
+async def setup_db(app):
+    global db_pool
+    conninfo = (
+        f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
+        f"@tfb-database:5432/hello_world"
+    )
+    db_pool = AsyncConnectionPool(
+        conninfo=conninfo,
+        min_size=MIN_POOL_SIZE,
+        max_size=MAX_POOL_SIZE,
+        open=False,
+        timeout=5.0,
+        max_lifetime=1800,
+    )
+    await db_pool.open()
+
+async def shutdown_db(app):
+    global db_pool
+    if db_pool is not None:
+        await db_pool.close()
+        db_pool = None
+
+def load_fortunes_template():
+    with Path("templates/fortune.html").open("r") as f:
+        return jinja2.Template(f.read())
+
+fortune_template = load_fortunes_template()
+
+app = bs.Application()
+app.on_start += setup_db
+app.on_stop += shutdown_db
+
+def get_num_queries(request):
+    try:
+        value = request.query.get('queries')
+        if value is None:
+            return 1
+        query_count = int(value[0])
+    except (KeyError, IndexError, ValueError):
+        return 1
+    return min(max(query_count, 1), 500)
+
+JSON_CONTENT_TYPE = b"application/json"
+
+@bs.get('/json')
+async def json_test(request):
+    return bs.json({'message': 'Hello, world!'})
+
+@bs.get('/db')
+async def single_db_query_test(request):
+    row_id = random.randint(1, 10000)
+    async with db_pool.connection() as db_conn:
+        async with db_conn.cursor() as cursor:
+            await cursor.execute(READ_ROW_SQL, (row_id,))
+            number = await cursor.fetchone()
+    return bs.json({'id': row_id, 'randomNumber': number[1]})
+
+@bs.get('/queries')
+async def multiple_db_queries_test(request):
+    num_queries = get_num_queries(request)
+    row_ids = random.sample(range(1, 10001), num_queries)
+    worlds = []
+    async with db_pool.connection() as db_conn:
+        async with db_conn.cursor() as cursor:
+            for row_id in row_ids:
+                await cursor.execute(READ_ROW_SQL, (row_id,))
+                number = await cursor.fetchone()
+                worlds.append({"id": row_id, "randomNumber": number[1]})
+    return bs.json(worlds)
+
+@bs.get('/fortunes')
+async def fortunes_test(request):
+    async with db_pool.connection() as db_conn:
+        async with db_conn.cursor() as cursor:
+            await cursor.execute("SELECT * FROM Fortune")
+            fortunes = await cursor.fetchall()
+    fortunes.append(ADDITIONAL_ROW)
+    fortunes.sort(key=lambda row: row[1])
+    data = fortune_template.render(fortunes=fortunes)
+    return bs.html(data)
+
+@bs.get('/updates')
+async def db_updates_test(request):
+    num_queries = get_num_queries(request)
+    updates = sorted(zip(
+        random.sample(range(1, 10001), num_queries),
+        random.sample(range(1, 10001), num_queries)
+    ), key=lambda x: x[1])
+    worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
+    for _ in range(5):
+        async with db_pool.connection() as db_conn:
+            try:
+                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
+                async with db_conn.cursor() as cursor:
+                    for row_id, number in updates:
+                        await cursor.execute(READ_ROW_SQL, (row_id,))
+                        await cursor.fetchone()
+                    for _ in range(5):
+                        try:
+                            await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
+                            return bs.json(worlds)
+                        except psycopg.errors.DeadlockDetected:
+                            await db_conn.rollback()
+                            continue
+                    # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
+            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
+                await db_conn.rollback()
+                continue
+    raise Exception("connect error")
+
+@bs.get('/plaintext')
+async def plaintext_test(request):
+    return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
+
+if platform.python_implementation() == 'PyPy':
+    import logging
+    from socketify import ASGI
+    workers = int(multiprocessing.cpu_count())
+    if os.environ.get('TRAVIS') == 'true':
+        workers = 2
+
+    def run_app():
+        ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()
+
+    def create_fork():
+        n = os.fork()
+        if not n > 0:
+            run_app()
+
+    for i in range(1, workers):
+        create_fork()
+    run_app()
\ No newline at end of file
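All handlers in app-socketify.py go through a shared psycopg AsyncConnectionPool. A minimal standalone sketch for poking at that wiring outside the benchmark harness is below; the localhost DSN and the tiny pool sizes are assumptions for local debugging, not part of this patch:

```python
# Standalone sketch: open a psycopg AsyncConnectionPool the same way setup_db does
# and run one world-table lookup. Assumes a local Postgres with the hello_world
# schema; the localhost DSN and pool sizes here are illustrative only.
import asyncio
import random

from psycopg_pool import AsyncConnectionPool

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'

async def main():
    pool = AsyncConnectionPool(
        conninfo="postgresql://benchmarkdbuser:benchmarkdbpass@localhost:5432/hello_world",
        min_size=1,
        max_size=2,
        open=False,
    )
    await pool.open()
    try:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(READ_ROW_SQL, (random.randint(1, 10000),))
                print(await cur.fetchone())
    finally:
        await pool.close()

if __name__ == "__main__":
    asyncio.run(main())
```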
COMMITTED") + async with db_conn.cursor() as cursor: + for row_id, number in updates: + await cursor.execute(READ_ROW_SQL, (row_id,)) + await cursor.fetchone() + for _ in range(5): + try: + await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates]) + return bs.json(worlds) + except psycopg.errors.DeadlockDetected: + await db_conn.rollback() + continue + # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates]) + except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted): + await db_conn.rollback() + continue + raise Exception("connect error") + +@bs.get('/plaintext') +async def plaintext_test(request): + return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!')) + +if platform.python_implementation() == 'PyPy': + import logging + from socketify import ASGI + workers = int(multiprocessing.cpu_count()) + if os.environ.get('TRAVIS') == 'true': + workers = 2 + + def run_app(): + ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run() + + def create_fork(): + n = os.fork() + if not n > 0: + run_app() + + for i in range(1, workers): + create_fork() + run_app() \ No newline at end of file diff --git a/frameworks/Python/blacksheep/app.py b/frameworks/Python/blacksheep/app.py index 422a987df43..80dd48919e9 100644 --- a/frameworks/Python/blacksheep/app.py +++ b/frameworks/Python/blacksheep/app.py @@ -1,28 +1,29 @@ import multiprocessing import os import asyncpg +import platform import random import asyncio -from operator import itemgetter import blacksheep as bs import jinja2 import msgspec from pathlib import Path - -READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1' -WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2' -ADDITIONAL_ROW = [0, "Additional fortune added at request time."] -MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count() -MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1) -db_pool = None -key = itemgetter(1) - try: import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) except Exception: ... 
+READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
+WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
+ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
+MAX_CONNECTIONS = 1900
+CORE_COUNT = multiprocessing.cpu_count()
+MAX_POOL_SIZE = max(1, int(os.getenv('MAX_POOL_SIZE', MAX_CONNECTIONS // CORE_COUNT)))
+MIN_POOL_SIZE = max(1, int(os.getenv('MIN_POOL_SIZE', MAX_POOL_SIZE // 2)))
+
+db_pool = None
+
 async def setup_db(app):
     global db_pool
     db_pool = await asyncpg.create_pool(
@@ -35,6 +36,12 @@ async def setup_db(app):
         max_size=MAX_POOL_SIZE,
     )
 
+async def shutdown_db(app):
+    """Close asyncpg connection pool for the current process."""
+    global db_pool
+    if db_pool is not None:
+        await db_pool.close()
+        db_pool = None
 
 def load_fortunes_template():
     with Path("templates/fortune.html").open("r") as f:
@@ -45,7 +52,7 @@ def load_fortunes_template():
 
 app = bs.Application()
 app.on_start += setup_db
-
+app.on_stop += shutdown_db
 
 def get_num_queries(request):
     try:
@@ -55,14 +62,9 @@ def get_num_queries(request):
         query_count = int(value[0])
     except (KeyError, IndexError, ValueError):
         return 1
-    if query_count < 1:
-        return 1
-    if query_count > 500:
-        return 500
-    return query_count
+    return min(max(query_count, 1), 500)
 
 ENCODER = msgspec.json.Encoder()
-DECODER = msgspec.json.Decoder()
 JSON_CONTENT_TYPE = b"application/json"
 def jsonify(
     data,
@@ -122,7 +124,7 @@ async def fortunes_test(request):
         fortunes = await db_conn.fetch("SELECT * FROM Fortune")
 
     fortunes.append(ADDITIONAL_ROW)
-    fortunes.sort(key = key)
+    fortunes.sort(key=lambda row: row[1])
     data = fortune_template.render(fortunes=fortunes)
     return bs.html(data)
 
@@ -130,18 +132,17 @@ async def fortunes_test(request):
 @bs.get('/updates')
 async def db_updates_test(request):
     num_queries = get_num_queries(request)
-    ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
-    numbers = sorted(random.sample(range(1, 10000), num_queries))
-    updates = list(zip(ids, numbers))
-
-    # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
+    updates = list(zip(
+        random.sample(range(1, 10001), num_queries),
+        sorted(random.sample(range(1, 10001), num_queries))
+    ))
     worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
+    # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
 
     async with db_pool.acquire() as db_conn:
         statement = await db_conn.prepare(READ_ROW_SQL)
         for row_id, _ in updates:
             await statement.fetchval(row_id)
         await db_conn.executemany(WRITE_ROW_SQL, updates)
-
     return jsonify(worlds)
 
@@ -150,3 +151,27 @@ async def plaintext_test(request):
     return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
 
     #return bs.text('Hello, World!')
+
+if platform.python_implementation() == 'PyPy':
+    import logging
+    from socketify import ASGI
+    workers = int(multiprocessing.cpu_count())
+    if os.environ.get('TRAVIS') == 'true':
+        workers = 2
+
+    def run_app():
+        ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()
+
+
+    def create_fork():
+        n = os.fork()
+        # n greater than 0 means parent process
+        if not n > 0:
+            run_app()
+
+
+    # fork limiting the cpu count - 1
+    for i in range(1, workers):
+        create_fork()
+
+    run_app()
\ No newline at end of file
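The pool sizing added to app.py spreads a fixed budget of Postgres connections across the worker processes, with optional environment overrides. A small worked example of that arithmetic (the core counts below are illustrative, and the MAX_POOL_SIZE/MIN_POOL_SIZE env overrides are omitted):

```python
# Worked example of the pool-size arithmetic used in app.py: a global budget of
# 1900 Postgres connections is split across one asyncpg pool per worker process.
MAX_CONNECTIONS = 1900

def pool_sizes(core_count: int) -> tuple[int, int]:
    max_pool_size = max(1, MAX_CONNECTIONS // core_count)
    min_pool_size = max(1, max_pool_size // 2)
    return min_pool_size, max_pool_size

# e.g. a 28-core benchmark machine: 1900 // 28 == 67 connections per worker
assert pool_sizes(28) == (33, 67)
# a 2-core CI runner gets a much larger per-worker pool from the same formula
assert pool_sizes(2) == (475, 950)
```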
diff --git a/frameworks/Python/blacksheep/benchmark_config.json b/frameworks/Python/blacksheep/benchmark_config.json
index fb05ddddbe8..db211d08363 100644
--- a/frameworks/Python/blacksheep/benchmark_config.json
+++ b/frameworks/Python/blacksheep/benchmark_config.json
@@ -46,6 +46,29 @@
       "display_name": "blacksheep-nginx-unit",
       "versus": "None",
       "notes": ""
+    },
+    "pypy-socketify": {
+      "json_url": "/json",
+      "fortune_url": "/fortunes",
+      "plaintext_url": "/plaintext",
+      "db_url": "/db",
+      "query_url": "/queries?queries=",
+      "update_url": "/updates?queries=",
+      "port": 8080,
+      "approach": "Realistic",
+      "classification": "Micro",
+      "framework": "blacksheep",
+      "language": "Python",
+      "flavor": "Python3",
+      "platform": "ASGI",
+      "webserver": "Socketify.py",
+      "os": "Linux",
+      "orm": "Raw",
+      "database_os": "Linux",
+      "database": "Postgres",
+      "display_name": "Blacksheep [Socketify.py PyPy3]",
+      "versus": "None",
+      "notes": ""
     }
   }]
 }
diff --git a/frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile b/frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile
index cd09391f6ac..3014ba7cfdb 100644
--- a/frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile
+++ b/frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile
@@ -4,14 +4,20 @@ WORKDIR /blacksheep
 
 COPY ./ /blacksheep
 
-RUN pip3 install -U pip
-RUN pip3 install Cython==3.0.12
-RUN pip3 install -r /blacksheep/requirements.txt
-RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
+RUN apt-get update; apt-get install libuv1 -y
 
-RUN chmod +x start-unit.sh
+RUN pip3 install -U pip -q
+RUN pip3 install Cython==3.0.12 -q
+RUN pip3 install -r /blacksheep/requirements.txt -q
+RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
 
 ENV PGSSLMODE=disable
 
+RUN CORE_COUNT=$(nproc) && \
+    sed -i "s|\"processes\": [0-9]*|\"processes\": $CORE_COUNT|g" /blacksheep/unit-config.json
+RUN chmod +x start-unit.sh
+ENTRYPOINT []
 EXPOSE 8080
-CMD ["./start-unit.sh"]
+
+# CMD ["unitd", "--no-daemon", "--control", "unix:/var/run/control.unit.sock"]
+CMD ["./start-unit.sh"]
\ No newline at end of file
diff --git a/frameworks/Python/blacksheep/blacksheep-pypy-socketify.dockerfile b/frameworks/Python/blacksheep/blacksheep-pypy-socketify.dockerfile
new file mode 100644
index 00000000000..b730747562f
--- /dev/null
+++ b/frameworks/Python/blacksheep/blacksheep-pypy-socketify.dockerfile
@@ -0,0 +1,15 @@
+FROM pypy:3.11-bookworm
+
+ADD ./ /blacksheep
+
+WORKDIR /blacksheep
+
+RUN apt-get update; apt-get install libuv1 libpq5 -y
+
+RUN pip3 install -r /blacksheep/requirements.txt
+RUN pip3 install -r /blacksheep/requirements-pypy.txt
+
+EXPOSE 8080
+
+CMD python ./app-socketify.py
+
diff --git a/frameworks/Python/blacksheep/blacksheep.dockerfile b/frameworks/Python/blacksheep/blacksheep.dockerfile
index e87d8f14e5a..9948bd51766 100644
--- a/frameworks/Python/blacksheep/blacksheep.dockerfile
+++ b/frameworks/Python/blacksheep/blacksheep.dockerfile
@@ -4,12 +4,13 @@ WORKDIR /blacksheep
 
 COPY ./ /blacksheep
 
-RUN pip3 install -U pip
-RUN pip3 install Cython==3.0.12
-RUN pip3 install -r /blacksheep/requirements.txt
-RUN pip3 install -r /blacksheep/requirements-gunicorn.txt
-RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
+RUN apt-get update; apt-get install libuv1 -y
 
+RUN pip3 install -U pip -q
+RUN pip3 install Cython==3.0.12 -q
+RUN pip3 install -r /blacksheep/requirements.txt -q
+RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
+ENV GUNICORN=1
 EXPOSE 8080
 
-CMD gunicorn app:app -k uvicorn.workers.UvicornWorker -c blacksheep_conf.py
+CMD gunicorn app:app -k uvicorn_worker.UvicornWorker -c blacksheep_conf.py
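The sed line in blacksheep-nginx-unit.dockerfile above rewrites the hard-coded "processes" value in unit-config.json to the build machine's core count. Purely as an illustration of what that rewrite does, a rough Python equivalent is sketched here; the config path and the shape of the "applications" section are assumptions drawn from this patch, and the script itself is not part of it:

```python
# Illustrative Python equivalent of the Dockerfile's sed rewrite: point every
# NGINX Unit application at as many processes as there are CPU cores.
# The path below is assumed from the Dockerfile; this script is not in the patch.
import json
import os

CONFIG_PATH = "/blacksheep/unit-config.json"

def set_unit_processes(path: str = CONFIG_PATH) -> None:
    with open(path) as f:
        config = json.load(f)
    for app_config in config.get("applications", {}).values():
        app_config["processes"] = os.cpu_count() or 1
    with open(path, "w") as f:
        json.dump(config, f, indent=4)

if __name__ == "__main__":
    set_unit_processes()
```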
diff --git a/frameworks/Python/blacksheep/blacksheep_conf.py b/frameworks/Python/blacksheep/blacksheep_conf.py
index 4f4e08a729e..7324c9efddb 100644
--- a/frameworks/Python/blacksheep/blacksheep_conf.py
+++ b/frameworks/Python/blacksheep/blacksheep_conf.py
@@ -2,13 +2,18 @@
 import os
 
 _is_travis = os.environ.get('TRAVIS') == 'true'
+CPU_CORES = multiprocessing.cpu_count()
+MAX_CONNECTIONS = 1900
+CONNECTIONS_PER_WORKER = 100
+max_pg_workers = MAX_CONNECTIONS // CONNECTIONS_PER_WORKER
 
-workers = multiprocessing.cpu_count()
+workers = CPU_CORES
 if _is_travis:
     workers = 2
 
 bind = "0.0.0.0:8080"
-keepalive = 120
+keepalive = 1
+timeout = 0
 errorlog = '-'
 pidfile = '/tmp/blacksheep.pid'
 loglevel = 'error'
diff --git a/frameworks/Python/blacksheep/config.toml b/frameworks/Python/blacksheep/config.toml
index c37e96092d5..ea30e9cad43 100644
--- a/frameworks/Python/blacksheep/config.toml
+++ b/frameworks/Python/blacksheep/config.toml
@@ -35,3 +35,21 @@ orm = "Raw"
 platform = "ASGI"
 webserver = "nginx-unit"
 versus = "None"
+
+
+[pypy-socketify]
+urls.plaintext = "/plaintext"
+urls.json = "/json"
+urls.db = "/db"
+urls.query = "/queries?queries="
+urls.update = "/updates?queries="
+urls.fortune = "/fortunes"
+approach = "Realistic"
+classification = "Platform"
+database = "Postgres"
+database_os = "Linux"
+os = "Linux"
+orm = "Raw"
+platform = "ASGI"
+webserver = "Socketify.py"
+versus = "None"
diff --git a/frameworks/Python/blacksheep/requirements-gunicorn.txt b/frameworks/Python/blacksheep/requirements-gunicorn.txt
deleted file mode 100644
index 4afe40e70ce..00000000000
--- a/frameworks/Python/blacksheep/requirements-gunicorn.txt
+++ /dev/null
@@ -1 +0,0 @@
-gunicorn==23.0.0
diff --git a/frameworks/Python/blacksheep/requirements-hypercorn.txt b/frameworks/Python/blacksheep/requirements-hypercorn.txt
deleted file mode 100644
index 3e0a21f2fe2..00000000000
--- a/frameworks/Python/blacksheep/requirements-hypercorn.txt
+++ /dev/null
@@ -1 +0,0 @@
-hypercorn==0.17.3
diff --git a/frameworks/Python/blacksheep/requirements-pypy.txt b/frameworks/Python/blacksheep/requirements-pypy.txt
new file mode 100644
index 00000000000..f2c3d4fe9b6
--- /dev/null
+++ b/frameworks/Python/blacksheep/requirements-pypy.txt
@@ -0,0 +1,3 @@
+psycopg[pool]
+git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify
+h11
\ No newline at end of file
diff --git a/frameworks/Python/blacksheep/requirements-uvicorn.txt b/frameworks/Python/blacksheep/requirements-uvicorn.txt
index 7a5c5d8140d..4784617228a 100644
--- a/frameworks/Python/blacksheep/requirements-uvicorn.txt
+++ b/frameworks/Python/blacksheep/requirements-uvicorn.txt
@@ -1,3 +1,7 @@
 uvloop==0.21.0
-uvicorn==0.34.1
+uvicorn==0.34.2
+uvicorn-worker
 httptools==0.6.4
+gunicorn==23.0.0
+msgspec==0.19.0
+asyncpg==0.30.0
\ No newline at end of file
diff --git a/frameworks/Python/blacksheep/requirements.txt b/frameworks/Python/blacksheep/requirements.txt
index 9c83c9fad47..12af6e9f9b2 100644
--- a/frameworks/Python/blacksheep/requirements.txt
+++ b/frameworks/Python/blacksheep/requirements.txt
@@ -1,4 +1,3 @@
-asyncpg==0.30.0
 Jinja2==3.1.6
-blacksheep==2.1.0
-msgspec==0.19.0
+blacksheep==2.3.1
+psycopg
\ No newline at end of file
diff --git a/frameworks/Python/blacksheep/start-unit.sh b/frameworks/Python/blacksheep/start-unit.sh
old mode 100755
new mode 100644
index 9d2b923e166..016ee8c1020
--- a/frameworks/Python/blacksheep/start-unit.sh
+++ b/frameworks/Python/blacksheep/start-unit.sh
@@ -1,16 +1,18 @@
 #!/usr/bin/env bash
+CORE_COUNT=$(nproc)
+sysctl -w net.core.somaxconn=65535
+sysctl -w net.ipv4.tcp_max_syn_backlog=65535
+ulimit -n 65535
+taskset -c 0-$(($CORE_COUNT-1)) unitd --no-daemon --control unix:/var/run/control.unit.sock &
 
-NPROC=$(nproc)
-sed "s/{{NPROC}}/$NPROC/" unit-config.template.json > nginx-unit-config.json
-
-unitd --no-daemon --control unix:/var/run/control.unit.sock &
+# unitd --no-daemon --control unix:/var/run/control.unit.sock &
 
 # wait UNIT started
 sleep 1
 
 # PUT configure
 curl -X PUT \
-    --data-binary @nginx-unit-config.json \
+    --data-binary @unit-config.json \
     --unix-socket /var/run/control.unit.sock \
     http://localhost/config
 
diff --git a/frameworks/Python/blacksheep/unit-config.template.json b/frameworks/Python/blacksheep/unit-config.json
similarity index 84%
rename from frameworks/Python/blacksheep/unit-config.template.json
rename to frameworks/Python/blacksheep/unit-config.json
index ee02557db34..186d581296f 100644
--- a/frameworks/Python/blacksheep/unit-config.template.json
+++ b/frameworks/Python/blacksheep/unit-config.json
@@ -9,10 +9,10 @@
             "type": "python",
             "path": "/blacksheep",
             "working_directory": "/blacksheep",
+            "processes": 14,
             "protocol": "asgi",
             "module": "app",
-            "callable": "app",
-            "processes": {{NPROC}}
+            "callable": "app"
         }
     },
     "access_log": "/dev/null"
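As a quick post-build sanity check for any of these images, a small smoke-test sketch follows. It assumes a container built from this patch is already running and published on localhost:8080, and that the database routes can reach the benchmark Postgres; neither the script nor those assumptions are part of the patch itself.

```python
# Minimal smoke test for the routes wired up in this patch. Assumes an app
# container is already listening on localhost:8080; /db, /queries, /updates and
# /fortunes additionally need the benchmark database to be reachable.
import json
from urllib.request import urlopen

BASE = "http://localhost:8080"

def check(path: str) -> None:
    with urlopen(BASE + path, timeout=5) as resp:
        print(path, resp.status, resp.read()[:60])

for path in ("/plaintext", "/json", "/db", "/queries?queries=5",
             "/updates?queries=5", "/fortunes"):
    check(path)

# The JSON route should round-trip with the expected greeting.
with urlopen(BASE + "/json", timeout=5) as resp:
    assert json.loads(resp.read())["message"].lower() == "hello, world!"
```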