Skip to content

Commit 9a7b1cd

Browse files
Feature/improve uow (#14)
* add tenacity and upgrade libs; * add a retry mechanism for UoW to handle SerializationError; * allow config isolation level for relational database;
1 parent 30dfa61 commit 9a7b1cd

File tree

7 files changed

+190
-119
lines changed

7 files changed

+190
-119
lines changed

config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ class RelationalDBConfig(BaseModel):
6969
url: str
7070
enable_log: bool
7171
enable_auto_migrate: bool
72+
isolation_level: Optional[str] = (
73+
"SERIALIZABLE" # "READ UNCOMMITTED”, “READ COMMITTED”, “REPEATABLE READ”, “SERIALIZABLE”
74+
)
7275

7376

7477
class CfgManagerConfig(BaseModel):

config/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ RELATIONAL_DB__VENDOR=postgres
1515
RELATIONAL_DB__URL=postgresql+asyncpg://cleanarc:cleanarc!123@localhost:5432/cleanarc
1616
RELATIONAL_DB__ENABLE_LOG=true
1717
RELATIONAL_DB__ENABLE_AUTO_MIGRATE=true
18+
RELATIONAL_DB__ISOLATION_LEVEL='READ COMMITTED' # "READ UNCOMMITTED”, “READ COMMITTED”, “REPEATABLE READ”, “SERIALIZABLE”
1819

1920
# === Authentication Service ===
2021
AUTHENTICATION_SERVICE__VENDOR=keycloak

internal/infrastructures/relational_db/patterns/unit_of_work.py

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,30 @@
11
import abc
2+
import logging
23
from abc import abstractmethod
3-
from typing import Any, Callable, Optional, Type
4+
from typing import Callable, Optional
45

6+
from asyncpg.exceptions import SerializationError
57
from sqlalchemy.ext.asyncio import (
68
AsyncSession,
79
AsyncSessionTransaction,
810
async_scoped_session,
911
)
12+
from tenacity import (
13+
RetryError,
14+
before_sleep_log,
15+
retry,
16+
retry_if_exception_type,
17+
stop_after_attempt,
18+
wait_exponential,
19+
)
1020

1121
from internal.infrastructures.relational_db import CommentRepo, PostRepo, UserRepo
1222
from internal.infrastructures.relational_db.abstraction import (
1323
AbstractCommentRepo,
1424
AbstractPostRepo,
1525
AbstractUserRepo,
1626
)
27+
from utils.logger_utils import get_shared_logger
1728

1829

1930
class AbstractUnitOfWork(abc.ABC):
@@ -59,30 +70,68 @@ def __init__(
5970
self._comment_repo_factory = comment_repo_factory
6071
self._user_repo_factory = user_repo_factory
6172

62-
async def __aenter__(self):
73+
@retry(
74+
stop=stop_after_attempt(3),
75+
wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
76+
retry=retry_if_exception_type(SerializationError),
77+
before_sleep=before_sleep_log(get_shared_logger(), logging.WARNING),
78+
)
79+
async def _init_session(self):
80+
"""Initialize session with retry logic for serialization errors."""
6381
self._session = self._scoped_session_factory()
6482
self._transaction = await self._session.begin()
6583

66-
# register repo
84+
# register repos
6785
self.post_repo = self._post_repo_factory(self._session)
6886
self.comment_repo = self._comment_repo_factory(self._session)
6987
self.user_repo = self._user_repo_factory(self._session)
7088
return self
7189

72-
async def __aexit__(
73-
self,
74-
exc_type: Optional[Type[BaseException]],
75-
exc: Optional[BaseException],
76-
tb: Any,
77-
):
90+
async def __aenter__(self):
91+
try:
92+
return await self._init_session()
93+
except SerializationError as e:
94+
# Ensure resources are cleaned up if all retries fail
95+
if self._session:
96+
await self._session.close()
97+
await self._scoped_session_factory.remove()
98+
raise
99+
100+
@retry(
101+
stop=stop_after_attempt(3),
102+
wait=wait_exponential(multiplier=0.5, min=0.5, max=5),
103+
retry=retry_if_exception_type(SerializationError),
104+
before_sleep=before_sleep_log(get_shared_logger(), logging.WARNING),
105+
)
106+
async def _commit_transaction(self):
107+
"""Commit transaction with retry logic for serialization errors."""
108+
try:
109+
await self._transaction.commit()
110+
except SerializationError:
111+
# Rollback the failed transaction and start a new one for retry
112+
await self._transaction.rollback()
113+
self._transaction = await self._session.begin()
114+
# Re-raise to trigger retry
115+
raise
116+
117+
async def __aexit__(self, exc_type, exc, tb):
118+
logger = get_shared_logger()
119+
78120
try:
79121
if exc_type is None:
80-
await self._transaction.commit()
122+
try:
123+
# Try to commit with retries
124+
await self._commit_transaction()
125+
except RetryError as e:
126+
# All retries exhausted
127+
logger.error(f"Max retries exceeded for commit: {e}")
128+
await self._transaction.rollback()
129+
raise SerializationError(
130+
"Failed to commit transaction after multiple retries"
131+
) from e
81132
else:
133+
# If there was an exception in the context, just rollback
82134
await self._transaction.rollback()
83-
except Exception:
84-
await self._transaction.rollback()
85-
raise
86135
finally:
87136
await self._session.close()
88137
await self._scoped_session_factory.remove()

internal/infrastructures/relational_db/postgres/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@
1919

2020
class PostgresDatabase:
2121
def __init__(
22-
self, db_url: str, enable_log: bool = True, enable_migrations: bool = True
22+
self,
23+
db_url: str,
24+
enable_log: bool = True,
25+
enable_migrations: bool = True,
26+
isolation_level: str = "SERIALIZABLE",
2327
):
2428
self._db_url = db_url
2529
self._enable_log = enable_log
2630
self._enable_migrations = enable_migrations
2731
self._engine = create_async_engine(
2832
url=self._db_url,
2933
echo=self._enable_log,
30-
isolation_level="SERIALIZABLE",
34+
isolation_level=isolation_level,
3135
)
3236

3337
self._session_factory = async_sessionmaker(

internal/patterns/dependency_injection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class Container(containers.DeclarativeContainer):
4444
db_url=config.relational_db.url,
4545
enable_log=config.relational_db.enable_log,
4646
enable_migrations=config.relational_db.enable_auto_migrate,
47+
isolation_level=config.relational_db.isolation_level,
4748
)
4849

4950
relational_db_scoped_session = providers.Resource(

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies = [
2222
"ecs_logging",
2323
"pydantic-settings",
2424
"py-consul",
25+
"tenacity",
2526
]
2627

2728
[tool.uv]

0 commit comments

Comments
 (0)