From 5e72772083cb3fc586b3a8af5a3f2cd625f9e44e Mon Sep 17 00:00:00 2001 From: Paul Sanders Date: Tue, 28 Oct 2025 10:50:26 -0400 Subject: [PATCH 1/3] Add db transactions --- src/fastapi/service_files.rs | 56 ++++++++++++++---------------------- 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/src/fastapi/service_files.rs b/src/fastapi/service_files.rs index ea61667f..653c8c54 100644 --- a/src/fastapi/service_files.rs +++ b/src/fastapi/service_files.rs @@ -144,14 +144,15 @@ async def create_user(*, pool: Pool, cache_client: Valkey, user: UserCreate) -> """ async with pool.acquire() as conn: - result = await conn.fetchrow( - query, - user.email, - user.full_name, - get_password_hash(user.password), - user.is_active, - user.is_superuser, - ) + async with conn.transaction(): + result = await conn.fetchrow( + query, + user.email, + user.full_name, + get_password_hash(user.password), + user.is_active, + user.is_superuser, + ) # failsafe: this shouldn't happen if not result: # pragma: no cover @@ -166,17 +167,15 @@ async def create_user(*, pool: Pool, cache_client: Valkey, user: UserCreate) -> async def delete_user(*, pool: Pool, cache_client: Valkey, user_id: str) -> None: query = "DELETE FROM users WHERE id::text = $1" async with pool.acquire() as conn: - async with asyncio.TaskGroup() as tg: - db_task = tg.create_task(conn.execute(query, user_id)) - tg.create_task( - user_cache_services.delete_cached_user(cache_client=cache_client, user_id=user_id) - ) - - result = await db_task + async with conn.transaction(): + result = await conn.execute(query, user_id) if result == "DELETE 0": # pragma: no cover raise UserNotFoundError(f"User with id {{user_id}} not found") + logger.debug("Deleting cached user") + user_cache_services.delete_cached_user(cache_client=cache_client, user_id=user_id) + async def get_users(*, pool: Pool, offset: int = 0, limit: int = 100) -> list[UserInDb] | None: query = """ @@ -350,18 +349,10 @@ async def update_user( """ async with pool.acquire() as conn: - async with asyncio.TaskGroup() as tg: - db_task = tg.create_task( - conn.fetchrow(query, get_password_hash(user_in.new_password), db_user.id) - ) - tg.create_task( - user_cache_services.delete_cached_user( - cache_client=cache_client, user_id=db_user.id - ) + async with conn.transaction(): + result = await conn.fetchrow( + query, get_password_hash(user_in.new_password), db_user.id ) - - result = await db_task - else: user_data = user_in.model_dump(exclude_unset=True) if "password" in user_data: @@ -382,19 +373,14 @@ async def update_user( """ async with pool.acquire() as conn: - async with asyncio.TaskGroup() as tg: - db_task = tg.create_task(conn.fetchrow(query, db_user.id, *user_data.values())) - tg.create_task( - user_cache_services.delete_cached_user( - cache_client=cache_client, user_id=db_user.id - ) - ) - - result = await db_task + async with conn.transaction(): + result = await conn.fetchrow(query, db_user.id, *user_data.values())) if not result or result == "UPDATE 0": # pragma: no cover raise DbUpdateError("Error updating user") + await user_cache_services.delete_cached_user(cache_client=cache_client, user_id=db_user.id) + return UserInDb(**dict(result)) "# ) From 11c326ac5f2436ec1d2f3268867e969169727628 Mon Sep 17 00:00:00 2001 From: Paul Sanders Date: Tue, 28 Oct 2025 10:54:24 -0400 Subject: [PATCH 2/3] Fix build error --- src/fastapi/service_files.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fastapi/service_files.rs b/src/fastapi/service_files.rs index 653c8c54..e32ae3e8 100644 --- a/src/fastapi/service_files.rs +++ b/src/fastapi/service_files.rs @@ -374,7 +374,7 @@ async def update_user( async with pool.acquire() as conn: async with conn.transaction(): - result = await conn.fetchrow(query, db_user.id, *user_data.values())) + result = await conn.fetchrow(query, db_user.id, *user_data.values()) if not result or result == "UPDATE 0": # pragma: no cover raise DbUpdateError("Error updating user") From 55394b5ae0846a887907dbf65993ad5f7a7bc986 Mon Sep 17 00:00:00 2001 From: Paul Sanders Date: Tue, 28 Oct 2025 10:56:47 -0400 Subject: [PATCH 3/3] Add missing await --- src/fastapi/service_files.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fastapi/service_files.rs b/src/fastapi/service_files.rs index e32ae3e8..138bb2ea 100644 --- a/src/fastapi/service_files.rs +++ b/src/fastapi/service_files.rs @@ -174,7 +174,7 @@ async def delete_user(*, pool: Pool, cache_client: Valkey, user_id: str) -> None raise UserNotFoundError(f"User with id {{user_id}} not found") logger.debug("Deleting cached user") - user_cache_services.delete_cached_user(cache_client=cache_client, user_id=user_id) + await user_cache_services.delete_cached_user(cache_client=cache_client, user_id=user_id) async def get_users(*, pool: Pool, offset: int = 0, limit: int = 100) -> list[UserInDb] | None: