Skip to content

Commit ca99ef1

Browse files
authored
[Python/Blacksheep] Adjust the unit startup mode (#9886)
* Adjust the unit startup mode * fix * try fix * try fix * try fix * try fix * try fix * rollback * adjust * fix * try change * try change * fix * Tuning * fix * try fix * try fix * add pypy * try fix pypy * fix * fix: app.py -> app-socketify.py * fix: _is_travis * fix error * try fix * try fix * try fix * try fix * try optimization * try fix * rollback * update socketify and fix query
1 parent 3f4037b commit ca99ef1

15 files changed

+301
-51
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import multiprocessing
2+
import os
3+
import psycopg
4+
import platform
5+
import random
6+
import asyncio
7+
import blacksheep as bs
8+
import jinja2
9+
from pathlib import Path
10+
from psycopg_pool import AsyncConnectionPool
11+
12+
READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
13+
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
14+
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
15+
CORE_COUNT = multiprocessing.cpu_count()
16+
17+
MAX_POOL_SIZE = CORE_COUNT * 2
18+
MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)
19+
db_pool = None
20+
21+
async def setup_db(app):
22+
global db_pool
23+
conninfo = (
24+
f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
25+
f"@tfb-database:5432/hello_world"
26+
)
27+
db_pool = AsyncConnectionPool(
28+
conninfo=conninfo,
29+
min_size=MIN_POOL_SIZE,
30+
max_size=MAX_POOL_SIZE,
31+
open=False,
32+
timeout=5.0,
33+
max_lifetime=1800,
34+
)
35+
await db_pool.open()
36+
37+
async def shutdown_db(app):
38+
global db_pool
39+
if db_pool is not None:
40+
await db_pool.close()
41+
db_pool = None
42+
43+
def load_fortunes_template():
44+
with Path("templates/fortune.html").open("r") as f:
45+
return jinja2.Template(f.read())
46+
47+
fortune_template = load_fortunes_template()
48+
49+
app = bs.Application()
50+
app.on_start += setup_db
51+
app.on_stop += shutdown_db
52+
53+
def get_num_queries(request):
54+
try:
55+
value = request.query.get('queries')
56+
if value is None:
57+
return 1
58+
query_count = int(value[0])
59+
except (KeyError, IndexError, ValueError):
60+
return 1
61+
return min(max(query_count, 1), 500)
62+
63+
JSON_CONTENT_TYPE = b"application/json"
64+
65+
@bs.get('/json')
66+
async def json_test(request):
67+
return bs.json({'message': 'Hello, world!'})
68+
69+
@bs.get('/db')
70+
async def single_db_query_test(request):
71+
row_id = random.randint(1, 10000)
72+
async with db_pool.connection() as db_conn:
73+
async with db_conn.cursor() as cursor:
74+
await cursor.execute(READ_ROW_SQL, (row_id,))
75+
number = await cursor.fetchone()
76+
return bs.json({'id': row_id, 'randomNumber': number[1]})
77+
78+
@bs.get('/queries')
79+
async def multiple_db_queries_test(request):
80+
num_queries = get_num_queries(request)
81+
row_ids = random.sample(range(1, 10000), num_queries)
82+
worlds = []
83+
async with db_pool.connection() as db_conn:
84+
async with db_conn.cursor() as cursor:
85+
for row_id in row_ids:
86+
await cursor.execute(READ_ROW_SQL, (row_id,))
87+
number = await cursor.fetchone()
88+
worlds.append({"id": row_id, "randomNumber": number[1]})
89+
return bs.json(worlds)
90+
91+
@bs.get('/fortunes')
92+
async def fortunes_test(request):
93+
async with db_pool.connection() as db_conn:
94+
async with db_conn.cursor() as cursor:
95+
await cursor.execute("SELECT * FROM Fortune")
96+
fortunes = await cursor.fetchall()
97+
fortunes.append(ADDITIONAL_ROW)
98+
fortunes.sort(key=lambda row: row[1])
99+
data = fortune_template.render(fortunes=fortunes)
100+
return bs.html(data)
101+
102+
@bs.get('/updates')
103+
async def db_updates_test(request):
104+
num_queries = get_num_queries(request)
105+
updates = sorted(zip(
106+
random.sample(range(1, 10000), num_queries),
107+
random.sample(range(1, 10000), num_queries)
108+
), key=lambda x: x[1])
109+
worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
110+
for _ in range(5):
111+
async with db_pool.connection() as db_conn:
112+
try:
113+
await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
114+
async with db_conn.cursor() as cursor:
115+
for row_id, number in updates:
116+
await cursor.execute(READ_ROW_SQL, (row_id,))
117+
await cursor.fetchone()
118+
for _ in range(5):
119+
try:
120+
await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
121+
return bs.json(worlds)
122+
except psycopg.errors.DeadlockDetected:
123+
await db_conn.rollback()
124+
continue
125+
# await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
126+
except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
127+
await db_conn.rollback()
128+
continue
129+
raise Exception("connect error")
130+
131+
@bs.get('/plaintext')
132+
async def plaintext_test(request):
133+
return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
134+
135+
if platform.python_implementation() == 'PyPy':
136+
import logging
137+
from socketify import ASGI
138+
workers = int(multiprocessing.cpu_count())
139+
if os.environ.get('TRAVIS') == 'true':
140+
workers = 2
141+
142+
def run_app():
143+
ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()
144+
145+
def create_fork():
146+
n = os.fork()
147+
if not n > 0:
148+
run_app()
149+
150+
for i in range(1, workers):
151+
create_fork()
152+
run_app()

