diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 4e09efe401..a71e4cb5cd 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2354,6 +2354,13 @@ async def bulk_write( if not write_concern: write_concern = self.write_concern + if write_concern and not write_concern.acknowledged and verbose_results: + raise InvalidOperation( + "Cannot request unacknowledged write concern and verbose results" + ) + elif write_concern and not write_concern.acknowledged and ordered: + raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes") + common.validate_list("models", models) blk = _AsyncClientBulk( diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 815446bb2c..24696f0c8e 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2342,6 +2342,13 @@ def bulk_write( if not write_concern: write_concern = self.write_concern + if write_concern and not write_concern.acknowledged and verbose_results: + raise InvalidOperation( + "Cannot request unacknowledged write concern and verbose results" + ) + elif write_concern and not write_concern.acknowledged and ordered: + raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes") + common.validate_list("models", models) blk = _ClientBulk( diff --git a/test/asynchronous/test_client_bulk_write.py b/test/asynchronous/test_client_bulk_write.py index 9464337809..5f6b3353e8 100644 --- a/test/asynchronous/test_client_bulk_write.py +++ b/test/asynchronous/test_client_bulk_write.py @@ -401,12 +401,16 @@ async def test_returns_error_if_unacknowledged_too_large_insert(self): # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - await client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0)) + await client.bulk_write( + models=models_insert, ordered=False, write_concern=WriteConcern(w=0) + ) # Replace document. models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) + await client.bulk_write( + models=models_replace, ordered=False, write_concern=WriteConcern(w=0) + ) async def _setup_namespace_test_models(self): # See prose test specification below for details on these calculations. @@ -590,6 +594,44 @@ async def test_upserted_result(self): self.assertEqual(result.update_results[1].did_upsert, True) self.assertEqual(result.update_results[2].did_upsert, False) + @async_client_context.require_version_min(8, 0, 0, -24) + @async_client_context.require_no_serverless + async def test_15_unacknowledged_write_across_batches(self): + listener = OvertCommandListener() + client = await self.async_rs_or_single_client(event_listeners=[listener]) + + collection = client.db["coll"] + self.addAsyncCleanup(collection.drop) + await collection.drop() + await client.db.command({"create": "db.coll"}) + + b_repeated = "b" * (self.max_bson_object_size - 500) + models = [ + InsertOne(namespace="db.coll", document={"a": b_repeated}) + for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1) + ] + + listener.reset() + + res = await client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0)) + self.assertEqual(False, res.acknowledged) + + events = listener.started_events + self.assertEqual(2, len(events)) + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size), + len(events[0].command["ops"]), + ) + self.assertEqual(1, len(events[1].command["ops"])) + self.assertEqual(events[0].operation_id, events[1].operation_id) + self.assertEqual({"w": 0}, events[0].command["writeConcern"]) + self.assertEqual({"w": 0}, events[1].command["writeConcern"]) + + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size) + 1, + await collection.count_documents({}), + ) + # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteCSOT(AsyncIntegrationTest): diff --git a/test/command_monitoring/unacknowledged-client-bulkWrite.json b/test/command_monitoring/unacknowledged-client-bulkWrite.json index b30e1540f4..61bb00726c 100644 --- a/test/command_monitoring/unacknowledged-client-bulkWrite.json +++ b/test/command_monitoring/unacknowledged-client-bulkWrite.json @@ -91,7 +91,8 @@ } } } - ] + ], + "ordered": false }, "expectResult": { "insertedCount": { @@ -158,7 +159,7 @@ "command": { "bulkWrite": 1, "errorsOnly": true, - "ordered": true, + "ordered": false, "ops": [ { "insert": 0, diff --git a/test/crud/unified/client-bulkWrite-errors.json b/test/crud/unified/client-bulkWrite-errors.json index 8cc45bb5f2..015bd95c99 100644 --- a/test/crud/unified/client-bulkWrite-errors.json +++ b/test/crud/unified/client-bulkWrite-errors.json @@ -450,6 +450,64 @@ } } ] + }, + { + "description": "Requesting unacknowledged write with verboseResults is a client-side error", + "operations": [ + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "crud-tests.coll0", + "document": { + "_id": 10 + } + } + } + ], + "verboseResults": true, + "ordered": false, + "writeConcern": { + "w": 0 + } + }, + "expectError": { + "isClientError": true, + "errorContains": "Cannot request unacknowledged write concern and verbose results" + } + } + ] + }, + { + "description": "Requesting unacknowledged write with ordered is a client-side error", + "operations": [ + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "crud-tests.coll0", + "document": { + "_id": 10 + } + } + } + ], + "writeConcern": { + "w": 0 + } + }, + "expectError": { + "isClientError": true, + "errorContains": "Cannot request unacknowledged write concern and ordered writes" + } + } + ] } ] } diff --git a/test/test_client_bulk_write.py b/test/test_client_bulk_write.py index 58b5015dd2..733970dd57 100644 --- a/test/test_client_bulk_write.py +++ b/test/test_client_bulk_write.py @@ -401,12 +401,12 @@ def test_returns_error_if_unacknowledged_too_large_insert(self): # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0)) + client.bulk_write(models=models_insert, ordered=False, write_concern=WriteConcern(w=0)) # Replace document. models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) + client.bulk_write(models=models_replace, ordered=False, write_concern=WriteConcern(w=0)) def _setup_namespace_test_models(self): # See prose test specification below for details on these calculations. @@ -590,6 +590,44 @@ def test_upserted_result(self): self.assertEqual(result.update_results[1].did_upsert, True) self.assertEqual(result.update_results[2].did_upsert, False) + @client_context.require_version_min(8, 0, 0, -24) + @client_context.require_no_serverless + def test_15_unacknowledged_write_across_batches(self): + listener = OvertCommandListener() + client = self.rs_or_single_client(event_listeners=[listener]) + + collection = client.db["coll"] + self.addCleanup(collection.drop) + collection.drop() + client.db.command({"create": "db.coll"}) + + b_repeated = "b" * (self.max_bson_object_size - 500) + models = [ + InsertOne(namespace="db.coll", document={"a": b_repeated}) + for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1) + ] + + listener.reset() + + res = client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0)) + self.assertEqual(False, res.acknowledged) + + events = listener.started_events + self.assertEqual(2, len(events)) + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size), + len(events[0].command["ops"]), + ) + self.assertEqual(1, len(events[1].command["ops"])) + self.assertEqual(events[0].operation_id, events[1].operation_id) + self.assertEqual({"w": 0}, events[0].command["writeConcern"]) + self.assertEqual({"w": 0}, events[1].command["writeConcern"]) + + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size) + 1, + collection.count_documents({}), + ) + # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteCSOT(IntegrationTest):