Commit 9724ba0

Merge pull request #35 from febus982/multithread_session
Make session handler thread safe and async task safe
2 parents 79f0f03 + 02ed788 commit 9724ba0

7 files changed: +151 -119 lines changed

docs/lifecycle.md

Lines changed: 23 additions & 56 deletions
@@ -17,70 +17,37 @@ From SQLAlchemy documentation:
 recommends we create `Session` object at the beginning of a logical operation where
 database access is potentially anticipated.
 
-The repository is not built for parallel execution, it keeps a `Session` object scoped to
-its lifecycle to avoid unnecessary queries, and executes a transaction for each _operation_
-to maintain isolation. This means you can create a repository object almost whenever you want,
-as long as you don't run parallel operations.
-
-The session in the repository is not thread safe and [is not safe on concurrent asyncio tasks](https://docs.sqlalchemy.org/en/20/orm/session_basics.html#is-the-session-thread-safe-is-asyncsession-safe-to-share-in-concurrent-tasks)
-therefore the repository has the same limitations.
-
-This means:
-
-* Do not assign a repository object to a global variable
-(Check the [Notes on multithreaded applications](/manager/session/#note-on-multithreaded-applications))
-* Do not share a repository instance in multiple threads or processes (e.g. using `asyncio.to_thread`).
-* Do not use the same repository in concurrent asyncio task (e.g. using `asyncio.gather`)
-
-Even using multiple repository instances will work fine, however as they will have completely
-different sessions, it's likely that the second repository will fire additional SELECT queries
-to get the state of the object prior to saving it.
-
-/// details | Example
-```python
-from sqlalchemy import String
-from sqlalchemy.orm import Mapped, mapped_column
-from sqlalchemy_bind_manager import SQLAlchemyBindManager ,SQLAlchemyConfig
-from sqlalchemy_bind_manager.repository import SQLAlchemyRepository
-
-config = SQLAlchemyConfig(
-    engine_url="sqlite:///./sqlite.db",
-    engine_options=dict(connect_args={"check_same_thread": False}, echo=True),
-    session_options=dict(expire_on_commit=False),
-)
-
-sa_manager = SQLAlchemyBindManager(config={})
-
-class MyModel(sa_manager.get_bind().model_declarative_base):
-    id: Mapped[int] = mapped_column(primary_key=True)
-    name: Mapped[str] = mapped_column(String(30))
-
-def update_my_model():
-    # Create 2 instances of the same repository
-    repo = SQLAlchemyRepository(sa_manager.get_bind(), model_class=MyModel)
-    repo2 = SQLAlchemyRepository(sa_manager.get_bind(), model_class=MyModel)
-
-    o = repo.get(1)
-    o.name = "John"
-
-    repo2.save(o)
-
-update_my_model()
-```
-///
+The repository keeps a `Session` object scoped to its lifecycle to avoid unnecessary queries,
+and executes a transaction for each _operation_ to maintain isolation. This means you can create
+a repository object almost whenever you want, as long as you don't run parallel operations.
+
+The repository is safe in multithreaded applications and in concurrent asyncio tasks, this means
+that potentially you can save it in a global variable, and it will have a different `Session`
+in each thread or asyncio task.
+
+Even if the repository can be used with concurrency or parallelism, remember SQLAlchemy models
+belong to a single `Session`, so sharing the same models in multiple threads or asyncio tasks
+will cause problems.
+
+What you can do is:
 
-The recommendation is of course to try to write your application in a way you cna use
-a single repository instance, where possible.
+* Save the repositories in global variables and start a thread / asyncio task to handle
+a scoped request (e.g. one thread per HTTP request)
+
+What you cannot do is:
+
+* Get a list of models
+* Save the models using `save()` in parallel threads / tasks (each task will have a different session)
+
+/// tip | The recommendation is of course to try to use a single repository instance, where possible.
 
 For example a strategy similar to this would be optimal, if possible:
 
 * Create repositories
 * Retrieve all the models you need
-* Do the changes you need, as per business logic
+* Do the changes you need, as per business logic, eventually using multiple threads / tasks
 * Save all the changed models as needed
 
-/// tip | Using multiple repository instances is the only way to safely use concurrent asyncio tasks
-
 ///
 
 ### Unit of work
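
Note on the documentation change above: the claim that a repository kept in a global variable gets a different `Session` in each thread can be pictured with a minimal sketch that uses only the standard library. `FakeSession` and `ThreadScopedRegistry` are hypothetical stand-ins, not SQLAlchemy classes; the registry only imitates the thread-local behaviour that `scoped_session` provides when no `scopefunc` is passed.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; it holds no connection."""


class ThreadScopedRegistry:
    """Simplified illustration of a thread-scoped registry (not SQLAlchemy's code)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Create the session lazily, once per thread, and reuse it afterwards.
        if not hasattr(self._local, "session"):
            self._local.session = self._factory()
        return self._local.session

    def remove(self):
        # Discard the current thread's session, if there is one.
        if hasattr(self._local, "session"):
            del self._local.session


# The registry itself can live in a global variable.
registry = ThreadScopedRegistry(FakeSession)
seen = {}


def worker(name):
    # Two calls in the same thread are recorded for later comparison.
    seen[name] = (registry(), registry())


threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

same_within_thread = all(first is second for first, second in seen.values())
distinct_across_threads = seen["t0"][0] is not seen["t1"][0]
```

Within one thread, repeated calls return the same object, while another thread gets its own. This is also why a model loaded in one thread should not be saved from another: it would be handed to a different session.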
File renamed without changes.

mkdocs.yml

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ nav:
     - Session usage: manager/session.md
     - Alembic integration: manager/alembic.md
   - Repository:
-    - Repository usage: repository/repository.md
+    - Repository usage: repository/usage.md
     - Unit of work: repository/uow.md
   - Components life cycle: lifecycle.md

Lines changed: 30 additions & 35 deletions
@@ -1,7 +1,6 @@
 import asyncio
 from contextlib import asynccontextmanager, contextmanager
 from typing import AsyncIterator, Iterator
-from uuid import uuid4
 
 from sqlalchemy.ext.asyncio import (
     AsyncSession,
@@ -17,91 +16,87 @@
 
 
 class SessionHandler:
-    _session_class: scoped_session
-    session: Session
+    scoped_session: scoped_session
 
     def __init__(self, bind: SQLAlchemyBind):
         if not isinstance(bind, SQLAlchemyBind):
             raise UnsupportedBind("Bind is not an instance of SQLAlchemyBind")
         else:
-            u = uuid4()
-            self._session_class = scoped_session(
-                bind.session_class, scopefunc=lambda: str(u)
-            )
-            self.session = self._session_class()
+            self.scoped_session = scoped_session(bind.session_class)
 
     def __del__(self):
-        if getattr(self, "_session_class", None):
-            self._session_class.remove()
+        if getattr(self, "scoped_session", None):
+            self.scoped_session.remove()
 
     @contextmanager
     def get_session(self, read_only: bool = False) -> Iterator[Session]:
+        session = self.scoped_session()
         try:
-            self.session.begin()
-            yield self.session
+            session.begin()
+            yield session
             if not read_only:
-                self.commit()
+                self.commit(session)
         finally:
-            self.session.close()
+            session.close()
 
-    def commit(self) -> None:
+    def commit(self, session: Session) -> None:
         """Commits the session and handles rollback on errors.
 
+        :param session: The session object.
+        :type session: Session
         :raises Exception: Any error is re-raised after the rollback.
         """
         try:
-            self.session.commit()
+            session.commit()
         except:
-            self.session.rollback()
+            session.rollback()
             raise
 
 
 class AsyncSessionHandler:
-    _session_class: async_scoped_session
-    session: AsyncSession
+    scoped_session: async_scoped_session
 
     def __init__(self, bind: SQLAlchemyAsyncBind):
         if not isinstance(bind, SQLAlchemyAsyncBind):
             raise UnsupportedBind("Bind is not an instance of SQLAlchemyAsyncBind")
         else:
-            u = uuid4()
-            self._session_class = async_scoped_session(
-                bind.session_class, scopefunc=lambda: str(u)
+            self.scoped_session = async_scoped_session(
+                bind.session_class, asyncio.current_task
             )
-            self.session = self._session_class()
 
     def __del__(self):
-        if not getattr(self, "_session_class", None):
+        if not getattr(self, "scoped_session", None):
             return
 
         try:
             loop = asyncio.get_event_loop()
             if loop.is_running():
-                loop.create_task(self._session_class.remove())
+                loop.create_task(self.scoped_session.remove())
             else:
-                loop.run_until_complete(self._session_class.remove())
+                loop.run_until_complete(self.scoped_session.remove())
         except RuntimeError:
-            asyncio.run(self._session_class.remove())
+            asyncio.run(self.scoped_session.remove())
 
     @asynccontextmanager
     async def get_session(self, read_only: bool = False) -> AsyncIterator[AsyncSession]:
+        session = self.scoped_session()
         try:
-            await self.session.begin()
-            yield self.session
+            await session.begin()
+            yield session
             if not read_only:
-                await self.commit()
+                await self.commit(session)
         finally:
-            await self.session.close()
+            await session.close()
 
-    async def commit(self) -> None:
+    async def commit(self, session: AsyncSession) -> None:
         """Commits the session and handles rollback on errors.
 
         :param session: The session object.
-        :type session: Session
+        :type session: AsyncSession
         :raises Exception: Any error is re-raised after the rollback.
         """
         try:
-            await self.session.commit()
+            await session.commit()
         except:
-            await self.session.rollback()
+            await session.rollback()
             raise
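
Note on the change above: passing `asyncio.current_task` as the scope function keys the session registry by the running task. The sketch below tries to show that idea with the standard library only. `FakeAsyncSession` and `TaskScopedRegistry` are hypothetical names used for illustration and are not SQLAlchemy's implementation; the only point is that a registry keyed by the current task yields one session per task.

```python
import asyncio


class FakeAsyncSession:
    """Hypothetical stand-in for AsyncSession; it holds no connection."""


class TaskScopedRegistry:
    """Simplified registry keyed by a scope function (not SQLAlchemy's code)."""

    def __init__(self, factory, scopefunc):
        self._factory = factory
        self._scopefunc = scopefunc
        self._registry = {}

    def __call__(self):
        # Look up (or lazily create) the session for the current scope key.
        key = self._scopefunc()
        if key not in self._registry:
            self._registry[key] = self._factory()
        return self._registry[key]

    def remove(self):
        # Drop the entry for the current scope key, if present.
        self._registry.pop(self._scopefunc(), None)


registry = TaskScopedRegistry(FakeAsyncSession, scopefunc=asyncio.current_task)


async def handler():
    first = registry()
    await asyncio.sleep(0)  # let the other task run in between
    second = registry()
    registry.remove()  # each task cleans up its own entry
    return first, second


async def main():
    # gather() wraps each coroutine in its own task.
    return await asyncio.gather(handler(), handler())


(a1, a2), (b1, b2) = asyncio.run(main())
same_within_task = a1 is a2 and b1 is b2
distinct_across_tasks = a1 is not b1
```

Each task removes its own entry when it finishes. In a registry keyed by task, entries that are never removed would accumulate as tasks come and go, which is presumably why the handler still calls `remove()` in `__del__`.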

sqlalchemy_bind_manager/_unit_of_work/__init__.py

Lines changed: 8 additions & 8 deletions
@@ -16,36 +16,36 @@
 
 
 class UnitOfWork:
-    _transaction_handler: SessionHandler
+    _session_handler: SessionHandler
 
     def __init__(
         self, bind: SQLAlchemyBind, repositories: Iterable[Type[SQLAlchemyRepository]]
     ) -> None:
         super().__init__()
-        self._transaction_handler = SessionHandler(bind)
+        self._session_handler = SessionHandler(bind)
         for r in repositories:
-            setattr(self, r.__name__, r(session=self._transaction_handler.session))
+            setattr(self, r.__name__, r(session=self._session_handler.scoped_session()))
 
     @contextmanager
     def transaction(self, read_only: bool = False) -> Iterator[Session]:
-        with self._transaction_handler.get_session(read_only=read_only) as _s:
+        with self._session_handler.get_session(read_only=read_only) as _s:
             yield _s
 
 
 class AsyncUnitOfWork:
-    _transaction_handler: AsyncSessionHandler
+    _session_handler: AsyncSessionHandler
 
     def __init__(
         self,
         bind: SQLAlchemyAsyncBind,
         repositories: Iterable[Type[SQLAlchemyAsyncRepository]],
     ) -> None:
         super().__init__()
-        self._transaction_handler = AsyncSessionHandler(bind)
+        self._session_handler = AsyncSessionHandler(bind)
         for r in repositories:
-            setattr(self, r.__name__, r(session=self._transaction_handler.session))
+            setattr(self, r.__name__, r(session=self._session_handler.scoped_session()))
 
     @asynccontextmanager
     async def transaction(self, read_only: bool = False) -> AsyncIterator[AsyncSession]:
-        async with self._transaction_handler.get_session(read_only=read_only) as _s:
+        async with self._session_handler.get_session(read_only=read_only) as _s:
             yield _s