frameworks/Python/blacksheep/app.py

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
import multiprocessing
22
import os
33
import asyncpg
4+
import platform
45
import random
56
import asyncio
6-
from operator import itemgetter
77
import blacksheep as bs
88
import jinja2
99
import msgspec
1010
from pathlib import Path
11-
12-
READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
13-
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
14-
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
15-
MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count()
16-
MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
17-
db_pool = None
18-
key = itemgetter(1)
19-
2011
try:
2112
import uvloop
2213
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
2314
except Exception:
2415
...
2516

17+
READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
18+
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
19+
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
20+
MAX_CONNECTIONS = 1900
21+
CORE_COUNT = multiprocessing.cpu_count()
22+
MAX_POOL_SIZE = max(1,int(os.getenv('MAX_POOL_SIZE', MAX_CONNECTIONS // CORE_COUNT)))
23+
MIN_POOL_SIZE = max(1,int(os.getenv('MIN_POOL_SIZE', MAX_POOL_SIZE // 2)))
24+
25+
db_pool = None
26+
2627
async def setup_db(app):
2728
global db_pool
2829
db_pool = await asyncpg.create_pool(
@@ -35,6 +36,12 @@ async def setup_db(app):
3536
max_size=MAX_POOL_SIZE,
3637
)
3738

39+
async def shutdown_db(app):
40+
"""Close asyncpg connection pool for the current process."""
41+
global db_pool
42+
if db_pool is not None:
43+
await db_pool.close()
44+
db_pool = None
3845

3946
def load_fortunes_template():
4047
with Path("templates/fortune.html").open("r") as f:
@@ -45,7 +52,7 @@ def load_fortunes_template():
4552

4653
app = bs.Application()
4754
app.on_start += setup_db
48-
55+
app.on_stop += shutdown_db
4956

5057
def get_num_queries(request):
5158
try:
@@ -55,14 +62,9 @@ def get_num_queries(request):
5562
query_count = int(value[0])
5663
except (KeyError, IndexError, ValueError):
5764
return 1
58-
if query_count < 1:
59-
return 1
60-
if query_count > 500:
61-
return 500
62-
return query_count
65+
return min(max(query_count, 1), 500)
6366

6467
ENCODER = msgspec.json.Encoder()
65-
DECODER = msgspec.json.Decoder()
6668
JSON_CONTENT_TYPE = b"application/json"
6769
def jsonify(
6870
data,
@@ -122,26 +124,25 @@ async def fortunes_test(request):
122124
fortunes = await db_conn.fetch("SELECT * FROM Fortune")
123125

124126
fortunes.append(ADDITIONAL_ROW)
125-
fortunes.sort(key = key)
127+
fortunes.sort(key=lambda row: row[1])
126128
data = fortune_template.render(fortunes=fortunes)
127129
return bs.html(data)
128130

129131

130132
@bs.get('/updates')
131133
async def db_updates_test(request):
132134
num_queries = get_num_queries(request)
133-
ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
134-
numbers = sorted(random.sample(range(1, 10000), num_queries))
135-
updates = list(zip(ids, numbers))
136-
137-
# worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
135+
updates = list(zip(
136+
random.sample(range(1, 10000), num_queries),
137+
sorted(random.sample(range(1, 10000), num_queries))
138+
))
138139
worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
140+
# worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
139141
async with db_pool.acquire() as db_conn:
140142
statement = await db_conn.prepare(READ_ROW_SQL)
141143
for row_id, _ in updates:
142144
await statement.fetchval(row_id)
143145
await db_conn.executemany(WRITE_ROW_SQL, updates)
144-
145146
return jsonify(worlds)
146147

147148

@@ -150,3 +151,26 @@ async def plaintext_test(request):
150151
return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
151152
#return bs.text('Hello, World!')
152153

154+
155+
if platform.python_implementation() == 'PyPy':
156+
from socketify import ASGI
157+
workers = int(multiprocessing.cpu_count())
158+
if _is_travis:
159+
workers = 2
160+
161+
def run_app():
162+
ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()
163+
164+
165+
def create_fork():
166+
n = os.fork()
167+
# n greater than 0 means parent process
168+
if not n > 0:
169+
run_app()
170+
171+
172+
# fork limiting the cpu count - 1
173+
for i in range(1, workers):
174+
create_fork()
175+
176+
run_app()

frameworks/Python/blacksheep/benchmark_config.json

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,29 @@
4646
"display_name": "blacksheep-nginx-unit",
4747
"versus": "None",
4848
"notes": ""
49+
},
50+
"pypy-socketify": {
51+
"json_url": "/json",
52+
"fortune_url": "/fortunes",
53+
"plaintext_url": "/plaintext",
54+
"db_url": "/db",
55+
"query_url": "/queries?queries=",
56+
"update_url": "/updates?queries=",
57+
"port": 8080,
58+
"approach": "Realistic",
59+
"classification": "Micro",
60+
"framework": "blacksheep",
61+
"language": "Python",
62+
"flavor": "Python3",
63+
"platform": "ASGI",
64+
"webserver": "Socketify.py",
65+
"os": "Linux",
66+
"orm": "Raw",
67+
"database_os": "Linux",
68+
"database": "Postgres",
69+
"display_name": "Blacksheep [Socketify.py PyPy3]",
70+
"versus": "None",
71+
"notes": ""
4972
}
5073
}]
5174
}

frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@ WORKDIR /blacksheep
44

55
COPY ./ /blacksheep
66

7-
RUN pip3 install -U pip
8-
RUN pip3 install Cython==3.0.12
9-
RUN pip3 install -r /blacksheep/requirements.txt
10-
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
7+
RUN apt-get update; apt-get install libuv1 -y
118

12-
RUN chmod +x start-unit.sh
9+
RUN pip3 install -U pip -q
10+
RUN pip3 install Cython==3.0.12 -q
11+
RUN pip3 install -r /blacksheep/requirements.txt -q
12+
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
1313

1414
ENV PGSSLMODE=disable
15+
RUN CORE_COUNT=$(nproc) && \
16+
sed -i "s|\"processes\": [0-9]*|\"processes\": $CORE_COUNT|g" /blacksheep/unit-config.json
1517

18+
RUN chmod +x start-unit.sh
19+
ENTRYPOINT []
1620
EXPOSE 8080
17-
CMD ["./start-unit.sh"]
21+
22+
# CMD ["unitd", "--no-daemon", "--control", "unix:/var/run/control.unit.sock"]
23+
CMD ["./start-unit.sh"]
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM pypy:3.11-bookworm
2+
3+
ADD ./ /blacksheep
4+
5+
WORKDIR /blacksheep
6+
7+
RUN apt-get update; apt-get install libuv1 libpq5 -y
8+
9+
RUN pip3 install -r /blacksheep/requirements.txt
10+
RUN pip3 install -r /blacksheep/requirements-pypy.txt
11+
12+
EXPOSE 8080
13+
14+
CMD python ./app-socketify.py
15+

frameworks/Python/blacksheep/blacksheep.dockerfile

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ WORKDIR /blacksheep
44

55
COPY ./ /blacksheep
66

7-
RUN pip3 install -U pip
8-
RUN pip3 install Cython==3.0.12
9-
RUN pip3 install -r /blacksheep/requirements.txt
10-
RUN pip3 install -r /blacksheep/requirements-gunicorn.txt
11-
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
7+
RUN apt-get update; apt-get install libuv1 -y
128

9+
RUN pip3 install -U pip -q
10+
RUN pip3 install Cython==3.0.12 -q
11+
RUN pip3 install -r /blacksheep/requirements.txt -q
12+
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
13+
ENV GUNICORN=1
1314
EXPOSE 8080
1415

15-
CMD gunicorn app:app -k uvicorn.workers.UvicornWorker -c blacksheep_conf.py
16+
CMD gunicorn app:app -k uvicorn_worker.UvicornWorker -c blacksheep_conf.py

0 commit comments

Comments
 (0)