@@ -131,7 +131,7 @@ async def create_function( # noqa: PLR0913
131131 api_access_rights = [FunctionsApiAccessRights .WRITE_FUNCTIONS ],
132132 )
133133
134- result = await transaction .stream (
134+ result = await transaction .execute (
135135 functions_table .insert ()
136136 .values (
137137 title = title ,
@@ -144,7 +144,7 @@ async def create_function( # noqa: PLR0913
144144 )
145145 .returning (* _FUNCTIONS_TABLE_COLS )
146146 )
147- row = await result .one ()
147+ row = result .one ()
148148
149149 registered_function = RegisteredFunctionDB .model_validate (row )
150150
@@ -273,7 +273,7 @@ async def create_function_job_collection(
273273 )
274274 job_collection_entries : list [Row ] = []
275275 for job_id in job_ids :
276- result = await transaction .stream (
276+ result = await transaction .execute (
277277 function_job_collections_to_function_jobs_table .insert ()
278278 .values (
279279 function_job_collection_uuid = function_job_collection_db .uuid ,
@@ -284,7 +284,7 @@ async def create_function_job_collection(
284284 function_job_collections_to_function_jobs_table .c .function_job_uuid ,
285285 )
286286 )
287- entry = await result .one_or_none ()
287+ entry = result .one_or_none ()
288288 assert entry is not None , (
289289 f"No row was returned from the database after creating function job collection entry { title } ."
290290 f" Job ID: { job_id } "
@@ -383,25 +383,25 @@ async def list_functions(
383383 .select_from (functions_table )
384384 .where (functions_table .c .uuid .in_ (subquery ))
385385 )
386- result = await conn .stream (
387- functions_table .select ()
388- .where (functions_table .c .uuid .in_ (subquery ))
389- .offset (pagination_offset )
390- .limit (pagination_limit )
391- )
392- rows = await result .all ()
393- if rows is None :
386+ if total_count_result == 0 :
394387 return [], PageMetaInfoLimitOffset (
395388 total = 0 , offset = pagination_offset , limit = pagination_limit , count = 0
396389 )
390+ function_rows = [
391+ RegisteredFunctionDB .model_validate (row )
392+ async for row in await conn .stream (
393+ functions_table .select ()
394+ .where (functions_table .c .uuid .in_ (subquery ))
395+ .offset (pagination_offset )
396+ .limit (pagination_limit )
397+ )
398+ ]
397399
398- return [
399- RegisteredFunctionDB .model_validate (row ) for row in rows
400- ], PageMetaInfoLimitOffset (
400+ return function_rows , PageMetaInfoLimitOffset (
401401 total = total_count_result ,
402402 offset = pagination_offset ,
403403 limit = pagination_limit ,
404- count = len (rows ),
404+ count = len (function_rows ),
405405 )
406406
407407
@@ -446,30 +446,30 @@ async def list_function_jobs(
446446 else sqlalchemy .sql .true ()
447447 )
448448 )
449- result = await conn .stream (
450- function_jobs_table .select ()
451- .where (function_jobs_table .c .uuid .in_ (access_subquery ))
452- .where (
453- function_jobs_table .c .function_uuid == filter_by_function_id
454- if filter_by_function_id
455- else sqlalchemy .sql .true ()
456- )
457- .offset (pagination_offset )
458- .limit (pagination_limit )
459- )
460- rows = await result .all ()
461- if rows is None :
449+ if total_count_result == 0 :
462450 return [], PageMetaInfoLimitOffset (
463451 total = 0 , offset = pagination_offset , limit = pagination_limit , count = 0
464452 )
453+ results = [
454+ RegisteredFunctionJobDB .model_validate (row )
455+ async for row in await conn .stream (
456+ function_jobs_table .select ()
457+ .where (function_jobs_table .c .uuid .in_ (access_subquery ))
458+ .where (
459+ function_jobs_table .c .function_uuid == filter_by_function_id
460+ if filter_by_function_id
461+ else sqlalchemy .sql .true ()
462+ )
463+ .offset (pagination_offset )
464+ .limit (pagination_limit )
465+ )
466+ ]
465467
466- return [
467- RegisteredFunctionJobDB .model_validate (row ) for row in rows
468- ], PageMetaInfoLimitOffset (
468+ return results , PageMetaInfoLimitOffset (
469469 total = total_count_result ,
470470 offset = pagination_offset ,
471471 limit = pagination_limit ,
472- count = len (rows ),
472+ count = len (results ),
473473 )
474474
475475
@@ -550,40 +550,35 @@ async def list_function_job_collections(
550550 .select_from (function_job_collections_table )
551551 .where (filter_and_access_condition )
552552 )
553- query = function_job_collections_table .select ().where (
554- filter_and_access_condition
555- )
556-
557- result = await conn .stream (
558- query .offset (pagination_offset ).limit (pagination_limit )
559- )
560- rows = await result .all ()
561- if rows is None :
553+ if total_count_result == 0 :
562554 return [], PageMetaInfoLimitOffset (
563555 total = 0 , offset = pagination_offset , limit = pagination_limit , count = 0
564556 )
565557
558+ query = function_job_collections_table .select ().where (
559+ filter_and_access_condition
560+ )
561+
566562 collections = []
567- for row in rows :
563+ async for row in await conn .stream (
564+ query .offset (pagination_offset ).limit (pagination_limit )
565+ ):
568566 collection = RegisteredFunctionJobCollectionDB .model_validate (row )
569- job_result = await conn .stream (
570- function_job_collections_to_function_jobs_table .select ().where (
571- function_job_collections_to_function_jobs_table .c .function_job_collection_uuid
572- == row ["uuid" ]
567+ job_ids = [
568+ job_row ["function_job_uuid" ]
569+ async for job_row in await conn .stream (
570+ function_job_collections_to_function_jobs_table .select ().where (
571+ function_job_collections_to_function_jobs_table .c .function_job_collection_uuid
572+ == row ["uuid" ]
573+ )
573574 )
574- )
575- job_rows = await job_result .all ()
576- job_ids = (
577- [job_row ["function_job_uuid" ] for job_row in job_rows ]
578- if job_rows
579- else []
580- )
575+ ]
581576 collections .append ((collection , job_ids ))
582577 return collections , PageMetaInfoLimitOffset (
583578 total = total_count_result ,
584579 offset = pagination_offset ,
585580 limit = pagination_limit ,
586- count = len (rows ),
581+ count = len (collections ),
587582 )
588583
589584
@@ -618,10 +613,10 @@ async def delete_function(
618613 )
619614
620615 # Check if the function exists
621- result = await transaction .stream (
616+ result = await transaction .execute (
622617 functions_table .select ().where (functions_table .c .uuid == function_id )
623618 )
624- row = await result .one_or_none ()
619+ row = result .one_or_none ()
625620
626621 if row is None :
627622 raise FunctionIDNotFoundError (function_id = function_id )
@@ -825,19 +820,13 @@ async def find_cached_function_jobs(
825820 api_access_rights = [FunctionsApiAccessRights .READ_FUNCTION_JOBS ],
826821 )
827822
828- result = await conn .stream (
823+ jobs : list [RegisteredFunctionJobDB ] = []
824+ async for row in await conn .stream (
829825 function_jobs_table .select ().where (
830826 function_jobs_table .c .function_uuid == function_id ,
831827 cast (function_jobs_table .c .inputs , Text ) == json .dumps (inputs ),
832- ),
833- )
834- rows = await result .all ()
835-
836- if rows is None or len (rows ) == 0 :
837- return None
838-
839- jobs : list [RegisteredFunctionJobDB ] = []
840- for row in rows :
828+ )
829+ ):
841830 job = RegisteredFunctionJobDB .model_validate (row )
842831 try :
843832 await check_user_permissions (
@@ -899,17 +888,15 @@ async def get_function_job_collection(
899888 )
900889
901890 # Retrieve associated job ids from the join table
902- job_result = await conn .stream (
903- function_job_collections_to_function_jobs_table .select ().where (
904- function_job_collections_to_function_jobs_table .c .function_job_collection_uuid
905- == row ["uuid" ]
891+ job_ids = [
892+ job_row ["function_job_uuid" ]
893+ async for job_row in await conn .stream (
894+ function_job_collections_to_function_jobs_table .select ().where (
895+ function_job_collections_to_function_jobs_table .c .function_job_collection_uuid
896+ == row ["uuid" ]
897+ )
906898 )
907- )
908- job_rows = await job_result .all ()
909-
910- job_ids = (
911- [job_row ["function_job_uuid" ] for job_row in job_rows ] if job_rows else []
912- )
899+ ]
913900
914901 job_collection = RegisteredFunctionJobCollectionDB .model_validate (row )
915902
@@ -1000,13 +987,13 @@ async def set_group_permissions(
1000987 async with transaction_context (get_asyncpg_engine (app ), connection ) as transaction :
1001988 for object_id in object_ids :
1002989 # Check if the group already has access rights for the function
1003- result = await transaction .stream (
990+ result = await transaction .execute (
1004991 access_rights_table .select ().where (
1005992 getattr (access_rights_table .c , field_name ) == object_id ,
1006993 access_rights_table .c .group_id == group_id ,
1007994 )
1008995 )
1009- row = await result .one_or_none ()
996+ row = result .one_or_none ()
1010997
1011998 if row is None :
1012999 # Insert new access rights if the group does not have any
@@ -1058,20 +1045,20 @@ async def get_user_api_access_rights(
10581045 if not rows :
10591046 return FunctionUserApiAccessRights (user_id = user_id )
10601047 combined_permissions = {
1061- "read_functions" : any (row [ " read_functions" ] for row in rows ),
1062- "write_functions" : any (row [ " write_functions" ] for row in rows ),
1063- "execute_functions" : any (row [ " execute_functions" ] for row in rows ),
1064- "read_function_jobs" : any (row [ " read_function_jobs" ] for row in rows ),
1065- "write_function_jobs" : any (row [ " write_function_jobs" ] for row in rows ),
1066- "execute_function_jobs" : any (row [ " execute_function_jobs" ] for row in rows ),
1048+ "read_functions" : any (row . read_functions for row in rows ),
1049+ "write_functions" : any (row . write_functions for row in rows ),
1050+ "execute_functions" : any (row . execute_functions for row in rows ),
1051+ "read_function_jobs" : any (row . read_function_jobs for row in rows ),
1052+ "write_function_jobs" : any (row . write_function_jobs for row in rows ),
1053+ "execute_function_jobs" : any (row . execute_function_jobs for row in rows ),
10671054 "read_function_job_collections" : any (
1068- row [ " read_function_job_collections" ] for row in rows
1055+ row . read_function_job_collections for row in rows
10691056 ),
10701057 "write_function_job_collections" : any (
1071- row [ " write_function_job_collections" ] for row in rows
1058+ row . write_function_job_collections for row in rows
10721059 ),
10731060 "execute_function_job_collections" : any (
1074- row [ " execute_function_job_collections" ] for row in rows
1061+ row . execute_function_job_collections for row in rows
10751062 ),
10761063 "user_id" : user_id ,
10771064 }
0 commit comments