From 5cf097e919951fb4a40684103c63ccaa7ebe0196 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 29 Oct 2024 10:34:51 -0400 Subject: [PATCH 1/3] PYTHON-4807 - Specify how to handle unacknowledged+(ordered|verbose|multi-batch) bulk writes --- pymongo/asynchronous/mongo_client.py | 7 +++ pymongo/synchronous/mongo_client.py | 7 +++ .../unacknowledged-client-bulkWrite.json | 5 +- .../crud/unified/client-bulkWrite-errors.json | 58 +++++++++++++++++++ 4 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 4e09efe401..338d1f72ca 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2354,6 +2354,13 @@ async def bulk_write( if not write_concern: write_concern = self.write_concern + if not write_concern.acknowledged and verbose_results: + raise InvalidOperation( + "Cannot request unacknowledged write concern and verbose results" + ) + elif not write_concern.acknowledged and ordered: + raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes") + common.validate_list("models", models) blk = _AsyncClientBulk( diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 815446bb2c..d164c28453 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2342,6 +2342,13 @@ def bulk_write( if not write_concern: write_concern = self.write_concern + if not write_concern.acknowledged and verbose_results: + raise InvalidOperation( + "Cannot request unacknowledged write concern and verbose results" + ) + elif not write_concern.acknowledged and ordered: + raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes") + common.validate_list("models", models) blk = _ClientBulk( diff --git a/test/command_monitoring/unacknowledged-client-bulkWrite.json b/test/command_monitoring/unacknowledged-client-bulkWrite.json index b30e1540f4..61bb00726c 100644 --- a/test/command_monitoring/unacknowledged-client-bulkWrite.json +++ b/test/command_monitoring/unacknowledged-client-bulkWrite.json @@ -91,7 +91,8 @@ } } } - ] + ], + "ordered": false }, "expectResult": { "insertedCount": { @@ -158,7 +159,7 @@ "command": { "bulkWrite": 1, "errorsOnly": true, - "ordered": true, + "ordered": false, "ops": [ { "insert": 0, diff --git a/test/crud/unified/client-bulkWrite-errors.json b/test/crud/unified/client-bulkWrite-errors.json index 8cc45bb5f2..015bd95c99 100644 --- a/test/crud/unified/client-bulkWrite-errors.json +++ b/test/crud/unified/client-bulkWrite-errors.json @@ -450,6 +450,64 @@ } } ] + }, + { + "description": "Requesting unacknowledged write with verboseResults is a client-side error", + "operations": [ + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "crud-tests.coll0", + "document": { + "_id": 10 + } + } + } + ], + "verboseResults": true, + "ordered": false, + "writeConcern": { + "w": 0 + } + }, + "expectError": { + "isClientError": true, + "errorContains": "Cannot request unacknowledged write concern and verbose results" + } + } + ] + }, + { + "description": "Requesting unacknowledged write with ordered is a client-side error", + "operations": [ + { + "name": "clientBulkWrite", + "object": "client0", + "arguments": { + "models": [ + { + "insertOne": { + "namespace": "crud-tests.coll0", + "document": { + "_id": 10 + } + } + } + ], + "writeConcern": { + "w": 0 + } + }, + "expectError": { + "isClientError": true, + "errorContains": "Cannot request unacknowledged write concern and ordered writes" + } + } + ] } ] } From 978a1e60e8525d0b39c7175a2590c0f9f6a5da9f Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 29 Oct 2024 10:53:23 -0400 Subject: [PATCH 2/3] Check for None --- pymongo/asynchronous/mongo_client.py | 4 ++-- pymongo/synchronous/mongo_client.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 338d1f72ca..a71e4cb5cd 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2354,11 +2354,11 @@ async def bulk_write( if not write_concern: write_concern = self.write_concern - if not write_concern.acknowledged and verbose_results: + if write_concern and not write_concern.acknowledged and verbose_results: raise InvalidOperation( "Cannot request unacknowledged write concern and verbose results" ) - elif not write_concern.acknowledged and ordered: + elif write_concern and not write_concern.acknowledged and ordered: raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes") common.validate_list("models", models) diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index d164c28453..24696f0c8e 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2342,11 +2342,11 @@ def bulk_write( if not write_concern: write_concern = self.write_concern - if not write_concern.acknowledged and verbose_results: + if write_concern and not write_concern.acknowledged and verbose_results: raise InvalidOperation( "Cannot request unacknowledged write concern and verbose results" ) - elif not write_concern.acknowledged and ordered: + elif write_concern and not write_concern.acknowledged and ordered: raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes") common.validate_list("models", models) From c287246687587d383446f7bd6aa2050aeb510a55 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Tue, 29 Oct 2024 11:34:26 -0400 Subject: [PATCH 3/3] Add prose test --- test/asynchronous/test_client_bulk_write.py | 46 ++++++++++++++++++++- test/test_client_bulk_write.py | 42 ++++++++++++++++++- 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_client_bulk_write.py b/test/asynchronous/test_client_bulk_write.py index 9464337809..5f6b3353e8 100644 --- a/test/asynchronous/test_client_bulk_write.py +++ b/test/asynchronous/test_client_bulk_write.py @@ -401,12 +401,16 @@ async def test_returns_error_if_unacknowledged_too_large_insert(self): # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - await client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0)) + await client.bulk_write( + models=models_insert, ordered=False, write_concern=WriteConcern(w=0) + ) # Replace document. models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) + await client.bulk_write( + models=models_replace, ordered=False, write_concern=WriteConcern(w=0) + ) async def _setup_namespace_test_models(self): # See prose test specification below for details on these calculations. @@ -590,6 +594,44 @@ async def test_upserted_result(self): self.assertEqual(result.update_results[1].did_upsert, True) self.assertEqual(result.update_results[2].did_upsert, False) + @async_client_context.require_version_min(8, 0, 0, -24) + @async_client_context.require_no_serverless + async def test_15_unacknowledged_write_across_batches(self): + listener = OvertCommandListener() + client = await self.async_rs_or_single_client(event_listeners=[listener]) + + collection = client.db["coll"] + self.addAsyncCleanup(collection.drop) + await collection.drop() + await client.db.command({"create": "db.coll"}) + + b_repeated = "b" * (self.max_bson_object_size - 500) + models = [ + InsertOne(namespace="db.coll", document={"a": b_repeated}) + for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1) + ] + + listener.reset() + + res = await client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0)) + self.assertEqual(False, res.acknowledged) + + events = listener.started_events + self.assertEqual(2, len(events)) + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size), + len(events[0].command["ops"]), + ) + self.assertEqual(1, len(events[1].command["ops"])) + self.assertEqual(events[0].operation_id, events[1].operation_id) + self.assertEqual({"w": 0}, events[0].command["writeConcern"]) + self.assertEqual({"w": 0}, events[1].command["writeConcern"]) + + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size) + 1, + await collection.count_documents({}), + ) + # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteCSOT(AsyncIntegrationTest): diff --git a/test/test_client_bulk_write.py b/test/test_client_bulk_write.py index 58b5015dd2..733970dd57 100644 --- a/test/test_client_bulk_write.py +++ b/test/test_client_bulk_write.py @@ -401,12 +401,12 @@ def test_returns_error_if_unacknowledged_too_large_insert(self): # Insert document. models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0)) + client.bulk_write(models=models_insert, ordered=False, write_concern=WriteConcern(w=0)) # Replace document. models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})] with self.assertRaises(DocumentTooLarge): - client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0)) + client.bulk_write(models=models_replace, ordered=False, write_concern=WriteConcern(w=0)) def _setup_namespace_test_models(self): # See prose test specification below for details on these calculations. @@ -590,6 +590,44 @@ def test_upserted_result(self): self.assertEqual(result.update_results[1].did_upsert, True) self.assertEqual(result.update_results[2].did_upsert, False) + @client_context.require_version_min(8, 0, 0, -24) + @client_context.require_no_serverless + def test_15_unacknowledged_write_across_batches(self): + listener = OvertCommandListener() + client = self.rs_or_single_client(event_listeners=[listener]) + + collection = client.db["coll"] + self.addCleanup(collection.drop) + collection.drop() + client.db.command({"create": "db.coll"}) + + b_repeated = "b" * (self.max_bson_object_size - 500) + models = [ + InsertOne(namespace="db.coll", document={"a": b_repeated}) + for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1) + ] + + listener.reset() + + res = client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0)) + self.assertEqual(False, res.acknowledged) + + events = listener.started_events + self.assertEqual(2, len(events)) + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size), + len(events[0].command["ops"]), + ) + self.assertEqual(1, len(events[1].command["ops"])) + self.assertEqual(events[0].operation_id, events[1].operation_id) + self.assertEqual({"w": 0}, events[0].command["writeConcern"]) + self.assertEqual({"w": 0}, events[1].command["writeConcern"]) + + self.assertEqual( + int(self.max_message_size_bytes / self.max_bson_object_size) + 1, + collection.count_documents({}), + ) + # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites class TestClientBulkWriteCSOT(IntegrationTest):