Skip to content

Commit 2f1227c

Browse files
authored
PYTHON-4807 - Specify how to handle unacknowledged+(ordered|verbose|m… (mongodb#1979)
1 parent dfb6a9a commit 2f1227c

File tree

6 files changed

+159
-6
lines changed

6 files changed

+159
-6
lines changed

pymongo/asynchronous/mongo_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2354,6 +2354,13 @@ async def bulk_write(
23542354
if not write_concern:
23552355
write_concern = self.write_concern
23562356

2357+
if write_concern and not write_concern.acknowledged and verbose_results:
2358+
raise InvalidOperation(
2359+
"Cannot request unacknowledged write concern and verbose results"
2360+
)
2361+
elif write_concern and not write_concern.acknowledged and ordered:
2362+
raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes")
2363+
23572364
common.validate_list("models", models)
23582365

23592366
blk = _AsyncClientBulk(

pymongo/synchronous/mongo_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2342,6 +2342,13 @@ def bulk_write(
23422342
if not write_concern:
23432343
write_concern = self.write_concern
23442344

2345+
if write_concern and not write_concern.acknowledged and verbose_results:
2346+
raise InvalidOperation(
2347+
"Cannot request unacknowledged write concern and verbose results"
2348+
)
2349+
elif write_concern and not write_concern.acknowledged and ordered:
2350+
raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes")
2351+
23452352
common.validate_list("models", models)
23462353

23472354
blk = _ClientBulk(

test/asynchronous/test_client_bulk_write.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,12 +401,16 @@ async def test_returns_error_if_unacknowledged_too_large_insert(self):
401401
# Insert document.
402402
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
403403
with self.assertRaises(DocumentTooLarge):
404-
await client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0))
404+
await client.bulk_write(
405+
models=models_insert, ordered=False, write_concern=WriteConcern(w=0)
406+
)
405407

406408
# Replace document.
407409
models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})]
408410
with self.assertRaises(DocumentTooLarge):
409-
await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
411+
await client.bulk_write(
412+
models=models_replace, ordered=False, write_concern=WriteConcern(w=0)
413+
)
410414

411415
async def _setup_namespace_test_models(self):
412416
# See prose test specification below for details on these calculations.
@@ -590,6 +594,44 @@ async def test_upserted_result(self):
590594
self.assertEqual(result.update_results[1].did_upsert, True)
591595
self.assertEqual(result.update_results[2].did_upsert, False)
592596

597+
@async_client_context.require_version_min(8, 0, 0, -24)
598+
@async_client_context.require_no_serverless
599+
async def test_15_unacknowledged_write_across_batches(self):
600+
listener = OvertCommandListener()
601+
client = await self.async_rs_or_single_client(event_listeners=[listener])
602+
603+
collection = client.db["coll"]
604+
self.addAsyncCleanup(collection.drop)
605+
await collection.drop()
606+
await client.db.command({"create": "db.coll"})
607+
608+
b_repeated = "b" * (self.max_bson_object_size - 500)
609+
models = [
610+
InsertOne(namespace="db.coll", document={"a": b_repeated})
611+
for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1)
612+
]
613+
614+
listener.reset()
615+
616+
res = await client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0))
617+
self.assertEqual(False, res.acknowledged)
618+
619+
events = listener.started_events
620+
self.assertEqual(2, len(events))
621+
self.assertEqual(
622+
int(self.max_message_size_bytes / self.max_bson_object_size),
623+
len(events[0].command["ops"]),
624+
)
625+
self.assertEqual(1, len(events[1].command["ops"]))
626+
self.assertEqual(events[0].operation_id, events[1].operation_id)
627+
self.assertEqual({"w": 0}, events[0].command["writeConcern"])
628+
self.assertEqual({"w": 0}, events[1].command["writeConcern"])
629+
630+
self.assertEqual(
631+
int(self.max_message_size_bytes / self.max_bson_object_size) + 1,
632+
await collection.count_documents({}),
633+
)
634+
593635

594636
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
595637
class TestClientBulkWriteCSOT(AsyncIntegrationTest):

