Skip to content
Merged
3 changes: 3 additions & 0 deletions pymongo/asynchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)

from bson import CodecOptions, _convert_raw_document_lists_to_streams
from pymongo import _csot
from pymongo.asynchronous.cursor import _ConnectionManager
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
self._address = address
self._batch_size = batch_size
self._max_await_time_ms = max_await_time_ms
self._timeout = self._collection.database.client.options.timeout
self._session = session
self._explicit_session = explicit_session
self._killed = self._id == 0
Expand Down Expand Up @@ -385,6 +387,7 @@ async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]:
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close()

@_csot.apply
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.

Expand Down
4 changes: 3 additions & 1 deletion pymongo/asynchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
from bson.code import Code
from bson.son import SON
from pymongo import helpers_shared
from pymongo import _csot, helpers_shared
from pymongo.asynchronous.helpers import anext
from pymongo.collation import validate_collation_or_none
from pymongo.common import (
Expand Down Expand Up @@ -196,6 +196,7 @@ def __init__(
self._explain = False
self._comment = comment
self._max_time_ms = max_time_ms
self._timeout = self._collection.database.client.options.timeout
self._max_await_time_ms: Optional[int] = None
self._max: Optional[Union[dict[Any, Any], _Sort]] = max
self._min: Optional[Union[dict[Any, Any], _Sort]] = min
Expand Down Expand Up @@ -1290,6 +1291,7 @@ async def __aenter__(self) -> AsyncCursor[_DocumentType]:
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close()

@_csot.apply
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.

Expand Down
3 changes: 3 additions & 0 deletions pymongo/synchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
)

from bson import CodecOptions, _convert_raw_document_lists_to_streams
from pymongo import _csot
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
from pymongo.message import (
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
self._address = address
self._batch_size = batch_size
self._max_await_time_ms = max_await_time_ms
self._timeout = self._collection.database.client.options.timeout
self._session = session
self._explicit_session = explicit_session
self._killed = self._id == 0
Expand Down Expand Up @@ -385,6 +387,7 @@ def __enter__(self) -> CommandCursor[_DocumentType]:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.close()

@_csot.apply
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.

Expand Down
4 changes: 3 additions & 1 deletion pymongo/synchronous/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from bson import RE_TYPE, _convert_raw_document_lists_to_streams
from bson.code import Code
from bson.son import SON
from pymongo import helpers_shared
from pymongo import _csot, helpers_shared
from pymongo.collation import validate_collation_or_none
from pymongo.common import (
validate_is_document_type,
Expand Down Expand Up @@ -196,6 +196,7 @@ def __init__(
self._explain = False
self._comment = comment
self._max_time_ms = max_time_ms
self._timeout = self._collection.database.client.options.timeout
self._max_await_time_ms: Optional[int] = None
self._max: Optional[Union[dict[Any, Any], _Sort]] = max
self._min: Optional[Union[dict[Any, Any], _Sort]] = min
Expand Down Expand Up @@ -1288,6 +1289,7 @@ def __enter__(self) -> Cursor[_DocumentType]:
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.close()

@_csot.apply
def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc for doc in cursor]``.

Expand Down
34 changes: 33 additions & 1 deletion test/asynchronous/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AllowListEventListener,
EventListener,
OvertCommandListener,
delay,
ignore_deprecations,
wait_until,
)
Expand All @@ -44,7 +45,7 @@
from pymongo.asynchronous.cursor import AsyncCursor, CursorType
from pymongo.asynchronous.helpers import anext
from pymongo.collation import Collation
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure, PyMongoError
from pymongo.operations import _IndexList
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
Expand Down Expand Up @@ -1410,6 +1411,18 @@ async def test_to_list_length(self):
docs = await c.to_list(3)
self.assertEqual(len(docs), 2)

async def test_to_list_csot_applied(self):
client = await self.async_single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
await client.admin.command("ping")
coll = client.pymongo.test
await coll.insert_many([{} for _ in range(5)])
cursor = coll.find({"$where": delay(1)})
with self.assertRaises(PyMongoError) as ctx:
await cursor.to_list()
self.assertTrue(ctx.exception.timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.


@async_client_context.require_change_streams
async def test_command_cursor_to_list(self):
# Set maxAwaitTimeMS=1 to speed up the test.
Expand Down Expand Up @@ -1439,6 +1452,25 @@ async def test_command_cursor_to_list_length(self):
result = await db.test.aggregate([pipeline])
self.assertEqual(len(await result.to_list(1)), 1)

@async_client_context.require_failCommand_blockConnection
async def test_command_cursor_to_list_csot_applied(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 things:

  • Let's use regular aggregate here since there's nothing special about $changeStream here and it complicates the test.
  • The failCommands needs to be for "getMore", not "aggregate" since await client.db.test.aggregate() actually runs the aggregate command and to_list only runs getMore. We'll need to set batchSize to ensure at least 1 getMore even runs.

client = await self.async_single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
await client.admin.command("ping")
coll = client.pymongo.test
await coll.insert_many([{} for _ in range(5)])
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 5},
"data": {"failCommands": ["getMore"], "blockConnection": True, "blockTimeMS": 1000},
}
cursor = await coll.aggregate([], batchSize=1)
async with self.fail_point(fail_command):
with self.assertRaises(PyMongoError) as ctx:
await cursor.to_list()
self.assertTrue(ctx.exception.timeout)


class TestRawBatchCursor(AsyncIntegrationTest):
async def test_find_raw(self):
Expand Down
34 changes: 33 additions & 1 deletion test/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AllowListEventListener,
EventListener,
OvertCommandListener,
delay,
ignore_deprecations,
wait_until,
)
Expand All @@ -42,7 +43,7 @@
from bson.code import Code
from pymongo import ASCENDING, DESCENDING
from pymongo.collation import Collation
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure
from pymongo.errors import ExecutionTimeout, InvalidOperation, OperationFailure, PyMongoError
from pymongo.operations import _IndexList
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference
Expand Down Expand Up @@ -1401,6 +1402,18 @@ def test_to_list_length(self):
docs = c.to_list(3)
self.assertEqual(len(docs), 2)

def test_to_list_csot_applied(self):
client = self.single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
client.admin.command("ping")
coll = client.pymongo.test
coll.insert_many([{} for _ in range(5)])
cursor = coll.find({"$where": delay(1)})
with self.assertRaises(PyMongoError) as ctx:
cursor.to_list()
self.assertTrue(ctx.exception.timeout)

@client_context.require_change_streams
def test_command_cursor_to_list(self):
# Set maxAwaitTimeMS=1 to speed up the test.
Expand Down Expand Up @@ -1430,6 +1443,25 @@ def test_command_cursor_to_list_length(self):
result = db.test.aggregate([pipeline])
self.assertEqual(len(result.to_list(1)), 1)

@client_context.require_failCommand_blockConnection
def test_command_cursor_to_list_csot_applied(self):
client = self.single_client(timeoutMS=500)
# Initialize the client with a larger timeout to help make test less flakey
with pymongo.timeout(2):
client.admin.command("ping")
coll = client.pymongo.test
coll.insert_many([{} for _ in range(5)])
fail_command = {
"configureFailPoint": "failCommand",
"mode": {"times": 5},
"data": {"failCommands": ["getMore"], "blockConnection": True, "blockTimeMS": 1000},
}
cursor = coll.aggregate([], batchSize=1)
with self.fail_point(fail_command):
with self.assertRaises(PyMongoError) as ctx:
cursor.to_list()
self.assertTrue(ctx.exception.timeout)


class TestRawBatchCursor(IntegrationTest):
def test_find_raw(self):
Expand Down
Loading