Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2354,6 +2354,13 @@ async def bulk_write(
if not write_concern:
write_concern = self.write_concern

if write_concern and not write_concern.acknowledged and verbose_results:
raise InvalidOperation(
"Cannot request unacknowledged write concern and verbose results"
)
elif write_concern and not write_concern.acknowledged and ordered:
raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't there be some dead code we can remove now that we don't support these operations?


common.validate_list("models", models)

blk = _AsyncClientBulk(
Expand Down
7 changes: 7 additions & 0 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,13 @@ def bulk_write(
if not write_concern:
write_concern = self.write_concern

if write_concern and not write_concern.acknowledged and verbose_results:
raise InvalidOperation(
"Cannot request unacknowledged write concern and verbose results"
)
elif write_concern and not write_concern.acknowledged and ordered:
raise InvalidOperation("Cannot request unacknowledged write concern and ordered writes")

common.validate_list("models", models)

blk = _ClientBulk(
Expand Down
46 changes: 44 additions & 2 deletions test/asynchronous/test_client_bulk_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,16 @@ async def test_returns_error_if_unacknowledged_too_large_insert(self):
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
await client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0))
await client.bulk_write(
models=models_insert, ordered=False, write_concern=WriteConcern(w=0)
)

# Replace document.
models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
await client.bulk_write(
models=models_replace, ordered=False, write_concern=WriteConcern(w=0)
)

async def _setup_namespace_test_models(self):
# See prose test specification below for details on these calculations.
Expand Down Expand Up @@ -590,6 +594,44 @@ async def test_upserted_result(self):
self.assertEqual(result.update_results[1].did_upsert, True)
self.assertEqual(result.update_results[2].did_upsert, False)

@async_client_context.require_version_min(8, 0, 0, -24)
@async_client_context.require_no_serverless
async def test_15_unacknowledged_write_across_batches(self):
listener = OvertCommandListener()
client = await self.async_rs_or_single_client(event_listeners=[listener])

collection = client.db["coll"]
self.addAsyncCleanup(collection.drop)
await collection.drop()
await client.db.command({"create": "db.coll"})

b_repeated = "b" * (self.max_bson_object_size - 500)
models = [
InsertOne(namespace="db.coll", document={"a": b_repeated})
for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1)
]

listener.reset()

res = await client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0))
self.assertEqual(False, res.acknowledged)

events = listener.started_events
self.assertEqual(2, len(events))
self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size),
len(events[0].command["ops"]),
)
self.assertEqual(1, len(events[1].command["ops"]))
self.assertEqual(events[0].operation_id, events[1].operation_id)
self.assertEqual({"w": 0}, events[0].command["writeConcern"])
self.assertEqual({"w": 0}, events[1].command["writeConcern"])

self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size) + 1,
await collection.count_documents({}),
)


# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteCSOT(AsyncIntegrationTest):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
}
}
}
]
],
"ordered": false
},
"expectResult": {
"insertedCount": {
Expand Down Expand Up @@ -158,7 +159,7 @@
"command": {
"bulkWrite": 1,
"errorsOnly": true,
"ordered": true,
"ordered": false,
"ops": [
{
"insert": 0,
Expand Down
58 changes: 58 additions & 0 deletions test/crud/unified/client-bulkWrite-errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,64 @@
}
}
]
},
{
"description": "Requesting unacknowledged write with verboseResults is a client-side error",
"operations": [
{
"name": "clientBulkWrite",
"object": "client0",
"arguments": {
"models": [
{
"insertOne": {
"namespace": "crud-tests.coll0",
"document": {
"_id": 10
}
}
}
],
"verboseResults": true,
"ordered": false,
"writeConcern": {
"w": 0
}
},
"expectError": {
"isClientError": true,
"errorContains": "Cannot request unacknowledged write concern and verbose results"
}
}
]
},
{
"description": "Requesting unacknowledged write with ordered is a client-side error",
"operations": [
{
"name": "clientBulkWrite",
"object": "client0",
"arguments": {
"models": [
{
"insertOne": {
"namespace": "crud-tests.coll0",
"document": {
"_id": 10
}
}
}
],
"writeConcern": {
"w": 0
}
},
"expectError": {
"isClientError": true,
"errorContains": "Cannot request unacknowledged write concern and ordered writes"
}
}
]
}
]
}
42 changes: 40 additions & 2 deletions test/test_client_bulk_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,12 @@ def test_returns_error_if_unacknowledged_too_large_insert(self):
# Insert document.
models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
client.bulk_write(models=models_insert, write_concern=WriteConcern(w=0))
client.bulk_write(models=models_insert, ordered=False, write_concern=WriteConcern(w=0))

# Replace document.
models_replace = [ReplaceOne(namespace="db.coll", filter={}, replacement={"a": b_repeated})]
with self.assertRaises(DocumentTooLarge):
client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))
client.bulk_write(models=models_replace, ordered=False, write_concern=WriteConcern(w=0))

def _setup_namespace_test_models(self):
# See prose test specification below for details on these calculations.
Expand Down Expand Up @@ -590,6 +590,44 @@ def test_upserted_result(self):
self.assertEqual(result.update_results[1].did_upsert, True)
self.assertEqual(result.update_results[2].did_upsert, False)

@client_context.require_version_min(8, 0, 0, -24)
@client_context.require_no_serverless
def test_15_unacknowledged_write_across_batches(self):
listener = OvertCommandListener()
client = self.rs_or_single_client(event_listeners=[listener])

collection = client.db["coll"]
self.addCleanup(collection.drop)
collection.drop()
client.db.command({"create": "db.coll"})

b_repeated = "b" * (self.max_bson_object_size - 500)
models = [
InsertOne(namespace="db.coll", document={"a": b_repeated})
for _ in range(int(self.max_message_size_bytes / self.max_bson_object_size) + 1)
]

listener.reset()

res = client.bulk_write(models, ordered=False, write_concern=WriteConcern(w=0))
self.assertEqual(False, res.acknowledged)

events = listener.started_events
self.assertEqual(2, len(events))
self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size),
len(events[0].command["ops"]),
)
self.assertEqual(1, len(events[1].command["ops"]))
self.assertEqual(events[0].operation_id, events[1].operation_id)
self.assertEqual({"w": 0}, events[0].command["writeConcern"])
self.assertEqual({"w": 0}, events[1].command["writeConcern"])

self.assertEqual(
int(self.max_message_size_bytes / self.max_bson_object_size) + 1,
collection.count_documents({}),
)


# https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
class TestClientBulkWriteCSOT(IntegrationTest):
Expand Down
Loading