test/command_monitoring/unacknowledged-client-bulkWrite.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@
9191
}
9292
}
9393
}
94-
]
94+
],
95+
"ordered": false
9596
},
9697
"expectResult": {
9798
"insertedCount": {
@@ -158,7 +159,7 @@
158159
"command": {
159160
"bulkWrite": 1,
160161
"errorsOnly": true,
161-
"ordered": true,
162+
"ordered": false,
162163
"ops": [
163164
{
164165
"insert": 0,

test/crud/unified/client-bulkWrite-errors.json

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,64 @@
450450
}
451451
}
452452
]
453+
},
454+
{
455+
"description": "Requesting unacknowledged write with verboseResults is a client-side error",
456+
"operations": [
457+
{
458+
"name": "clientBulkWrite",
459+
"object": "client0",
460+
"arguments": {
461+
"models": [
462+
{
463+
"insertOne": {
464+
"namespace": "crud-tests.coll0",
465+
"document": {
466+
"_id": 10
467+
}
468+
}
469+
}
470+
],
471+
"verboseResults": true,
472+
"ordered": false,
473+
"writeConcern": {
474+
"w": 0
475+
}
476+
},
477+
"expectError": {
478+
"isClientError": true,
479+
"errorContains": "Cannot request unacknowledged write concern and verbose results"
480+
}
481+
}
482+
]
483+
},
484+
{
485+
"description": "Requesting unacknowledged write with ordered is a client-side error",
486+
"operations": [
487+
{
488+
"name": "clientBulkWrite",
489+
"object": "client0",
490+
"arguments": {
491+
"models": [
492+
{
493+
"insertOne": {
494+
"namespace": "crud-tests.coll0",
495+
"document": {
496+
"_id": 10
497+
}
498+
}
499+
}
500+
],
501+
"writeConcern": {
502+
"w": 0
503+
}
504+
},
505+
"expectError": {
506+
"isClientError": true,
507+
"errorContains": "Cannot request unacknowledged write concern and ordered writes"
508+
}
509+
}
510+
]
453511
}
454512
]
455513
}

test/test_client_bulk_write.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,12 +401,12 @@ def test_returns_error_if_unacknowledged_too_large_insert(self):
401401
# Insert document.
402402
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
403403
with self.assertRaises(DocumentTooLarge):
404-
client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0))
404+
client.bulk_write(models=models_insert, ordered=False, write_concern=WriteConcern(w=0))
405405

406406
# Replace document.
407407
models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})]
408408
with self.assertRaises(DocumentTooLarge):
409-
client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
409+
client.bulk_write(models=models_replace, ordered=False, write_concern=WriteConcern(w=0))
410410

411411
def _setup_namespace_test_models(self):
412412
# See prose test specification below for details on these calculations.
@@ -590,6 +590,44 @@ def test_upserted_result(self):
590590
self.assertEqual(result.update_results[1].did_upsert, True)
591591
self.assertEqual(result.update_results[2].did_upsert, False)
592592

593+
@client_context.require_version_min(8, 0, 0, -24)
594+
@client_context.require_no_serverless
595+
def test_15_unacknowledged_write_across_batches(self):
596+
listener = OvertCommandListener()
597+
client = self.rs_or_single_client(event_listeners=[listener])
598+
599+
collection = client.db["coll"]
600+
self.addCleanup(collection.drop)
601+
collection.drop()
602+
client.db.command({"create": "db.coll"})
603+
604+
b_repeated = "b" * (self.max_bson_object_size - 500)
605+
models = [
606+
InsertOne(namespace="db.coll", document={"a": b_repeated})
607+
for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1)
608+
]
609+
610+
listener.reset()
611+
612+
res = client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0))
613+
self.assertEqual(False, res.acknowledged)
614+
615+
events = listener.started_events
616+
self.assertEqual(2, len(events))
617+
self.assertEqual(
618+
int(self.max_message_size_bytes / self.max_bson_object_size),
619+
len(events[0].command["ops"]),
620+
)
621+
self.assertEqual(1, len(events[1].command["ops"]))
622+
self.assertEqual(events[0].operation_id, events[1].operation_id)
623+
self.assertEqual({"w": 0}, events[0].command["writeConcern"])
624+
self.assertEqual({"w": 0}, events[1].command["writeConcern"])
625+
626+
self.assertEqual(
627+
int(self.max_message_size_bytes / self.max_bson_object_size) + 1,
628+
collection.count_documents({}),
629+
)
630+
593631

594632
# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
595633
class TestClientBulkWriteCSOT(IntegrationTest):

0 commit comments

Comments
 (0)