|
| 1 | +import base64 |
| 2 | +from http import HTTPStatus |
| 3 | + |
| 4 | +from cryptography import fernet |
| 5 | +from aiohttp import web |
| 6 | +from aiohttp_session import setup, get_session |
| 7 | +from aiohttp_session.cookie_storage import EncryptedCookieStorage |
| 8 | + |
| 9 | + |
| 10 | +DATABASE = [ |
| 11 | + ('admin', 'admin'), |
| 12 | +] |
| 13 | + |
| 14 | + |
| 15 | +def login_required(fn): |
| 16 | + async def wrapped(request, *args, **kwargs): |
| 17 | + app = request.app |
| 18 | + router = app.router |
| 19 | + |
| 20 | + session = await get_session(request) |
| 21 | + |
| 22 | + if 'user_id' not in session: |
| 23 | + return web.HTTPFound(router['login'].url_for()) |
| 24 | + |
| 25 | + user_id = session['user_id'] |
| 26 | + # actually load user from your database (e.g. with aiopg) |
| 27 | + user = DATABASE[user_id] |
| 28 | + app['user'] = user |
| 29 | + return await fn(request, *args, **kwargs) |
| 30 | + |
| 31 | + return wrapped |
| 32 | + |
| 33 | + |
| 34 | +@login_required |
| 35 | +async def handler(request): |
| 36 | + user = request.app['user'] |
| 37 | + return web.Response(text='User {} authorized'.format(user)) |
| 38 | + |
| 39 | + |
| 40 | +tmpl = '''\ |
| 41 | +<html> |
| 42 | + <body> |
| 43 | + <form method="post" action="login"> |
| 44 | + <label>Name:</label><input type="text" name="name"/> |
| 45 | + <label>Password:</label><input type="password" name="password"/> |
| 46 | + <input type="submit" value="Login"/> |
| 47 | + </form> |
| 48 | + </body> |
| 49 | +</html>''' |
| 50 | + |
| 51 | + |
| 52 | +async def login_page(request): |
| 53 | + return web.Response(content_type='text/html', text=tmpl) |
| 54 | + |
| 55 | + |
| 56 | +async def login(request): |
| 57 | + router = request.app.router |
| 58 | + form = await request.post() |
| 59 | + user_signature = (form['name'], form['password']) |
| 60 | + |
| 61 | + # actually implement business logic to check user credentials |
| 62 | + try: |
| 63 | + user_id = DATABASE.index(user_signature) |
| 64 | + session = await get_session(request) |
| 65 | + session['user_id'] = user_id |
| 66 | + return web.HTTPFound(router['restricted'].url_for()) |
| 67 | + except ValueError: |
| 68 | + return web.Response(text='No such user', status=HTTPStatus.FORBIDDEN) |
| 69 | + |
| 70 | + |
| 71 | +def make_app(): |
| 72 | + app = web.Application() |
| 73 | + # secret_key must be 32 url-safe base64-encoded bytes |
| 74 | + fernet_key = fernet.Fernet.generate_key() |
| 75 | + secret_key = base64.urlsafe_b64decode(fernet_key) |
| 76 | + setup(app, EncryptedCookieStorage(secret_key)) |
| 77 | + app.router.add_get('/', handler, name='restricted') |
| 78 | + app.router.add_get('/login', login_page, name='login') |
| 79 | + app.router.add_post('/login', login) |
| 80 | + return app |
| 81 | + |
| 82 | + |
| 83 | +web.run_app(make_app()) |
0 commit comments