Skip to content

Commit cf27d65

Browse files
committed
WIP exhaust cursors should return all data in first read
1 parent 482485d commit cf27d65

22 files changed

+232
-153
lines changed

pymongo/asynchronous/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1951,7 +1951,7 @@ async def _cleanup_cursor_lock(
19511951
# exhausted the result set we *must* close the socket
19521952
# to stop the server from sending more data.
19531953
assert conn_mgr.conn is not None
1954-
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
1954+
await conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
19551955
else:
19561956
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
19571957
if conn_mgr:

pymongo/asynchronous/network.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,10 @@ async def command(
195195
reply = None
196196
response_doc: _DocumentOut = {"ok": 1}
197197
else:
198-
reply = await async_receive_message(conn, request_id)
198+
if "dropDatabase" in spec:
199+
reply = await async_receive_message(conn, request_id, debug=True)
200+
else:
201+
reply = await async_receive_message(conn, request_id)
199202
conn.more_to_come = reply.more_to_come
200203
unpacked_docs = reply.unpack_response(
201204
codec_options=codec_options, user_fields=user_fields

pymongo/asynchronous/pool.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ async def unpin(self) -> None:
239239
if pool:
240240
await pool.checkin(self)
241241
else:
242-
self.close_conn(ConnectionClosedReason.STALE)
242+
await self.close_conn(ConnectionClosedReason.STALE)
243243

244244
def hello_cmd(self) -> dict[str, Any]:
245245
# Handshake spec requires us to use OP_MSG+hello command for the
@@ -438,7 +438,7 @@ async def command(
438438
raise
439439
# Catch socket.error, KeyboardInterrupt, etc. and close ourselves.
440440
except BaseException as error:
441-
self._raise_connection_failure(error)
441+
await self._raise_connection_failure(error)
442442

443443
async def send_message(self, message: bytes, max_doc_size: int) -> None:
444444
"""Send a raw BSON message or raise ConnectionFailure.
@@ -454,7 +454,7 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None:
454454
try:
455455
await async_sendall(self.conn.get_conn, message)
456456
except BaseException as error:
457-
self._raise_connection_failure(error)
457+
await self._raise_connection_failure(error)
458458

459459
async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]:
460460
"""Receive a raw BSON message or raise ConnectionFailure.
@@ -464,7 +464,7 @@ async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _O
464464
try:
465465
return await async_receive_message(self, request_id, self.max_message_size)
466466
except BaseException as error:
467-
self._raise_connection_failure(error)
467+
await self._raise_connection_failure(error)
468468

469469
def _raise_if_not_writable(self, unacknowledged: bool) -> None:
470470
"""Raise NotPrimaryError on unacknowledged write if this socket is not
@@ -550,11 +550,11 @@ def validate_session(
550550
"Can only use session with the AsyncMongoClient that started it"
551551
)
552552

553-
def close_conn(self, reason: Optional[str]) -> None:
553+
async def close_conn(self, reason: Optional[str]) -> None:
554554
"""Close this connection with a reason."""
555555
if self.closed:
556556
return
557-
self._close_conn()
557+
await self._close_conn()
558558
if reason:
559559
if self.enabled_for_cmap:
560560
assert self.listeners is not None
@@ -571,7 +571,7 @@ def close_conn(self, reason: Optional[str]) -> None:
571571
error=reason,
572572
)
573573

574-
def _close_conn(self) -> None:
574+
async def _close_conn(self) -> None:
575575
"""Close this connection."""
576576
if self.closed:
577577
return
@@ -580,7 +580,7 @@ def _close_conn(self) -> None:
580580
# Note: We catch exceptions to avoid spurious errors on interpreter
581581
# shutdown.
582582
try:
583-
self.conn.close()
583+
await self.conn.close()
584584
except asyncio.CancelledError:
585585
raise
586586
except Exception: # noqa: S110
@@ -618,7 +618,7 @@ def idle_time_seconds(self) -> float:
618618
"""Seconds since this socket was last checked into its pool."""
619619
return time.monotonic() - self.last_checkin_time
620620

621-
def _raise_connection_failure(self, error: BaseException) -> NoReturn:
621+
async def _raise_connection_failure(self, error: BaseException) -> NoReturn:
622622
# Catch *all* exceptions from socket methods and close the socket. In
623623
# regular Python, socket operations only raise socket.error, even if
624624
# the underlying cause was a Ctrl-C: a signal raised during socket.recv
@@ -638,7 +638,7 @@ def _raise_connection_failure(self, error: BaseException) -> NoReturn:
638638
reason = None
639639
else:
640640
reason = ConnectionClosedReason.ERROR
641-
self.close_conn(reason)
641+
await self.close_conn(reason)
642642
# SSLError from PyOpenSSL inherits directly from Exception.
643643
if isinstance(error, (IOError, OSError, SSLError)):
644644
details = _get_timeout_details(self.opts)
@@ -864,7 +864,7 @@ async def _reset(
864864
# publishing the PoolClearedEvent.
865865
if close:
866866
for conn in sockets:
867-
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
867+
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
868868
if self.enabled_for_cmap:
869869
assert listeners is not None
870870
listeners.publish_pool_closed(self.address)
@@ -895,7 +895,7 @@ async def _reset(
895895
serviceId=service_id,
896896
)
897897
for conn in sockets:
898-
conn.close_conn(ConnectionClosedReason.STALE)
898+
await conn.close_conn(ConnectionClosedReason.STALE)
899899

900900
async def update_is_writable(self, is_writable: Optional[bool]) -> None:
901901
"""Updates the is_writable attribute on all sockets currently in the
@@ -940,7 +940,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
940940
and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
941941
):
942942
conn = self.conns.pop()
943-
conn.close_conn(ConnectionClosedReason.IDLE)
943+
await conn.close_conn(ConnectionClosedReason.IDLE)
944944

945945
while True:
946946
async with self.size_cond:
@@ -964,7 +964,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
964964
# Close connection and return if the pool was reset during
965965
# socket creation or while acquiring the pool lock.
966966
if self.gen.get_overall() != reference_generation:
967-
conn.close_conn(ConnectionClosedReason.STALE)
967+
await conn.close_conn(ConnectionClosedReason.STALE)
968968
return
969969
self.conns.appendleft(conn)
970970
self.active_contexts.discard(conn.cancel_context)
@@ -1052,7 +1052,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10521052
except BaseException:
10531053
async with self.lock:
10541054
self.active_contexts.discard(conn.cancel_context)
1055-
conn.close_conn(ConnectionClosedReason.ERROR)
1055+
await conn.close_conn(ConnectionClosedReason.ERROR)
10561056
raise
10571057

10581058
return conn
@@ -1246,7 +1246,7 @@ async def _get_conn(
12461246
except IndexError:
12471247
self._pending += 1
12481248
if conn: # We got a socket from the pool
1249-
if self._perished(conn):
1249+
if await self._perished(conn):
12501250
conn = None
12511251
continue
12521252
else: # We need to create a new connection
@@ -1259,7 +1259,7 @@ async def _get_conn(
12591259
except BaseException:
12601260
if conn:
12611261
# We checked out a socket but authentication failed.
1262-
conn.close_conn(ConnectionClosedReason.ERROR)
1262+
await conn.close_conn(ConnectionClosedReason.ERROR)
12631263
async with self.size_cond:
12641264
self.requests -= 1
12651265
if incremented:
@@ -1319,7 +1319,7 @@ async def checkin(self, conn: AsyncConnection) -> None:
13191319
await self.reset_without_pause()
13201320
else:
13211321
if self.closed:
1322-
conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
1322+
await conn.close_conn(ConnectionClosedReason.POOL_CLOSED)
13231323
elif conn.closed:
13241324
# CMAP requires the closed event be emitted after the check in.
13251325
if self.enabled_for_cmap:
@@ -1343,7 +1343,7 @@ async def checkin(self, conn: AsyncConnection) -> None:
13431343
# Hold the lock to ensure this section does not race with
13441344
# Pool.reset().
13451345
if self.stale_generation(conn.generation, conn.service_id):
1346-
conn.close_conn(ConnectionClosedReason.STALE)
1346+
await conn.close_conn(ConnectionClosedReason.STALE)
13471347
else:
13481348
conn.update_last_checkin_time()
13491349
conn.update_is_writable(bool(self.is_writable))
@@ -1361,7 +1361,7 @@ async def checkin(self, conn: AsyncConnection) -> None:
13611361
self.operation_count -= 1
13621362
self.size_cond.notify()
13631363

1364-
def _perished(self, conn: AsyncConnection) -> bool:
1364+
async def _perished(self, conn: AsyncConnection) -> bool:
13651365
"""Return True and close the connection if it is "perished".
13661366
13671367
This side-effecty function checks if this socket has been idle for
@@ -1381,18 +1381,18 @@ def _perished(self, conn: AsyncConnection) -> bool:
13811381
self.opts.max_idle_time_seconds is not None
13821382
and idle_time_seconds > self.opts.max_idle_time_seconds
13831383
):
1384-
conn.close_conn(ConnectionClosedReason.IDLE)
1384+
await conn.close_conn(ConnectionClosedReason.IDLE)
13851385
return True
13861386

13871387
if self._check_interval_seconds is not None and (
13881388
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds
13891389
):
13901390
if conn.conn_closed():
1391-
conn.close_conn(ConnectionClosedReason.ERROR)
1391+
await conn.close_conn(ConnectionClosedReason.ERROR)
13921392
return True
13931393

13941394
if self.stale_generation(conn.generation, conn.service_id):
1395-
conn.close_conn(ConnectionClosedReason.STALE)
1395+
await conn.close_conn(ConnectionClosedReason.STALE)
13961396
return True
13971397

13981398
return False
@@ -1436,9 +1436,9 @@ def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn:
14361436
f"maxPoolSize: {self.opts.max_pool_size}, timeout: {timeout}"
14371437
)
14381438

1439-
def __del__(self) -> None:
1440-
# Avoid ResourceWarnings in Python 3
1441-
# Close all sockets without calling reset() or close() because it is
1442-
# not safe to acquire a lock in __del__.
1443-
for conn in self.conns:
1444-
conn.close_conn(None)
1439+
# def __del__(self) -> None:
1440+
# # Avoid ResourceWarnings in Python 3
1441+
# # Close all sockets without calling reset() or close() because it is
1442+
# # not safe to acquire a lock in __del__.
1443+
# for conn in self.conns:
1444+
# conn.close_conn(None)

0 commit comments

Comments
 (0)