Skip to content
17 changes: 10 additions & 7 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class AsyncConnection:
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""

def __init__(
Expand All @@ -139,11 +140,13 @@ def __init__(
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
self.pool_ref = weakref.ref(pool)
self.conn = conn
self.address = address
self.id = id
self.is_sdam = is_sdam
self.closed = False
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
Expand Down Expand Up @@ -711,13 +714,13 @@ def __init__(
self,
address: _Address,
options: PoolOptions,
handshake: bool = True,
is_sdam: bool = False,
client_id: Optional[ObjectId] = None,
):
"""
:param address: a (hostname, port) tuple
:param options: a PoolOptions instance
:param handshake: whether to call hello for each new AsyncConnection
:param is_sdam: whether to call hello for each new AsyncConnection
"""
if options.pause_enabled:
self.state = PoolState.PAUSED
Expand Down Expand Up @@ -746,14 +749,14 @@ def __init__(
self.pid = os.getpid()
self.address = address
self.opts = options
self.handshake = handshake
self.is_sdam = is_sdam
# Don't publish events or logs in Monitor pools.
self.enabled_for_cmap = (
self.handshake
not self.is_sdam
and self.opts._event_listeners is not None
and self.opts._event_listeners.enabled_for_cmap
)
self.enabled_for_logging = self.handshake
self.enabled_for_logging = not self.is_sdam

# The first portion of the wait queue.
# Enforces: maxPoolSize
Expand Down Expand Up @@ -1058,14 +1061,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A

raise

conn = AsyncConnection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type]
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
async with self.lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
try:
if self.handshake:
if not self.is_sdam:
await conn.hello()
self.is_writable = conn.is_writable
if handler:
Expand Down
2 changes: 1 addition & 1 deletion pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool:
)

return self._settings.pool_class(
address, monitor_pool_options, handshake=False, client_id=self._topology_id
address, monitor_pool_options, is_sdam=True, client_id=self._topology_id
)

def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
Expand Down
7 changes: 6 additions & 1 deletion pymongo/network_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,12 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
except socket.timeout:
if conn.cancel_context.cancelled:
raise _OperationCancelled("operation cancelled") from None
if _PYPY:
if (
_PYPY
or not conn.is_sdam
and deadline is not None
and deadline - time.monotonic() < 0
):
Copy link
Member

@ShaneHarvey ShaneHarvey May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want sigstop/sigcont to work the same regardless of what env variables are defined. What if we take an alternative approach here and replace not _is_faas() with not conn.is_sdam? IE only do the extra non-blocking read on SDAM connections.

Will that still fix the flaky tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the flaky tests still fixed after the is_sdam change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# We reached the true deadline.
raise
continue
Expand Down
17 changes: 10 additions & 7 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class Connection:
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""

def __init__(
Expand All @@ -139,11 +140,13 @@ def __init__(
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
self.pool_ref = weakref.ref(pool)
self.conn = conn
self.address = address
self.id = id
self.is_sdam = is_sdam
self.closed = False
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
Expand Down Expand Up @@ -709,13 +712,13 @@ def __init__(
self,
address: _Address,
options: PoolOptions,
handshake: bool = True,
is_sdam: bool = False,
client_id: Optional[ObjectId] = None,
):
"""
:param address: a (hostname, port) tuple
:param options: a PoolOptions instance
:param handshake: whether to call hello for each new Connection
:param is_sdam: whether to call hello for each new Connection
"""
if options.pause_enabled:
self.state = PoolState.PAUSED
Expand Down Expand Up @@ -744,14 +747,14 @@ def __init__(
self.pid = os.getpid()
self.address = address
self.opts = options
self.handshake = handshake
self.is_sdam = is_sdam
# Don't publish events or logs in Monitor pools.
self.enabled_for_cmap = (
self.handshake
not self.is_sdam
and self.opts._event_listeners is not None
and self.opts._event_listeners.enabled_for_cmap
)
self.enabled_for_logging = self.handshake
self.enabled_for_logging = not self.is_sdam

# The first portion of the wait queue.
# Enforces: maxPoolSize
Expand Down Expand Up @@ -1054,14 +1057,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect

raise

conn = Connection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type]
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
with self.lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
try:
if self.handshake:
if not self.is_sdam:
conn.hello()
self.is_writable = conn.is_writable
if handler:
Expand Down
2 changes: 1 addition & 1 deletion pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool:
)

return self._settings.pool_class(
address, monitor_pool_options, handshake=False, client_id=self._topology_id
address, monitor_pool_options, is_sdam=True, client_id=self._topology_id
)

def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
Expand Down
3 changes: 2 additions & 1 deletion test/asynchronous/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def __init__(self):
self.cancel_context = _CancellationContext()
self.more_to_come = False
self.id = random.randint(0, 100)
self.is_sdam = False
self.server_connection_id = random.randint(0, 100)

def close_conn(self, reason):
Expand All @@ -172,7 +173,7 @@ def __aexit__(self, exc_type, exc_val, exc_tb):


class AsyncMockPool:
def __init__(self, address, options, handshake=True, client_id=None):
def __init__(self, address, options, is_sdam=False, client_id=None):
self.gen = _PoolGeneration()
self._lock = _async_create_lock()
self.opts = options
Expand Down
2 changes: 1 addition & 1 deletion test/test_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def test_timeout_configuration(self):
self.assertEqual(1, monitor._pool.opts.socket_timeout)

# The monitor, not its pool, is responsible for calling hello.
self.assertFalse(monitor._pool.handshake)
self.assertTrue(monitor._pool.is_sdam)


class TestSingleServerTopology(TopologyTest):
Expand Down
3 changes: 2 additions & 1 deletion test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def __init__(self):
self.cancel_context = _CancellationContext()
self.more_to_come = False
self.id = random.randint(0, 100)
self.is_sdam = False
self.server_connection_id = random.randint(0, 100)

def close_conn(self, reason):
Expand All @@ -170,7 +171,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):


class MockPool:
def __init__(self, address, options, handshake=True, client_id=None):
def __init__(self, address, options, is_sdam=False, client_id=None):
self.gen = _PoolGeneration()
self._lock = _create_lock()
self.opts = options
Expand Down
Loading