Skip to content

Commit 1bf60ee

Browse files
committed
asyncpg version of aggregation
1 parent 832db9e commit 1bf60ee

File tree

3 files changed

+78
-17
lines changed

3 files changed

+78
-17
lines changed

packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
import datetime
22
import logging
3+
import warnings
34
from dataclasses import dataclass, fields
45
from typing import Any
56

67
import sqlalchemy as sa
78
from aiopg.sa.connection import SAConnection
89
from aiopg.sa.result import RowProxy
10+
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
911

1012
from .models.groups import GroupType, groups, user_to_groups
1113
from .models.groups_extra_properties import groups_extra_properties
1214
from .utils_models import FromRowMixin
15+
from .utils_repos import pass_or_acquire_connection
1316

1417
_logger = logging.getLogger(__name__)
1518

@@ -103,31 +106,52 @@ def _merge_extra_properties_booleans(
103106

104107
@dataclass(frozen=True, slots=True, kw_only=True)
105108
class GroupExtraPropertiesRepo:
109+
@staticmethod
110+
def _get_stmt(gid: int, product_name: str):
111+
return sa.select(groups_extra_properties).where(
112+
(groups_extra_properties.c.group_id == gid)
113+
& (groups_extra_properties.c.product_name == product_name)
114+
)
115+
106116
@staticmethod
107117
async def get(
108118
connection: SAConnection, *, gid: int, product_name: str
109119
) -> GroupExtraProperties:
110-
get_stmt = sa.select(groups_extra_properties).where(
111-
(groups_extra_properties.c.group_id == gid)
112-
& (groups_extra_properties.c.product_name == product_name)
120+
warnings.warn(
121+
f"{__name__}.get_v2 uses aiopg which has been deprecated in this repo."
122+
"Use get_v2 instead. "
123+
"See https://github.com/ITISFoundation/osparc-simcore/issues/4529",
124+
DeprecationWarning,
125+
stacklevel=1,
113126
)
114-
result = await connection.execute(get_stmt)
127+
128+
query = GroupExtraPropertiesRepo._get_stmt(gid, product_name)
129+
result = await connection.execute(query)
115130
assert result # nosec
116131
if row := await result.first():
117132
return GroupExtraProperties.from_row_proxy(row)
118133
msg = f"Properties for group {gid} not found"
119134
raise GroupExtraPropertiesNotFoundError(msg)
120135

121136
@staticmethod
122-
async def get_aggregated_properties_for_user(
123-
connection: SAConnection,
137+
async def get_v2(
138+
engine: AsyncEngine,
139+
connection: AsyncConnection | None = None,
124140
*,
125-
user_id: int,
141+
gid: int,
126142
product_name: str,
127143
) -> GroupExtraProperties:
128-
rows = await _list_table_entries_ordered_by_group_type(
129-
connection, user_id, product_name
130-
)
144+
async with pass_or_acquire_connection(engine, connection) as conn:
145+
query = GroupExtraPropertiesRepo._get_stmt(gid, product_name)
146+
result = await conn.stream(query)
147+
assert result # nosec
148+
if row := await result.first():
149+
return GroupExtraProperties.from_orm(row)
150+
msg = f"Properties for group {gid} not found"
151+
raise GroupExtraPropertiesNotFoundError(msg)
152+
153+
@staticmethod
154+
def _aggregate(rows, user_id, product_name):
131155
merged_standard_extra_properties = None
132156
for row in rows:
133157
group_extra_properties = GroupExtraProperties.from_row_proxy(row)
@@ -161,3 +185,42 @@ async def get_aggregated_properties_for_user(
161185
return merged_standard_extra_properties
162186
msg = f"Properties for user {user_id} in {product_name} not found"
163187
raise GroupExtraPropertiesNotFoundError(msg)
188+
189+
@staticmethod
190+
async def get_aggregated_properties_for_user(
191+
connection: SAConnection,
192+
*,
193+
user_id: int,
194+
product_name: str,
195+
) -> GroupExtraProperties:
196+
warnings.warn(
197+
f"{__name__}.get_aggregated_properties_for_user uses aiopg which has been deprecated in this repo. "
198+
"Use get_aggregated_properties_for_user_v2 instead. "
199+
"See https://github.com/ITISFoundation/osparc-simcore/issues/4529",
200+
DeprecationWarning,
201+
stacklevel=1,
202+
)
203+
204+
rows = await _list_table_entries_ordered_by_group_type(
205+
connection, user_id, product_name
206+
)
207+
return GroupExtraPropertiesRepo._aggregate(rows, user_id, product_name)
208+
209+
@staticmethod
210+
async def get_aggregated_properties_for_user_v2(
211+
engine: AsyncEngine,
212+
connection: AsyncConnection | None = None,
213+
*,
214+
user_id: int,
215+
product_name: str,
216+
) -> GroupExtraProperties:
217+
async with pass_or_acquire_connection(engine, connection) as conn:
218+
219+
list_stmt = _list_table_entries_ordered_by_group_type_query(
220+
user_id=user_id, product_name=product_name
221+
)
222+
result = await conn.stream(
223+
sa.select(list_stmt).order_by(list_stmt.c.type_order)
224+
)
225+
rows = [row async for row in result]
226+
return GroupExtraPropertiesRepo._aggregate(rows, user_id, product_name)

packages/postgres-database/src/simcore_postgres_database/utils_models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def from_row_proxy(cls: type[ModelType], row: RowProxy) -> ModelType:
1717
return cls(**{k: v for k, v in row.items() if k in field_names}) # type: ignore[return-value]
1818

1919
@classmethod
20-
def from_row(cls: type[ModelType], row: Row) -> ModelType:
20+
def from_orm(cls: type[ModelType], row: Row) -> ModelType:
2121
assert is_dataclass(cls) # nosec
2222
field_names = [f.name for f in fields(cls)]
2323
return cls(**{k: v for k, v in row._asdict().items() if k in field_names}) # type: ignore[return-value]

services/web/server/src/simcore_service_webserver/users/_users_repository.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,12 @@ async def list_user_permissions(
7777
name="override_services_specifications",
7878
allowed=False,
7979
)
80+
engine = get_asyncpg_engine(app)
8081
with contextlib.suppress(GroupExtraPropertiesNotFoundError):
81-
async with pass_or_acquire_connection(
82-
get_asyncpg_engine(app), connection
83-
) as conn:
82+
async with pass_or_acquire_connection(engine, connection) as conn:
8483
user_group_extra_properties = (
85-
# TODO: adapt to asyncpg
86-
await GroupExtraPropertiesRepo.get_aggregated_properties_for_user(
87-
conn, user_id=user_id, product_name=product_name
84+
await GroupExtraPropertiesRepo.get_aggregated_properties_for_user_v2(
85+
engine, conn, user_id=user_id, product_name=product_name
8886
)
8987
)
9088
override_services_specifications.allowed = (

0 commit comments

Comments
 (0)