33from random import randint , sample
44
55import asyncpg
6- from emmett55 import App , Pipe , current , request , response
6+ from emmett55 import App , Pipe , request , response
77from emmett55 .extensions import Extension , Signals , listen_signal
8- from emmett55 .tools import service
8+ from emmett55 .tools import ServicePipe
99from renoir import Renoir
1010
1111
12+ class NoResetConnection (asyncpg .Connection ):
13+ __slots__ = ()
14+
15+ def get_reset_query (self ):
16+ return ""
17+
18+
1219class AsyncPG (Extension ):
1320 __slots__ = ["pool" ]
1421
@@ -23,10 +30,9 @@ async def build_pool(self):
2330 database = 'hello_world' ,
2431 host = 'tfb-database' ,
2532 port = 5432 ,
26- min_size = 16 ,
27- max_size = 16 ,
28- max_queries = 64_000_000_000 ,
29- max_inactive_connection_lifetime = 0
33+ min_size = 4 ,
34+ max_size = 4 ,
35+ connection_class = NoResetConnection ,
3036 )
3137
3238 @listen_signal (Signals .after_loop )
@@ -40,18 +46,32 @@ class AsyncPGPipe(Pipe):
4046 def __init__ (self , ext ):
4147 self .ext = ext
4248
43- async def open (self ):
44- conn = current ._db_conn = self .ext .pool .acquire ()
45- current .db = await conn .__aenter__ ()
49+ async def pipe (self , next_pipe , ** kwargs ):
50+ async with self .ext .pool .acquire () as conn :
51+ kwargs ['db' ] = conn
52+ return await next_pipe (** kwargs )
53+
4654
47- async def close (self ):
48- await current ._db_conn .__aexit__ ()
55+ class TemplatePipe (Pipe ):
56+ __slots__ = ["template" ]
57+ output = "str"
58+
59+ def __init__ (self , template ):
60+ self .template = f"templates/{ template } "
61+
62+ async def pipe (self , next_pipe , ** kwargs ):
63+ response .content_type = "text/html; charset=utf-8"
64+ ctx = await next_pipe (** kwargs )
65+ return templates .render (self .template , ctx )
4966
5067
5168app = App (__name__ )
5269app .config .handle_static = False
5370templates = Renoir ()
5471
72+ json_routes = app .module (__name__ , 'json' )
73+ json_routes .pipeline = [ServicePipe ('json' )]
74+
5575db_ext = app .use_extension (AsyncPG )
5676
5777SQL_SELECT = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
@@ -60,17 +80,15 @@ async def close(self):
6080sort_key = itemgetter (1 )
6181
6282
63- @app .route ()
64- @service .json
83+ @json_routes .route ()
6584async def json ():
6685 return {'message' : 'Hello, World!' }
6786
6887
69- @app .route ("/db" , pipeline = [db_ext .pipe ])
70- @service .json
71- async def get_random_world ():
88+ @json_routes .route ("/db" , pipeline = [db_ext .pipe ])
89+ async def get_random_world (db ):
7290 row_id = randint (1 , 10000 )
73- number = await current . db .fetchval (SQL_SELECT , row_id )
91+ number = await db .fetchval (SQL_SELECT , row_id )
7492 return {'id' : row_id , 'randomNumber' : number }
7593
7694
@@ -86,41 +104,32 @@ def get_qparam():
86104 return rv
87105
88106
89- @app .route ("/queries" , pipeline = [db_ext .pipe ])
90- @service .json
91- async def get_random_worlds ():
107+ @json_routes .route ("/queries" , pipeline = [db_ext .pipe ])
108+ async def get_random_worlds (db ):
92109 num_queries = get_qparam ()
93110 row_ids = sample (range (1 , 10000 ), num_queries )
94- worlds = []
95- statement = await current .db .prepare (SQL_SELECT )
96- for row_id in row_ids :
97- number = await statement .fetchval (row_id )
98- worlds .append ({'id' : row_id , 'randomNumber' : number })
99- return worlds
111+ rows = await db .fetchmany (SQL_SELECT , [(v ,) for v in row_ids ])
112+ return [{'id' : row_id , 'randomNumber' : number [0 ]} for row_id , number in zip (row_ids , rows )]
100113
101114
102- @app .route (pipeline = [db_ext .pipe ], output = 'str' )
103- async def fortunes ():
104- response .content_type = "text/html; charset=utf-8"
105- fortunes = await current .db .fetch ('SELECT * FROM Fortune' )
115+ @app .route (pipeline = [TemplatePipe ("fortunes.html" ), db_ext .pipe ])
116+ async def fortunes (db ):
117+ fortunes = await db .fetch ('SELECT * FROM Fortune' )
106118 fortunes .append (ROW_ADD )
107119 fortunes .sort (key = sort_key )
108- return templates . render ( "templates/fortunes.html" , {"fortunes" : fortunes })
120+ return {"fortunes" : fortunes }
109121
110122
111- @app .route (pipeline = [db_ext .pipe ])
112- @service .json
113- async def updates ():
123+ @json_routes .route (pipeline = [db_ext .pipe ])
124+ async def updates (db ):
114125 num_queries = get_qparam ()
115126 updates = list (zip (
116127 sample (range (1 , 10000 ), num_queries ),
117128 sorted (sample (range (1 , 10000 ), num_queries ))
118129 ))
119130 worlds = [{'id' : row_id , 'randomNumber' : number } for row_id , number in updates ]
120- statement = await current .db .prepare (SQL_SELECT )
121- for row_id , _ in updates :
122- await statement .fetchval (row_id )
123- await current .db .executemany (SQL_UPDATE , updates )
131+ await db .executemany (SQL_SELECT , [(i [0 ],) for i in updates ])
132+ await db .executemany (SQL_UPDATE , updates )
124133 return worlds
125134
126135
0 commit comments