|
| 1 | +import os |
| 2 | +from operator import itemgetter |
| 3 | +from random import randint, sample |
| 4 | + |
| 5 | +import asyncpg |
| 6 | +from emmett55 import App, Pipe, current, request, response |
| 7 | +from emmett55.extensions import Extension, Signals, listen_signal |
| 8 | +from emmett55.tools import service |
| 9 | +from renoir import Renoir |
| 10 | + |
| 11 | + |
| 12 | +class AsyncPG(Extension): |
| 13 | + __slots__ = ["pool"] |
| 14 | + |
| 15 | + def on_load(self): |
| 16 | + self.pool = None |
| 17 | + self.pipe = AsyncPGPipe(self) |
| 18 | + |
| 19 | + async def build_pool(self): |
| 20 | + self.pool = await asyncpg.create_pool( |
| 21 | + user=os.getenv('PGUSER', 'benchmarkdbuser'), |
| 22 | + password=os.getenv('PGPASS', 'benchmarkdbpass'), |
| 23 | + database='hello_world', |
| 24 | + host='tfb-database', |
| 25 | + port=5432, |
| 26 | + min_size=16, |
| 27 | + max_size=16, |
| 28 | + max_queries=64_000_000_000, |
| 29 | + max_inactive_connection_lifetime=0 |
| 30 | + ) |
| 31 | + |
| 32 | + @listen_signal(Signals.after_loop) |
| 33 | + def _init_pool(self, loop): |
| 34 | + loop.run_until_complete(self.build_pool()) |
| 35 | + |
| 36 | + |
| 37 | +class AsyncPGPipe(Pipe): |
| 38 | + __slots__ = ["ext"] |
| 39 | + |
| 40 | + def __init__(self, ext): |
| 41 | + self.ext = ext |
| 42 | + |
| 43 | + async def open(self): |
| 44 | + conn = current._db_conn = self.ext.pool.acquire() |
| 45 | + current.db = await conn.__aenter__() |
| 46 | + |
| 47 | + async def close(self): |
| 48 | + await current._db_conn.__aexit__() |
| 49 | + |
| 50 | + |
| 51 | +app = App(__name__) |
| 52 | +app.config.handle_static = False |
| 53 | +templates = Renoir() |
| 54 | + |
| 55 | +db_ext = app.use_extension(AsyncPG) |
| 56 | + |
| 57 | +SQL_SELECT = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1' |
| 58 | +SQL_UPDATE = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2' |
| 59 | +ROW_ADD = [0, 'Additional fortune added at request time.'] |
| 60 | +sort_key = itemgetter(1) |
| 61 | + |
| 62 | + |
| 63 | +@app.route() |
| 64 | +@service.json |
| 65 | +async def json(): |
| 66 | + return {'message': 'Hello, World!'} |
| 67 | + |
| 68 | + |
| 69 | +@app.route("/db", pipeline=[db_ext.pipe]) |
| 70 | +@service.json |
| 71 | +async def get_random_world(): |
| 72 | + row_id = randint(1, 10000) |
| 73 | + number = await current.db.fetchval(SQL_SELECT, row_id) |
| 74 | + return {'id': row_id, 'randomNumber': number} |
| 75 | + |
| 76 | + |
| 77 | +def get_qparam(): |
| 78 | + try: |
| 79 | + rv = int(request.query_params.queries or 1) |
| 80 | + except ValueError: |
| 81 | + return 1 |
| 82 | + if rv < 1: |
| 83 | + return 1 |
| 84 | + if rv > 500: |
| 85 | + return 500 |
| 86 | + return rv |
| 87 | + |
| 88 | + |
| 89 | +@app.route("/queries", pipeline=[db_ext.pipe]) |
| 90 | +@service.json |
| 91 | +async def get_random_worlds(): |
| 92 | + num_queries = get_qparam() |
| 93 | + row_ids = sample(range(1, 10000), num_queries) |
| 94 | + worlds = [] |
| 95 | + statement = await current.db.prepare(SQL_SELECT) |
| 96 | + for row_id in row_ids: |
| 97 | + number = await statement.fetchval(row_id) |
| 98 | + worlds.append({'id': row_id, 'randomNumber': number}) |
| 99 | + return worlds |
| 100 | + |
| 101 | + |
| 102 | +@app.route(pipeline=[db_ext.pipe], output='str') |
| 103 | +async def fortunes(): |
| 104 | + response.content_type = "text/html; charset=utf-8" |
| 105 | + fortunes = await current.db.fetch('SELECT * FROM Fortune') |
| 106 | + fortunes.append(ROW_ADD) |
| 107 | + fortunes.sort(key=sort_key) |
| 108 | + return templates.render("templates/fortunes.html", {"fortunes": fortunes}) |
| 109 | + |
| 110 | + |
| 111 | +@app.route(pipeline=[db_ext.pipe]) |
| 112 | +@service.json |
| 113 | +async def updates(): |
| 114 | + num_queries = get_qparam() |
| 115 | + updates = list(zip( |
| 116 | + sample(range(1, 10000), num_queries), |
| 117 | + sorted(sample(range(1, 10000), num_queries)) |
| 118 | + )) |
| 119 | + worlds = [{'id': row_id, 'randomNumber': number} for row_id, number in updates] |
| 120 | + statement = await current.db.prepare(SQL_SELECT) |
| 121 | + for row_id, _ in updates: |
| 122 | + await statement.fetchval(row_id) |
| 123 | + await current.db.executemany(SQL_UPDATE, updates) |
| 124 | + return worlds |
| 125 | + |
| 126 | + |
| 127 | +@app.route(output='bytes') |
| 128 | +async def plaintext(): |
| 129 | + return b'Hello, World!' |
0 commit comments