Skip to content

Commit dcad2db

Browse files
authored
♻️ Replace aiopg in catalog service (ITISFoundation#2869)
* catalog uses sqlalchemy in async mode * use async sqlalchemy in tests too * added caching of director-v0 services
1 parent 6c63bef commit dcad2db

File tree

28 files changed

+571
-332
lines changed

28 files changed

+571
-332
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import Any, Dict
2+
3+
import sqlalchemy as sa
4+
from sqlalchemy.ext.asyncio import AsyncEngine
5+
6+
from .utils_migration import get_current_head
7+
8+
9+
async def get_pg_engine_stateinfo(engine: AsyncEngine) -> Dict[str, Any]:
10+
return {
11+
"current pool connections": f"{engine.pool.checkedin()=},{engine.pool.checkedout()=}",
12+
}
13+
14+
15+
class DBMigrationError(RuntimeError):
16+
pass
17+
18+
19+
async def raise_if_migration_not_ready(engine: AsyncEngine):
20+
"""Ensures db migration is complete
21+
22+
:raises DBMigrationError
23+
"""
24+
async with engine.connect() as conn:
25+
version_num = await conn.scalar(
26+
sa.DDL('SELECT "version_num" FROM "alembic_version"')
27+
)
28+
head_version_num = get_current_head()
29+
if version_num != head_version_num:
30+
raise DBMigrationError(
31+
f"Migration is incomplete, expected {head_version_num} but got {version_num}"
32+
)

packages/pytest-simcore/src/pytest_simcore/postgres_service.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import logging
55
from typing import AsyncIterator, Dict, Iterator, List
66

7-
import aiopg.sa
87
import pytest
98
import sqlalchemy as sa
109
import tenacity
@@ -168,7 +167,7 @@ def postgres_db(
168167
@pytest.fixture(scope="function")
169168
async def aiopg_engine(
170169
postgres_db: sa.engine.Engine,
171-
) -> AsyncIterator[aiopg.sa.engine.Engine]:
170+
) -> AsyncIterator:
172171
"""An aiopg engine connected to an initialized database"""
173172
from aiopg.sa import create_engine
174173

@@ -181,6 +180,22 @@ async def aiopg_engine(
181180
await engine.wait_closed()
182181

183182

183+
@pytest.fixture(scope="function")
184+
async def sqlalchemy_async_engine(
185+
postgres_db: sa.engine.Engine,
186+
) -> AsyncIterator:
187+
# NOTE: prevent having to import this if latest sqlalchemy not installed
188+
from sqlalchemy.ext.asyncio import create_async_engine
189+
190+
engine = create_async_engine(
191+
f"{postgres_db.url}".replace("postgresql", "postgresql+asyncpg")
192+
)
193+
assert engine
194+
yield engine
195+
196+
await engine.dispose()
197+
198+
184199
@pytest.fixture(scope="function")
185200
def postgres_host_config(postgres_dsn: Dict[str, str], monkeypatch) -> Dict[str, str]:
186201
monkeypatch.setenv("POSTGRES_USER", postgres_dsn["user"])
@@ -195,7 +210,7 @@ def postgres_host_config(postgres_dsn: Dict[str, str], monkeypatch) -> Dict[str,
195210

196211

197212
@pytest.fixture(scope="module")
198-
def postgres_session(postgres_db: sa.engine.Engine) -> sa.orm.session.Session:
213+
def postgres_session(postgres_db: sa.engine.Engine) -> Iterator[sa.orm.session.Session]:
199214
from sqlalchemy.orm.session import Session
200215

201216
Session_cls = sessionmaker(postgres_db)

packages/settings-library/src/settings_library/postgres.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ class PostgresSettings(BaseCustomSettings):
3333
POSTGRES_CLIENT_NAME: Optional[str] = Field(
3434
None,
3535
description="Name of the application connecting the postgres database, will default to use the host hostname (hostname on linux)",
36-
env=["HOST", "HOSTNAME", "POSTGRES_CLIENT_NAME"],
36+
env=[
37+
"POSTGRES_CLIENT_NAME",
38+
# This is useful when running inside a docker container, then the hostname is set each client gets a different name
39+
"HOST",
40+
"HOSTNAME",
41+
],
3742
)
3843

3944
@validator("POSTGRES_MAXSIZE")
@@ -57,6 +62,11 @@ def dsn(self) -> str:
5762
)
5863
return dsn
5964

65+
@cached_property
66+
def dsn_with_async_sqlalchemy(self) -> str:
67+
dsn = self.dsn.replace("postgresql", "postgresql+asyncpg")
68+
return dsn
69+
6070
@cached_property
6171
def dsn_with_query(self) -> str:
6272
"""Some clients do not support queries in the dsn"""

services/catalog/requirements/_base.in

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ fastapi[all]
2121
pydantic[dotenv]
2222

2323
# database
24-
aiopg[sa]
24+
asyncpg
25+
sqlalchemy[asyncio]
2526

2627
# web client
2728
httpx
2829

2930
# other
31+
aiocache[redis,msgpack]
3032
tenacity
3133
packaging
3234
pyyaml

services/catalog/requirements/_base.txt

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#
55
# pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in
66
#
7+
aiocache==0.11.1
8+
# via -r requirements/_base.in
79
aiodebug==1.1.2
810
# via
911
# -c requirements/../../../packages/service-library/requirements/./_base.in
@@ -12,8 +14,15 @@ aiofiles==0.5.0
1214
# via
1315
# -c requirements/../../../packages/service-library/requirements/./_base.in
1416
# -r requirements/../../../packages/service-library/requirements/_base.in
15-
aiopg==1.3.3
16-
# via -r requirements/_base.in
17+
aioredis==1.3.1
18+
# via
19+
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
20+
# -c requirements/../../../packages/postgres-database/requirements/../../../requirements/constraints.txt
21+
# -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt
22+
# -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt
23+
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
24+
# -c requirements/../../../requirements/constraints.txt
25+
# aiocache
1726
alembic==1.7.4
1827
# via -r requirements/../../../packages/postgres-database/requirements/_base.in
1928
anyio==3.5.0
@@ -23,7 +32,9 @@ anyio==3.5.0
2332
asgiref==3.4.1
2433
# via uvicorn
2534
async-timeout==4.0.2
26-
# via aiopg
35+
# via aioredis
36+
asyncpg==0.25.0
37+
# via -r requirements/_base.in
2738
certifi==2020.12.5
2839
# via
2940
# httpcore
@@ -56,6 +67,8 @@ h11==0.12.0
5667
# via
5768
# httpcore
5869
# uvicorn
70+
hiredis==2.0.0
71+
# via aioredis
5972
httpcore==0.14.4
6073
# via httpx
6174
httptools==0.2.0
@@ -102,6 +115,8 @@ markupsafe==1.1.1
102115
# via
103116
# jinja2
104117
# mako
118+
msgpack==1.0.3
119+
# via aiocache
105120
multidict==5.1.0
106121
# via yarl
107122
opentracing==2.4.0
@@ -113,9 +128,7 @@ orjson==3.4.8
113128
packaging==20.9
114129
# via -r requirements/_base.in
115130
psycopg2-binary==2.8.6
116-
# via
117-
# aiopg
118-
# sqlalchemy
131+
# via sqlalchemy
119132
pydantic==1.9.0
120133
# via
121134
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
@@ -180,7 +193,7 @@ sqlalchemy==1.4.31
180193
# -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt
181194
# -c requirements/../../../requirements/constraints.txt
182195
# -r requirements/../../../packages/postgres-database/requirements/_base.in
183-
# aiopg
196+
# -r requirements/_base.in
184197
# alembic
185198
starlette==0.17.1
186199
# via fastapi

services/catalog/requirements/_test.in

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ jsonschema
1414

1515
# testing
1616
pytest
17-
pytest-aiohttp # incompatible with pytest-asyncio. See https://github.com/pytest-dev/pytest-asyncio/issues/76
17+
pytest-aiohttp
18+
pytest-benchmark
1819
pytest-cov
1920
pytest-mock
2021
pytest-runner

services/catalog/requirements/_test.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ ptvsd==4.3.2
166166
# via -r requirements/_test.in
167167
py==1.11.0
168168
# via pytest
169+
py-cpuinfo==8.0.0
170+
# via pytest-benchmark
169171
pycparser==2.21
170172
# via cffi
171173
pylint==2.12.2
@@ -183,13 +185,16 @@ pytest==6.2.5
183185
# -r requirements/_test.in
184186
# pytest-aiohttp
185187
# pytest-asyncio
188+
# pytest-benchmark
186189
# pytest-cov
187190
# pytest-docker
188191
# pytest-mock
189192
pytest-aiohttp==1.0.4
190193
# via -r requirements/_test.in
191194
pytest-asyncio==0.18.1
192195
# via pytest-aiohttp
196+
pytest-benchmark==3.4.1
197+
# via -r requirements/_test.in
193198
pytest-cov==3.0.0
194199
# via -r requirements/_test.in
195200
pytest-docker==0.10.3
Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,32 @@
11
import logging
22
from typing import AsyncGenerator, Callable, Type
33

4-
from aiopg.sa import Engine
54
from fastapi import Depends
65
from fastapi.requests import Request
6+
from sqlalchemy.ext.asyncio import AsyncEngine
77

88
from ...db.repositories import BaseRepository
99

1010
logger = logging.getLogger(__name__)
1111

1212

13-
def _get_db_engine(request: Request) -> Engine:
13+
def _get_db_engine(request: Request) -> AsyncEngine:
1414
return request.app.state.engine
1515

1616

1717
def get_repository(repo_type: Type[BaseRepository]) -> Callable:
1818
async def _get_repo(
19-
engine: Engine = Depends(_get_db_engine),
19+
engine: AsyncEngine = Depends(_get_db_engine),
2020
) -> AsyncGenerator[BaseRepository, None]:
2121
# NOTE: 2 different ideas were tried here with not so good
2222
# 1st one was acquiring a connection per repository which lead to the following issue https://github.com/ITISFoundation/osparc-simcore/pull/1966
2323
# 2nd one was acquiring a connection per request which works but blocks the director-v2 responsiveness once
2424
# the max amount of connections is reached
25-
# now the current solution is to acquire connection when needed.
26-
available_engines = engine.maxsize - (engine.size - engine.freesize)
27-
if available_engines <= 1:
28-
logger.warning(
29-
"Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
30-
engine.size,
31-
engine.size - engine.freesize,
32-
engine.freesize,
33-
engine.minsize,
34-
engine.maxsize,
35-
)
25+
# now the current solution is to connect connection when needed.
26+
logger.info(
27+
"%s",
28+
f"current pool connections {engine.pool.checkedin()=},{engine.pool.checkedout()=}",
29+
)
3630
yield repo_type(db_engine=engine)
3731

3832
return _get_repo

0 commit comments

Comments
 (0)