2222from simcore_postgres_database .models .workspaces_access_rights import (
2323 workspaces_access_rights ,
2424)
25+ from simcore_postgres_database .utils_repos import (
26+ pass_or_acquire_connection ,
27+ transaction_context ,
28+ )
2529from sqlalchemy import asc , desc , func
2630from sqlalchemy .dialects .postgresql import BOOLEAN , INTEGER
31+ from sqlalchemy .ext .asyncio import AsyncConnection
2732from sqlalchemy .sql import Subquery , select
2833
29- from ..db .plugin import get_database_engine
34+ from ..db .plugin import get_asyncpg_engine
3035from .errors import WorkspaceAccessForbiddenError , WorkspaceNotFoundError
3136
3237_logger = logging .getLogger (__name__ )
4550
4651async def create_workspace (
4752 app : web .Application ,
53+ connection : AsyncConnection | None = None ,
54+ * ,
4855 product_name : ProductName ,
4956 owner_primary_gid : GroupID ,
5057 name : str ,
5158 description : str | None ,
5259 thumbnail : str | None ,
5360) -> WorkspaceDB :
54- async with get_database_engine ( app ). acquire ( ) as conn :
61+ async with transaction_context ( get_asyncpg_engine ( app ), connection ) as conn :
5562 result = await conn .execute (
5663 workspaces .insert ()
5764 .values (
@@ -65,11 +72,11 @@ async def create_workspace(
6572 )
6673 .returning (* _SELECTION_ARGS )
6774 )
68- row = await result .first ()
75+ row = result .first ()
6976 return WorkspaceDB .from_orm (row )
7077
7178
72- access_rights_subquery = (
79+ _access_rights_subquery = (
7380 select (
7481 workspaces_access_rights .c .workspace_id ,
7582 func .jsonb_object_agg (
@@ -116,6 +123,7 @@ def _create_my_access_rights_subquery(user_id: UserID) -> Subquery:
116123
117124async def list_workspaces_for_user (
118125 app : web .Application ,
126+ connection : AsyncConnection | None = None ,
119127 * ,
120128 user_id : UserID ,
121129 product_name : ProductName ,
@@ -128,11 +136,11 @@ async def list_workspaces_for_user(
128136 base_query = (
129137 select (
130138 * _SELECTION_ARGS ,
131- access_rights_subquery .c .access_rights ,
139+ _access_rights_subquery .c .access_rights ,
132140 my_access_rights_subquery .c .my_access_rights ,
133141 )
134142 .select_from (
135- workspaces .join (access_rights_subquery ).join (my_access_rights_subquery )
143+ workspaces .join (_access_rights_subquery ).join (my_access_rights_subquery )
136144 )
137145 .where (workspaces .c .product_name == product_name )
138146 )
@@ -148,12 +156,12 @@ async def list_workspaces_for_user(
148156 list_query = base_query .order_by (desc (getattr (workspaces .c , order_by .field )))
149157 list_query = list_query .offset (offset ).limit (limit )
150158
151- async with get_database_engine ( app ). acquire ( ) as conn :
159+ async with pass_or_acquire_connection ( get_asyncpg_engine ( app ), connection ) as conn :
152160 count_result = await conn .execute (count_query )
153- total_count = await count_result .scalar ()
161+ total_count = count_result .scalar ()
154162
155163 result = await conn .execute (list_query )
156- rows = await result .fetchall () or []
164+ rows = result .fetchall () or []
157165 results : list [UserWorkspaceAccessRightsDB ] = [
158166 UserWorkspaceAccessRightsDB .from_orm (row ) for row in rows
159167 ]
@@ -163,6 +171,8 @@ async def list_workspaces_for_user(
163171
164172async def get_workspace_for_user (
165173 app : web .Application ,
174+ connection : AsyncConnection | None = None ,
175+ * ,
166176 user_id : UserID ,
167177 workspace_id : WorkspaceID ,
168178 product_name : ProductName ,
@@ -172,21 +182,21 @@ async def get_workspace_for_user(
172182 base_query = (
173183 select (
174184 * _SELECTION_ARGS ,
175- access_rights_subquery .c .access_rights ,
185+ _access_rights_subquery .c .access_rights ,
176186 my_access_rights_subquery .c .my_access_rights ,
177187 )
178188 .select_from (
179- workspaces .join (access_rights_subquery ).join (my_access_rights_subquery )
189+ workspaces .join (_access_rights_subquery ).join (my_access_rights_subquery )
180190 )
181191 .where (
182192 (workspaces .c .workspace_id == workspace_id )
183193 & (workspaces .c .product_name == product_name )
184194 )
185195 )
186196
187- async with get_database_engine ( app ). acquire ( ) as conn :
197+ async with pass_or_acquire_connection ( get_asyncpg_engine ( app ), connection ) as conn :
188198 result = await conn .execute (base_query )
189- row = await result .first ()
199+ row = result .first ()
190200 if row is None :
191201 raise WorkspaceAccessForbiddenError (
192202 reason = f"User { user_id } does not have access to the workspace { workspace_id } . Or workspace does not exist." ,
@@ -196,13 +206,15 @@ async def get_workspace_for_user(
196206
197207async def update_workspace (
198208 app : web .Application ,
209+ connection : AsyncConnection | None = None ,
210+ * ,
199211 workspace_id : WorkspaceID ,
200212 name : str ,
201213 description : str | None ,
202214 thumbnail : str | None ,
203215 product_name : ProductName ,
204216) -> WorkspaceDB :
205- async with get_database_engine ( app ). acquire ( ) as conn :
217+ async with transaction_context ( get_asyncpg_engine ( app ), connection ) as conn :
206218 result = await conn .execute (
207219 workspaces .update ()
208220 .values (
@@ -217,18 +229,20 @@ async def update_workspace(
217229 )
218230 .returning (* _SELECTION_ARGS )
219231 )
220- row = await result .first ()
232+ row = result .first ()
221233 if row is None :
222234 raise WorkspaceNotFoundError (reason = f"Workspace { workspace_id } not found." )
223235 return WorkspaceDB .from_orm (row )
224236
225237
226238async def delete_workspace (
227239 app : web .Application ,
240+ connection : AsyncConnection | None = None ,
241+ * ,
228242 workspace_id : WorkspaceID ,
229243 product_name : ProductName ,
230244) -> None :
231- async with get_database_engine ( app ). acquire ( ) as conn :
245+ async with transaction_context ( get_asyncpg_engine ( app ), connection ) as conn :
232246 await conn .execute (
233247 workspaces .delete ().where (
234248 (workspaces .c .workspace_id == workspace_id )
0 commit comments