Skip to content

Commit 8c35d1e

Browse files
authored
PYTHON-4347 Improve performance by making _ServerSessionPool lock-free (mongodb#1660)
1 parent f7d2deb commit 8c35d1e

File tree

9 files changed

+70
-84
lines changed

9 files changed

+70
-84
lines changed

pymongo/asynchronous/bulk.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ async def _execute_command(
378378
if retryable and not self.started_retryable_write:
379379
session._start_retryable_write()
380380
self.started_retryable_write = True
381-
await session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn)
381+
session._apply_to(cmd, retryable, ReadPreference.PRIMARY, conn)
382382
conn.send_cluster_time(cmd, session, client)
383383
conn.add_server_api(cmd)
384384
# CSOT: apply timeout before encoding the command.

pymongo/asynchronous/client_session.py

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ async def _end_session(self, lock: bool) -> None:
528528
# is in the committed state when the session is discarded.
529529
await self._unpin()
530530
finally:
531-
await self._client._return_server_session(self._server_session, lock)
531+
self._client._return_server_session(self._server_session)
532532
self._server_session = None
533533

534534
def _check_ended(self) -> None:
@@ -557,13 +557,13 @@ def options(self) -> SessionOptions:
557557
async def session_id(self) -> Mapping[str, Any]:
558558
"""A BSON document, the opaque server session identifier."""
559559
self._check_ended()
560-
await self._materialize(self._client.topology_description.logical_session_timeout_minutes)
560+
self._materialize(self._client.topology_description.logical_session_timeout_minutes)
561561
return self._server_session.session_id
562562

563563
@property
564564
async def _transaction_id(self) -> Int64:
565565
"""The current transaction id for the underlying server session."""
566-
await self._materialize(self._client.topology_description.logical_session_timeout_minutes)
566+
self._materialize(self._client.topology_description.logical_session_timeout_minutes)
567567
return self._server_session.transaction_id
568568

569569
@property
@@ -979,16 +979,16 @@ def _txn_read_preference(self) -> Optional[_ServerMode]:
979979
return self._transaction.opts.read_preference
980980
return None
981981

982-
async def _materialize(self, logical_session_timeout_minutes: Optional[int] = None) -> None:
982+
def _materialize(self, logical_session_timeout_minutes: Optional[int] = None) -> None:
983983
if isinstance(self._server_session, _EmptyServerSession):
984984
old = self._server_session
985-
self._server_session = await self._client._topology.get_server_session(
985+
self._server_session = self._client._topology.get_server_session(
986986
logical_session_timeout_minutes
987987
)
988988
if old.started_retryable_write:
989989
self._server_session.inc_transaction_id()
990990

991-
async def _apply_to(
991+
def _apply_to(
992992
self,
993993
command: MutableMapping[str, Any],
994994
is_retryable: bool,
@@ -1000,7 +1000,7 @@ async def _apply_to(
10001000
raise ConfigurationError("Sessions are not supported by this MongoDB deployment")
10011001
return
10021002
self._check_ended()
1003-
await self._materialize(conn.logical_session_timeout_minutes)
1003+
self._materialize(conn.logical_session_timeout_minutes)
10041004
if self.options.snapshot:
10051005
self._update_read_concern(command, conn)
10061006

@@ -1103,7 +1103,7 @@ def inc_transaction_id(self) -> None:
11031103
class _ServerSessionPool(collections.deque):
11041104
"""Pool of _ServerSession objects.
11051105
1106-
This class is not thread-safe, access it while holding the Topology lock.
1106+
This class is thread-safe.
11071107
"""
11081108

11091109
def __init__(self, *args: Any, **kwargs: Any):
@@ -1116,8 +1116,11 @@ def reset(self) -> None:
11161116

11171117
def pop_all(self) -> list[_ServerSession]:
11181118
ids = []
1119-
while self:
1120-
ids.append(self.pop().session_id)
1119+
while True:
1120+
try:
1121+
ids.append(self.pop().session_id)
1122+
except IndexError:
1123+
break
11211124
return ids
11221125

11231126
def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
@@ -1129,33 +1132,30 @@ def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerS
11291132
self._clear_stale(session_timeout_minutes)
11301133

11311134
# The most recently used sessions are on the left.
1132-
while self:
1133-
s = self.popleft()
1135+
while True:
1136+
try:
1137+
s = self.popleft()
1138+
except IndexError:
1139+
break
11341140
if not s.timed_out(session_timeout_minutes):
11351141
return s
11361142

11371143
return _ServerSession(self.generation)
11381144

1139-
def return_server_session(
1140-
self, server_session: _ServerSession, session_timeout_minutes: Optional[int]
1141-
) -> None:
1142-
if session_timeout_minutes is not None:
1143-
self._clear_stale(session_timeout_minutes)
1144-
if server_session.timed_out(session_timeout_minutes):
1145-
return
1146-
self.return_server_session_no_lock(server_session)
1147-
1148-
def return_server_session_no_lock(self, server_session: _ServerSession) -> None:
1145+
def return_server_session(self, server_session: _ServerSession) -> None:
11491146
# Discard sessions from an old pool to avoid duplicate sessions in the
11501147
# child process after a fork.
11511148
if server_session.generation == self.generation and not server_session.dirty:
11521149
self.appendleft(server_session)
11531150

11541151
def _clear_stale(self, session_timeout_minutes: Optional[int]) -> None:
11551152
# Clear stale sessions. The least recently used are on the right.
1156-
while self:
1157-
if self[-1].timed_out(session_timeout_minutes):
1158-
self.pop()
1159-
else:
1153+
while True:
1154+
try:
1155+
s = self.pop()
1156+
except IndexError:
1157+
break
1158+
if not s.timed_out(session_timeout_minutes):
1159+
self.append(s)
11601160
# The remaining sessions also haven't timed out.
11611161
break

pymongo/asynchronous/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ async def as_command(
394394
session = self.session
395395
conn.add_server_api(cmd)
396396
if session:
397-
await session._apply_to(cmd, False, self.read_preference, conn)
397+
session._apply_to(cmd, False, self.read_preference, conn)
398398
# Explain does not support readConcern.
399399
if not explain and not session.in_transaction:
400400
session._update_read_concern(cmd, conn)
@@ -546,7 +546,7 @@ async def as_command(
546546
conn,
547547
)
548548
if self.session:
549-
await self.session._apply_to(cmd, False, self.read_preference, conn)
549+
self.session._apply_to(cmd, False, self.read_preference, conn)
550550
conn.add_server_api(cmd)
551551
conn.send_cluster_time(cmd, self.session, self.client)
552552
# Support auto encryption

pymongo/asynchronous/mongo_client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,8 @@ def _should_pin_cursor(self, session: Optional[ClientSession]) -> Optional[bool]
902902
def _after_fork(self) -> None:
903903
"""Resets topology in a child after successfully forking."""
904904
self._init_background()
905+
# Reset the session pool to avoid duplicate sessions in the child process.
906+
self._topology._session_pool.reset()
905907

906908
def _duplicate(self, **kwargs: Any) -> AsyncMongoClient:
907909
args = self._init_kwargs.copy()
@@ -1508,7 +1510,7 @@ async def close(self) -> None:
15081510
.. versionchanged:: 3.6
15091511
End all server sessions created by this client.
15101512
"""
1511-
session_ids = await self._topology.pop_all_sessions()
1513+
session_ids = self._topology.pop_all_sessions()
15121514
if session_ids:
15131515
await self._end_sessions(session_ids)
15141516
# Stop the periodic task thread and then send pending killCursor
@@ -2006,13 +2008,13 @@ async def _process_periodic_tasks(self) -> None:
20062008
else:
20072009
helpers._handle_exception()
20082010

2009-
async def _return_server_session(
2010-
self, server_session: Union[_ServerSession, _EmptyServerSession], lock: bool
2011+
def _return_server_session(
2012+
self, server_session: Union[_ServerSession, _EmptyServerSession]
20112013
) -> None:
20122014
"""Internal: return a _ServerSession to the pool."""
20132015
if isinstance(server_session, _EmptyServerSession):
20142016
return None
2015-
return await self._topology.return_server_session(server_session, lock)
2017+
return self._topology.return_server_session(server_session)
20162018

20172019
@contextlib.asynccontextmanager
20182020
async def _tmp_session(

pymongo/asynchronous/pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,7 @@ async def command(
982982

983983
self.add_server_api(spec)
984984
if session:
985-
await session._apply_to(spec, retryable_write, read_preference, self)
985+
session._apply_to(spec, retryable_write, read_preference, self)
986986
self.send_cluster_time(spec, session, client)
987987
listeners = self.listeners if publish_events else None
988988
unacknowledged = bool(write_concern and not write_concern.acknowledged)

pymongo/asynchronous/topology.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -671,25 +671,16 @@ async def close(self) -> None:
671671
def description(self) -> TopologyDescription:
672672
return self._description
673673

674-
async def pop_all_sessions(self) -> list[_ServerSession]:
674+
def pop_all_sessions(self) -> list[_ServerSession]:
675675
"""Pop all session ids from the pool."""
676-
async with self._lock:
677-
return self._session_pool.pop_all()
676+
return self._session_pool.pop_all()
678677

679-
async def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
678+
def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
680679
"""Start or resume a server session, or raise ConfigurationError."""
681-
async with self._lock:
682-
return self._session_pool.get_server_session(session_timeout_minutes)
680+
return self._session_pool.get_server_session(session_timeout_minutes)
683681

684-
async def return_server_session(self, server_session: _ServerSession, lock: bool) -> None:
685-
if lock:
686-
async with self._lock:
687-
self._session_pool.return_server_session(
688-
server_session, self._description.logical_session_timeout_minutes
689-
)
690-
else:
691-
# Called from a __del__ method, can't use a lock.
692-
self._session_pool.return_server_session_no_lock(server_session)
682+
def return_server_session(self, server_session: _ServerSession) -> None:
683+
self._session_pool.return_server_session(server_session)
693684

694685
def _new_selection(self) -> Selection:
695686
"""A Selection object, initially including all known servers.

pymongo/synchronous/client_session.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ def _end_session(self, lock: bool) -> None:
528528
# is in the committed state when the session is discarded.
529529
self._unpin()
530530
finally:
531-
self._client._return_server_session(self._server_session, lock)
531+
self._client._return_server_session(self._server_session)
532532
self._server_session = None
533533

534534
def _check_ended(self) -> None:
@@ -1099,7 +1099,7 @@ def inc_transaction_id(self) -> None:
10991099
class _ServerSessionPool(collections.deque):
11001100
"""Pool of _ServerSession objects.
11011101
1102-
This class is not thread-safe, access it while holding the Topology lock.
1102+
This class is thread-safe.
11031103
"""
11041104

11051105
def __init__(self, *args: Any, **kwargs: Any):
@@ -1112,8 +1112,11 @@ def reset(self) -> None:
11121112

11131113
def pop_all(self) -> list[_ServerSession]:
11141114
ids = []
1115-
while self:
1116-
ids.append(self.pop().session_id)
1115+
while True:
1116+
try:
1117+
ids.append(self.pop().session_id)
1118+
except IndexError:
1119+
break
11171120
return ids
11181121

11191122
def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
@@ -1125,33 +1128,30 @@ def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerS
11251128
self._clear_stale(session_timeout_minutes)
11261129

11271130
# The most recently used sessions are on the left.
1128-
while self:
1129-
s = self.popleft()
1131+
while True:
1132+
try:
1133+
s = self.popleft()
1134+
except IndexError:
1135+
break
11301136
if not s.timed_out(session_timeout_minutes):
11311137
return s
11321138

11331139
return _ServerSession(self.generation)
11341140

1135-
def return_server_session(
1136-
self, server_session: _ServerSession, session_timeout_minutes: Optional[int]
1137-
) -> None:
1138-
if session_timeout_minutes is not None:
1139-
self._clear_stale(session_timeout_minutes)
1140-
if server_session.timed_out(session_timeout_minutes):
1141-
return
1142-
self.return_server_session_no_lock(server_session)
1143-
1144-
def return_server_session_no_lock(self, server_session: _ServerSession) -> None:
1141+
def return_server_session(self, server_session: _ServerSession) -> None:
11451142
# Discard sessions from an old pool to avoid duplicate sessions in the
11461143
# child process after a fork.
11471144
if server_session.generation == self.generation and not server_session.dirty:
11481145
self.appendleft(server_session)
11491146

11501147
def _clear_stale(self, session_timeout_minutes: Optional[int]) -> None:
11511148
# Clear stale sessions. The least recently used are on the right.
1152-
while self:
1153-
if self[-1].timed_out(session_timeout_minutes):
1154-
self.pop()
1155-
else:
1149+
while True:
1150+
try:
1151+
s = self.pop()
1152+
except IndexError:
1153+
break
1154+
if not s.timed_out(session_timeout_minutes):
1155+
self.append(s)
11561156
# The remaining sessions also haven't timed out.
11571157
break

pymongo/synchronous/mongo_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,8 @@ def _should_pin_cursor(self, session: Optional[ClientSession]) -> Optional[bool]
901901
def _after_fork(self) -> None:
902902
"""Resets topology in a child after successfully forking."""
903903
self._init_background()
904+
# Reset the session pool to avoid duplicate sessions in the child process.
905+
self._topology._session_pool.reset()
904906

905907
def _duplicate(self, **kwargs: Any) -> MongoClient:
906908
args = self._init_kwargs.copy()
@@ -2004,12 +2006,12 @@ def _process_periodic_tasks(self) -> None:
20042006
helpers._handle_exception()
20052007

20062008
def _return_server_session(
2007-
self, server_session: Union[_ServerSession, _EmptyServerSession], lock: bool
2009+
self, server_session: Union[_ServerSession, _EmptyServerSession]
20082010
) -> None:
20092011
"""Internal: return a _ServerSession to the pool."""
20102012
if isinstance(server_session, _EmptyServerSession):
20112013
return None
2012-
return self._topology.return_server_session(server_session, lock)
2014+
return self._topology.return_server_session(server_session)
20132015

20142016
@contextlib.contextmanager
20152017
def _tmp_session(

pymongo/synchronous/topology.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -671,23 +671,14 @@ def description(self) -> TopologyDescription:
671671

672672
def pop_all_sessions(self) -> list[_ServerSession]:
673673
"""Pop all session ids from the pool."""
674-
with self._lock:
675-
return self._session_pool.pop_all()
674+
return self._session_pool.pop_all()
676675

677676
def get_server_session(self, session_timeout_minutes: Optional[int]) -> _ServerSession:
678677
"""Start or resume a server session, or raise ConfigurationError."""
679-
with self._lock:
680-
return self._session_pool.get_server_session(session_timeout_minutes)
678+
return self._session_pool.get_server_session(session_timeout_minutes)
681679

682-
def return_server_session(self, server_session: _ServerSession, lock: bool) -> None:
683-
if lock:
684-
with self._lock:
685-
self._session_pool.return_server_session(
686-
server_session, self._description.logical_session_timeout_minutes
687-
)
688-
else:
689-
# Called from a __del__ method, can't use a lock.
690-
self._session_pool.return_server_session_no_lock(server_session)
680+
def return_server_session(self, server_session: _ServerSession) -> None:
681+
self._session_pool.return_server_session(server_session)
691682

692683
def _new_selection(self) -> Selection:
693684
"""A Selection object, initially including all known servers.

0 commit comments

Comments
 (0)