Skip to content

Commit 23a3f3c

Browse files
authored
PYTHON-4482 Improve performance by making _ServerSessionPool lock-free (#1660) (#1671)
1 parent 11b3f9a commit 23a3f3c

File tree

4 files changed

+33
-43
lines changed

4 files changed

+33
-43
lines changed

pymongo/client_session.py

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -515,9 +515,6 @@ def end_session(self) -> None:
515515
516516
It is an error to use the session after the session has ended.
517517
"""
518-
self._end_session(lock=True)
519-
520-
def _end_session(self, lock: bool) -> None:
521518
if self._server_session is not None:
522519
try:
523520
if self.in_transaction:
@@ -526,7 +523,7 @@ def _end_session(self, lock: bool) -> None:
526523
# is in the committed state when the session is discarded.
527524
self._unpin()
528525
finally:
529-
self._client._return_server_session(self._server_session, lock)
526+
self._client._return_server_session(self._server_session)
530527
self._server_session = None
531528

532529
def _check_ended(self) -> None:
@@ -537,7 +534,7 @@ def __enter__(self) -> ClientSession:
537534
return self
538535

539536
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
540-
self._end_session(lock=True)
537+
self.end_session()
541538

542539
@property
543540
def client(self) -> MongoClient:
@@ -1097,7 +1094,7 @@ def inc_transaction_id(self) -> None:
10971094
class _ServerSessionPool(collections.deque):
10981095
"""Pool of _ServerSession objects.
10991096
1100-
This class is not thread-safe, access it while holding the Topology lock.
1097+
This class is thread-safe.
11011098
"""
11021099

11031100
def __init__(self, *args: Any, **kwargs: Any):
@@ -1110,8 +1107,11 @@ def reset(self) -> None:
11101107

11111108
def pop_all(self) -> list[_ServerSession]:
11121109
ids = []
1113-
while self:
1114-
ids.append(self.pop().session_id)
1110+
while True:
1111+
try:
1112+
ids.append(self.pop().session_id)
1113+
except IndexError:
1114+
break
11151115
return ids
11161116

11171117
def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
@@ -1123,33 +1123,30 @@ def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerS
11231123
self._clear_stale(session_timeout_minutes)
11241124

11251125
# The most recently used sessions are on the left.
1126-
while self:
1127-
s = self.popleft()
1126+
while True:
1127+
try:
1128+
s = self.popleft()
1129+
except IndexError:
1130+
break
11281131
if not s.timed_out(session_timeout_minutes):
11291132
return s
11301133

11311134
return _ServerSession(self.generation)
11321135

1133-
def return_server_session(
1134-
self, server_session: _ServerSession, session_timeout_minutes: Optional[int]
1135-
) -> None:
1136-
if session_timeout_minutes is not None:
1137-
self._clear_stale(session_timeout_minutes)
1138-
if server_session.timed_out(session_timeout_minutes):
1139-
return
1140-
self.return_server_session_no_lock(server_session)
1141-
1142-
def return_server_session_no_lock(self, server_session: _ServerSession) -> None:
1136+
def return_server_session(self, server_session: _ServerSession) -> None:
11431137
# Discard sessions from an old pool to avoid duplicate sessions in the
11441138
# child process after a fork.
11451139
if server_session.generation == self.generation and not server_session.dirty:
11461140
self.appendleft(server_session)
11471141

11481142
def _clear_stale(self, session_timeout_minutes: Optional[int]) -> None:
11491143
# Clear stale sessions. The least recently used are on the right.
1150-
while self:
1151-
if self[-1].timed_out(session_timeout_minutes):
1152-
self.pop()
1153-
else:
1144+
while True:
1145+
try:
1146+
s = self.pop()
1147+
except IndexError:
1148+
break
1149+
if not s.timed_out(session_timeout_minutes):
1150+
self.append(s)
11541151
# The remaining sessions also haven't timed out.
11551152
break

pymongo/command_cursor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(
7373
self.__killed = self.__id == 0
7474
self.__comment = comment
7575
if self.__killed:
76-
self.__end_session(True)
76+
self.__end_session()
7777

7878
if "ns" in cursor_info: # noqa: SIM401
7979
self.__ns = cursor_info["ns"]
@@ -112,9 +112,9 @@ def __die(self, synchronous: bool = False) -> None:
112112
self.__session = None
113113
self.__sock_mgr = None
114114

115-
def __end_session(self, synchronous: bool) -> None:
115+
def __end_session(self) -> None:
116116
if self.__session and not self.__explicit_session:
117-
self.__session._end_session(lock=synchronous)
117+
self.__session.end_session()
118118
self.__session = None
119119

120120
def close(self) -> None:

pymongo/mongo_client.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,8 @@ def target() -> bool:
907907
def _after_fork(self) -> None:
908908
"""Resets topology in a child after successfully forking."""
909909
self._init_background(self._topology._pid)
910+
# Reset the session pool to avoid duplicate sessions in the child process.
911+
self._topology._session_pool.reset()
910912

911913
def _duplicate(self, **kwargs: Any) -> MongoClient:
912914
args = self.__init_kwargs.copy()
@@ -1679,7 +1681,7 @@ def _cleanup_cursor(
16791681
if cursor_id or conn_mgr:
16801682
self._close_cursor_soon(cursor_id, address, conn_mgr)
16811683
if session and not explicit_session:
1682-
session._end_session(lock=locks_allowed)
1684+
session.end_session()
16831685

16841686
def _close_cursor_soon(
16851687
self,
@@ -1838,12 +1840,12 @@ def start_session(
18381840
)
18391841

18401842
def _return_server_session(
1841-
self, server_session: Union[_ServerSession, _EmptyServerSession], lock: bool
1843+
self, server_session: Union[_ServerSession, _EmptyServerSession]
18421844
) -> None:
18431845
"""Internal: return a _ServerSession to the pool."""
18441846
if isinstance(server_session, _EmptyServerSession):
18451847
return None
1846-
return self._topology.return_server_session(server_session, lock)
1848+
return self._topology.return_server_session(server_session)
18471849

18481850
def _ensure_session(self, session: Optional[ClientSession] = None) -> Optional[ClientSession]:
18491851
"""If provided session is None, lend a temporary session."""

pymongo/topology.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -669,23 +669,14 @@ def description(self) -> TopologyDescription:
669669

670670
def pop_all_sessions(self) -> list[_ServerSession]:
671671
"""Pop all session ids from the pool."""
672-
with self._lock:
673-
return self._session_pool.pop_all()
672+
return self._session_pool.pop_all()
674673

675674
def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
676675
"""Start or resume a server session, or raise ConfigurationError."""
677-
with self._lock:
678-
return self._session_pool.get_server_session(session_timeout_minutes)
676+
return self._session_pool.get_server_session(session_timeout_minutes)
679677

680-
def return_server_session(self, server_session: _ServerSession, lock: bool) -> None:
681-
if lock:
682-
with self._lock:
683-
self._session_pool.return_server_session(
684-
server_session, self._description.logical_session_timeout_minutes
685-
)
686-
else:
687-
# Called from a __del__ method, can't use a lock.
688-
self._session_pool.return_server_session_no_lock(server_session)
678+
def return_server_session(self, server_session: _ServerSession) -> None:
679+
self._session_pool.return_server_session(server_session)
689680

690681
def _new_selection(self) -> Selection:
691682
"""A Selection object, initially including all known servers.

0 commit comments

Comments
 (0)