-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
60 lines (39 loc) · 1.46 KB
/
server.py
File metadata and controls
60 lines (39 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from starlette.applications import Starlette
from starlette.routing import Mount, Route, Router
import settings
from logger import logger
from pools import RedisPool, PostgresPool
from views import HomeView, AutocompleteView, PrevisionView
from middleware import SessionMiddleware, AuditMiddleware
# app init
app = Starlette()
app.add_middleware(SessionMiddleware)
if settings.AUDIT:
app.add_middleware(AuditMiddleware)
# app events
@app.on_event('startup')
async def setup_pools():
"Inicializa los pools de conexión de Redis y Postgresql"
await RedisPool.initialize()
await PostgresPool.initialize()
if PostgresPool.pool and settings.AUDIT:
await PostgresPool.pool.execute(AuditMiddleware.TABLE_CREATION)
all_ok = all([_.pool is not None for _ in [RedisPool, PostgresPool]])
logger.debug("[] setup_pools() ok: %s", all_ok)
@app.on_event('shutdown')
async def teardown_pools():
"Cierra los pools de conexión de Redis y Postgresql"
if RedisPool.pool:
RedisPool.pool.close()
if PostgresPool.pool:
await PostgresPool.pool.close()
logger.debug("[] teardown_pools()")
# app routing
app.mount('', Router([
Route('/', endpoint=HomeView, methods=['GET']),
Route('/autocomplete', endpoint=AutocompleteView, methods=['GET']),
Route('/prevision/{localidad_id:int}/', endpoint=PrevisionView, methods=['GET']),
]))
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8000)