1+ import multiprocessing
2+ import os
3+ import asyncpg
4+ import platform
5+ import random
6+ import asyncio
7+ import blacksheep as bs
8+ import jinja2
9+ # import json
10+ from pathlib import Path
11+ try :
12+ import uvloop
13+ asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
14+ except Exception :
15+ ...
16+
17+ READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
18+ WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
19+ ADDITIONAL_ROW = [0 , "Additional fortune added at request time." ]
20+ MAX_CONNECTIONS = 1900
21+ CORE_COUNT = multiprocessing .cpu_count ()
22+ PROCESSES = CORE_COUNT
23+ MAX_POOL_SIZE = max (1 ,int (os .getenv ('MAX_POOL_SIZE' , MAX_CONNECTIONS // PROCESSES )))
24+ MIN_POOL_SIZE = max (1 ,int (os .getenv ('MIN_POOL_SIZE' , MAX_POOL_SIZE // 2 )))
25+
26+ WORKER_PROCESSES = CORE_COUNT
27+ MAX_POOL_SIZE = max (1 , MAX_CONNECTIONS // WORKER_PROCESSES )
28+ MIN_POOL_SIZE = max (1 , MAX_POOL_SIZE // 2 )
29+ db_pool = None
30+
31+ async def setup_db (app ):
32+ global db_pool
33+ db_pool = await asyncpg .create_pool (
34+ user = os .getenv ('PGUSER' , "benchmarkdbuser" ),
35+ password = os .getenv ('PGPASS' , "benchmarkdbpass" ),
36+ database = 'hello_world' ,
37+ host = "tfb-database" ,
38+ port = 5432 ,
39+ min_size = MIN_POOL_SIZE ,
40+ max_size = MAX_POOL_SIZE ,
41+ )
42+
43+ async def shutdown_db (app ):
44+ """Close asyncpg connection pool for the current process."""
45+ global db_pool
46+ if db_pool is not None :
47+ await db_pool .close ()
48+ db_pool = None
49+
50+ def load_fortunes_template ():
51+ with Path ("templates/fortune.html" ).open ("r" ) as f :
52+ return jinja2 .Template (f .read ())
53+
54+
55+ fortune_template = load_fortunes_template ()
56+
57+ app = bs .Application ()
58+ app .on_start += setup_db
59+ app .on_stop += shutdown_db
60+
61+ def get_num_queries (request ):
62+ try :
63+ value = request .query .get ('queries' )
64+ if value is None :
65+ return 1
66+ query_count = int (value [0 ])
67+ except (KeyError , IndexError , ValueError ):
68+ return 1
69+ return min (max (query_count , 1 ), 500 )
70+
71+ JSON_CONTENT_TYPE = b"application/json"
72+
73+
74+ # ------------------------------------------------------------------------------------------
75+
76+ @bs .get ('/json' )
77+ async def json_test (request ):
78+ return bs .json ( {'message' : 'Hello, world!' } )
79+
80+ @bs .get ('/db' )
81+ async def single_db_query_test (request ):
82+ row_id = random .randint (1 , 10000 )
83+
84+ async with db_pool .acquire () as db_conn :
85+ number = await db_conn .fetchval (READ_ROW_SQL , row_id )
86+
87+ # return jsonify(Result(id=row_id, randomNumber=number))
88+ # return ({'id': row_id, 'randomNumber': number})
89+ return bs .json ({'id' : row_id , 'randomNumber' : number })
90+
91+
92+ @bs .get ('/queries' )
93+ async def multiple_db_queries_test (request ):
94+ num_queries = get_num_queries (request )
95+ row_ids = random .sample (range (1 , 10000 ), num_queries )
96+ worlds = []
97+
98+ async with db_pool .acquire () as db_conn :
99+ statement = await db_conn .prepare (READ_ROW_SQL )
100+ for row_id in row_ids :
101+ number = await statement .fetchval (row_id )
102+ worlds .append ( {"id" : row_id , "randomNumber" : number } )
103+ # worlds.append(Result(id=row_id, randomNumber=number))
104+
105+ return bs .json (worlds )
106+ # return jsonify(worlds)
107+
108+
109+ @bs .get ('/fortunes' )
110+ async def fortunes_test (request ):
111+ async with db_pool .acquire () as db_conn :
112+ fortunes = await db_conn .fetch ("SELECT * FROM Fortune" )
113+
114+ fortunes .append (ADDITIONAL_ROW )
115+ fortunes .sort (key = lambda row : row [1 ])
116+ data = fortune_template .render (fortunes = fortunes )
117+ return bs .html (data )
118+
119+
120+ @bs .get ('/updates' )
121+ async def db_updates_test (request ):
122+ num_queries = get_num_queries (request )
123+ updates = list (zip (
124+ random .sample (range (1 , 10000 ), num_queries ),
125+ sorted (random .sample (range (1 , 10000 ), num_queries ))
126+ ))
127+ # worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
128+ worlds = [ {"id" : row_id , "randomNumber" : number } for row_id , number in updates ]
129+ async with db_pool .acquire () as db_conn :
130+ statement = await db_conn .prepare (READ_ROW_SQL )
131+ for row_id , _ in updates :
132+ await statement .fetchval (row_id )
133+ await db_conn .executemany (WRITE_ROW_SQL , updates )
134+ return bs .json (worlds )
135+ # return jsonify(worlds)
136+
137+
138+ @bs .get ('/plaintext' )
139+ async def plaintext_test (request ):
140+ return bs .Response (200 , content = bs .Content (b"text/plain" , b'Hello, World!' ))
141+ #return bs.text('Hello, World!')
142+
143+
144+ if platform .python_implementation () == 'PyPy' :
145+ from socketify import ASGI
146+ workers = int (multiprocessing .cpu_count ())
147+ if _is_travis :
148+ workers = 2
149+
150+ def run_app ():
151+ ASGI (app ).listen (8080 , lambda config : logging .info (f"Listening on port http://localhost:{ config .port } now\n " )).run ()
152+
153+
154+ def create_fork ():
155+ n = os .fork ()
156+ # n greater than 0 means parent process
157+ if not n > 0 :
158+ run_app ()
159+
160+
161+ # fork limiting the cpu count - 1
162+ for i in range (1 , workers ):
163+ create_fork ()
164+
165+ run_app ()
0 commit comments