Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ PyMongo 4.11 brings a number of changes including:
:meth:`~pymongo.collection.Collection.update_one`, :meth:`~pymongo.collection.Collection.replace_one`,
:class:`~pymongo.operations.UpdateOne`, and
:class:`~pymongo.operations.UpdateMany`,
- :meth:`~pymongo.mongo_client.MongoClient.bulk_write` and
:meth:`~pymongo.asynchronous.mongo_client.AsyncMongoClient.bulk_write` now throw an error
when ``ordered=True`` or ``verboseResults=True`` are used with unacknowledged writes.
These are unavoidable breaking changes.

Issues Resolved
...............
Expand Down
49 changes: 5 additions & 44 deletions pymongo/asynchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,11 @@ async def retryable_bulk(
_throw_client_bulk_write_exception(full_result, self.verbose_results)
return full_result

async def execute_command_unack_unordered(
async def execute_command_unack(
self,
conn: AsyncConnection,
) -> None:
"""Execute commands with OP_MSG and w=0 writeConcern, unordered."""
"""Execute commands with OP_MSG and w=0 writeConcern. Always unordered."""
db_name = "admin"
cmd_name = "bulkWrite"
listeners = self.client._event_listeners
Expand All @@ -704,8 +704,8 @@ async def execute_command_unack_unordered(
while self.idx_offset < self.total_ops:
# Construct the server command, specifying the relevant options.
cmd = {"bulkWrite": 1}
cmd["errorsOnly"] = not self.verbose_results
cmd["ordered"] = self.ordered # type: ignore[assignment]
cmd["errorsOnly"] = True
cmd["ordered"] = False
if self.bypass_doc_val is not None:
cmd["bypassDocumentValidation"] = self.bypass_doc_val
cmd["writeConcern"] = {"w": 0} # type: ignore[assignment]
Expand All @@ -723,43 +723,6 @@ async def execute_command_unack_unordered(

self.idx_offset += len(to_send_ops)

async def execute_command_unack_ordered(
self,
conn: AsyncConnection,
) -> None:
"""Execute commands with OP_MSG and w=0 WriteConcern, ordered."""
full_result: MutableMapping[str, Any] = {
"anySuccessful": False,
"error": None,
"writeErrors": [],
"writeConcernErrors": [],
"nInserted": 0,
"nUpserted": 0,
"nMatched": 0,
"nModified": 0,
"nDeleted": 0,
"insertResults": {},
"updateResults": {},
"deleteResults": {},
}
# Ordered bulk writes have to be acknowledged so that we stop
# processing at the first error, even when the application
# specified unacknowledged writeConcern.
initial_write_concern = WriteConcern()
op_id = _randint()
try:
await self._execute_command(
initial_write_concern,
None,
conn,
op_id,
False,
full_result,
self.write_concern,
)
except OperationFailure:
pass

async def execute_no_results(
self,
conn: AsyncConnection,
Expand All @@ -775,9 +738,7 @@ async def execute_no_results(
"Cannot set bypass_document_validation with unacknowledged write concern"
)

if self.ordered:
return await self.execute_command_unack_ordered(conn)
return await self.execute_command_unack_unordered(conn)
return await self.execute_command_unack(conn)

async def execute(
self,
Expand Down
49 changes: 5 additions & 44 deletions pymongo/synchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,11 +679,11 @@ def retryable_bulk(
_throw_client_bulk_write_exception(full_result, self.verbose_results)
return full_result

def execute_command_unack_unordered(
def execute_command_unack(
self,
conn: Connection,
) -> None:
"""Execute commands with OP_MSG and w=0 writeConcern, unordered."""
"""Execute commands with OP_MSG and w=0 writeConcern. Always unordered."""
db_name = "admin"
cmd_name = "bulkWrite"
listeners = self.client._event_listeners
Expand All @@ -702,8 +702,8 @@ def execute_command_unack_unordered(
while self.idx_offset < self.total_ops:
# Construct the server command, specifying the relevant options.
cmd = {"bulkWrite": 1}
cmd["errorsOnly"] = not self.verbose_results
cmd["ordered"] = self.ordered # type: ignore[assignment]
cmd["errorsOnly"] = True
cmd["ordered"] = False
if self.bypass_doc_val is not None:
cmd["bypassDocumentValidation"] = self.bypass_doc_val
cmd["writeConcern"] = {"w": 0} # type: ignore[assignment]
Expand All @@ -721,43 +721,6 @@ def execute_command_unack_unordered(

self.idx_offset += len(to_send_ops)

def execute_command_unack_ordered(
self,
conn: Connection,
) -> None:
"""Execute commands with OP_MSG and w=0 WriteConcern, ordered."""
full_result: MutableMapping[str, Any] = {
"anySuccessful": False,
"error": None,
"writeErrors": [],
"writeConcernErrors": [],
"nInserted": 0,
"nUpserted": 0,
"nMatched": 0,
"nModified": 0,
"nDeleted": 0,
"insertResults": {},
"updateResults": {},
"deleteResults": {},
}
# Ordered bulk writes have to be acknowledged so that we stop
# processing at the first error, even when the application
# specified unacknowledged writeConcern.
initial_write_concern = WriteConcern()
op_id = _randint()
try:
self._execute_command(
initial_write_concern,
None,
conn,
op_id,
False,
full_result,
self.write_concern,
)
except OperationFailure:
pass

def execute_no_results(
self,
conn: Connection,
Expand All @@ -773,9 +736,7 @@ def execute_no_results(
"Cannot set bypass_document_validation with unacknowledged write concern"
)

if self.ordered:
return self.execute_command_unack_ordered(conn)
return self.execute_command_unack_unordered(conn)
return self.execute_command_unack(conn)

def execute(
self,
Expand Down
Loading