From 43c7dee4b7db2a7add8d39195276f511707a4af5 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 10:17:44 -0700 Subject: [PATCH 01/12] PYTHON-4707 Migrate test_bulk.py to async --- test/asynchronous/test_bulk.py | 1130 ++++++++++++++++++++++++++++++++ tools/synchro.py | 1 + 2 files changed, 1131 insertions(+) create mode 100644 test/asynchronous/test_bulk.py diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py new file mode 100644 index 0000000000..39b815b9d9 --- /dev/null +++ b/test/asynchronous/test_bulk.py @@ -0,0 +1,1130 @@ +# Copyright 2014-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test the bulk API.""" +from __future__ import annotations + +import sys +import uuid +from typing import Any, Optional + +from pymongo.asynchronous.mongo_client import AsyncMongoClient + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, remove_all_users, unittest +from test.utils import ( + async_wait_until, + rs_or_single_client_noauth, + single_client, + wait_until, +) + +from bson.binary import Binary, UuidRepresentation +from bson.codec_options import CodecOptions +from bson.objectid import ObjectId +from pymongo.common import partition_node +from pymongo.errors import ( + BulkWriteError, + ConfigurationError, + InvalidOperation, + OperationFailure, +) +from pymongo.operations import * +from pymongo.asynchronous.collection import AsyncCollection +from pymongo.write_concern import WriteConcern + +_IS_SYNC = False + +class AsyncBulkTestBase(AsyncIntegrationTest): + coll: AsyncCollection + coll_w0: AsyncCollection + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.coll = cls.db.test + cls.coll_w0 = cls.coll.with_options(write_concern=WriteConcern(w=0)) + + async def asyncSetUp(self): + super().setUp() + await self.coll.drop() + + def assertEqualResponse(self, expected, actual): + """Compare response from bulk.execute() to expected response.""" + for key, value in expected.items(): + if key == "nModified": + self.assertEqual(value, actual["nModified"]) + elif key == "upserted": + expected_upserts = value + actual_upserts = actual["upserted"] + self.assertEqual( + len(expected_upserts), + len(actual_upserts), + 'Expected %d elements in "upserted", got %d' + % (len(expected_upserts), len(actual_upserts)), + ) + + for e, a in zip(expected_upserts, actual_upserts): + self.assertEqualUpsert(e, a) + + elif key == "writeErrors": + expected_errors = value + actual_errors = actual["writeErrors"] + self.assertEqual( + len(expected_errors), + len(actual_errors), + 'Expected %d elements in "writeErrors", got %d' + % (len(expected_errors), len(actual_errors)), + ) + + for e, a in zip(expected_errors, actual_errors): + self.assertEqualWriteError(e, a) + + else: + self.assertEqual( + actual.get(key), + value, + f"{key!r} value of {actual.get(key)!r} does not match expected {value!r}", + ) + + def assertEqualUpsert(self, expected, actual): + """Compare bulk.execute()['upserts'] to expected value. + + Like: {'index': 0, '_id': ObjectId()} + """ + self.assertEqual(expected["index"], actual["index"]) + if expected["_id"] == "...": + # Unspecified value. + self.assertTrue("_id" in actual) + else: + self.assertEqual(expected["_id"], actual["_id"]) + + def assertEqualWriteError(self, expected, actual): + """Compare bulk.execute()['writeErrors'] to expected value. + + Like: {'index': 0, 'code': 123, 'errmsg': '...', 'op': { ... }} + """ + self.assertEqual(expected["index"], actual["index"]) + self.assertEqual(expected["code"], actual["code"]) + if expected["errmsg"] == "...": + # Unspecified value. + self.assertTrue("errmsg" in actual) + else: + self.assertEqual(expected["errmsg"], actual["errmsg"]) + + expected_op = expected["op"].copy() + actual_op = actual["op"].copy() + if expected_op.get("_id") == "...": + # Unspecified _id. + self.assertTrue("_id" in actual_op) + actual_op.pop("_id") + expected_op.pop("_id") + + self.assertEqual(expected_op, actual_op) + + +class AsyncTestBulk(AsyncBulkTestBase): + async def test_empty(self): + with self.assertRaises(InvalidOperation): + await self.coll.bulk_write([]) + + async def test_insert(self): + expected = { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 1, + "nRemoved": 0, + "upserted": [], + "writeErrors": [], + "writeConcernErrors": [], + } + + result = await self.coll.bulk_write([InsertOne({})]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(1, result.inserted_count) + self.assertEqual(1, await self.coll.count_documents({})) + + async def _test_update_many(self, update): + expected = { + "nMatched": 2, + "nModified": 2, + "nUpserted": 0, + "nInserted": 0, + "nRemoved": 0, + "upserted": [], + "writeErrors": [], + "writeConcernErrors": [], + } + await self.coll.insert_many([{}, {}]) + + result = await self.coll.bulk_write([UpdateMany({}, update)]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(2, result.matched_count) + self.assertTrue(result.modified_count in (2, None)) + + async def test_update_many(self): + await self._test_update_many({"$set": {"foo": "bar"}}) + + @async_client_context.require_version_min(4, 1, 11) + async def test_update_many_pipeline(self): + await self._test_update_many([{"$set": {"foo": "bar"}}]) + + async def test_array_filters_validation(self): + with self.assertRaises(TypeError): + await UpdateMany({}, {}, array_filters={}) + with self.assertRaises(TypeError): + await UpdateOne({}, {}, array_filters={}) + + async def test_array_filters_unacknowledged(self): + coll = self.coll_w0 + update_one = UpdateOne({}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}]) + update_many = UpdateMany({}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}]) + with self.assertRaises(ConfigurationError): + await coll.bulk_write([update_one]) + with self.assertRaises(ConfigurationError): + await coll.bulk_write([update_many]) + + async def _test_update_one(self, update): + expected = { + "nMatched": 1, + "nModified": 1, + "nUpserted": 0, + "nInserted": 0, + "nRemoved": 0, + "upserted": [], + "writeErrors": [], + "writeConcernErrors": [], + } + + await self.coll.insert_many([{}, {}]) + + result = await self.coll.bulk_write([UpdateOne({}, update)]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(1, result.matched_count) + self.assertTrue(result.modified_count in (1, None)) + + async def test_update_one(self): + await self._test_update_one({"$set": {"foo": "bar"}}) + + @async_client_context.require_version_min(4, 1, 11) + async def test_update_one_pipeline(self): + await self._test_update_one([{"$set": {"foo": "bar"}}]) + + async def test_replace_one(self): + expected = { + "nMatched": 1, + "nModified": 1, + "nUpserted": 0, + "nInserted": 0, + "nRemoved": 0, + "upserted": [], + "writeErrors": [], + "writeConcernErrors": [], + } + + await self.coll.insert_many([{}, {}]) + + result = await self.coll.bulk_write([ReplaceOne({}, {"foo": "bar"})]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(1, result.matched_count) + self.assertTrue(result.modified_count in (1, None)) + + async def test_remove(self): + # Test removing all documents, ordered. + expected = { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 0, + "nRemoved": 2, + "upserted": [], + "writeErrors": [], + "writeConcernErrors": [], + } + await self.coll.insert_many([{}, {}]) + + result = await self.coll.bulk_write([DeleteMany({})]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(2, result.deleted_count) + + async def test_remove_one(self): + # Test removing one document, empty selector. + await self.coll.insert_many([{}, {}]) + expected = { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 0, + "nRemoved": 1, + "upserted": [], + "writeErrors": [], + "writeConcernErrors": [], + } + + result = await self.coll.bulk_write([DeleteOne({})]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(1, result.deleted_count) + self.assertEqual(await self.coll.count_documents({}), 1) + + async def test_upsert(self): + expected = { + "nMatched": 0, + "nModified": 0, + "nUpserted": 1, + "nInserted": 0, + "nRemoved": 0, + "upserted": [{"index": 0, "_id": "..."}], + } + + result = await self.coll.bulk_write([ReplaceOne({}, {"foo": "bar"}, upsert=True)]) + self.assertEqualResponse(expected, result.bulk_api_result) + self.assertEqual(1, result.upserted_count) + assert result.upserted_ids is not None + self.assertEqual(1, len(result.upserted_ids)) + self.assertTrue(isinstance(result.upserted_ids.get(0), ObjectId)) + + self.assertEqual(await self.coll.count_documents({"foo": "bar"}), 1) + + async def test_numerous_inserts(self): + # Ensure we don't exceed server's maxWriteBatchSize size limit. + n_docs = await async_client_context.max_write_batch_size + 100 + requests = [InsertOne[dict]({}) for _ in range(n_docs)] + result = await self.coll.bulk_write(requests, ordered=False) + self.assertEqual(n_docs, result.inserted_count) + self.assertEqual(n_docs, await self.coll.count_documents({})) + + # Same with ordered bulk. + await self.coll.drop() + result = await self.coll.bulk_write(requests) + self.assertEqual(n_docs, result.inserted_count) + self.assertEqual(n_docs, await self.coll.count_documents({})) + + async def test_bulk_max_message_size(self): + await self.coll.delete_many({}) + self.addCleanup(self.coll.delete_many, {}) + _16_MB = 16 * 1000 * 1000 + # Generate a list of documents such that the first batched OP_MSG is + # as close as possible to the 48MB limit. + docs = [ + {"_id": 1, "l": "s" * _16_MB}, + {"_id": 2, "l": "s" * _16_MB}, + {"_id": 3, "l": "s" * (_16_MB - 10000)}, + ] + # Fill in the remaining ~10000 bytes with small documents. + for i in range(4, 10000): + docs.append({"_id": i}) + result = await self.coll.insert_many(docs) + self.assertEqual(len(docs), len(result.inserted_ids)) + + async def test_generator_insert(self): + def gen(): + yield {"a": 1, "b": 1} + yield {"a": 1, "b": 2} + yield {"a": 2, "b": 3} + yield {"a": 3, "b": 5} + yield {"a": 5, "b": 8} + + result = await self.coll.insert_many(gen()) + self.assertEqual(5, len(result.inserted_ids)) + + async def test_bulk_write_no_results(self): + result = await self.coll_w0.bulk_write([InsertOne({})]) + self.assertFalse(result.acknowledged) + self.assertRaises(InvalidOperation, lambda: result.inserted_count) + self.assertRaises(InvalidOperation, lambda: result.matched_count) + self.assertRaises(InvalidOperation, lambda: result.modified_count) + self.assertRaises(InvalidOperation, lambda: result.deleted_count) + self.assertRaises(InvalidOperation, lambda: result.upserted_count) + self.assertRaises(InvalidOperation, lambda: result.upserted_ids) + + async def test_bulk_write_invalid_arguments(self): + # The requests argument must be a list. + generator = (InsertOne[dict]({}) for _ in range(10)) + with self.assertRaises(TypeError): + await self.coll.bulk_write(generator) # type: ignore[arg-type] + + # Document is not wrapped in a bulk write operation. + with self.assertRaises(TypeError): + await self.coll.bulk_write([{}]) # type: ignore[list-item] + + async def test_upsert_large(self): + big = "a" * (await async_client_context.max_bson_size - 37) + result = await self.coll.bulk_write([UpdateOne({"x": 1}, {"$set": {"s": big}}, upsert=True)]) + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 1, + "nInserted": 0, + "nRemoved": 0, + "upserted": [{"index": 0, "_id": "..."}], + }, + result.bulk_api_result, + ) + + self.assertEqual(1, await self.coll.count_documents({"x": 1})) + + async def test_client_generated_upsert_id(self): + result = await self.coll.bulk_write( + [ + UpdateOne({"_id": 0}, {"$set": {"a": 0}}, upsert=True), + ReplaceOne({"a": 1}, {"_id": 1}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({"_id": 2}, {"_id": 2}, upsert=True), + ] + ) + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 3, + "nInserted": 0, + "nRemoved": 0, + "upserted": [ + {"index": 0, "_id": 0}, + {"index": 1, "_id": 1}, + {"index": 2, "_id": 2}, + ], + }, + result.bulk_api_result, + ) + + async def test_upsert_uuid_standard(self): + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + coll = self.coll.with_options(codec_options=options) + uuids = [uuid.uuid4() for _ in range(3)] + result = await coll.bulk_write( + [ + UpdateOne({"_id": uuids[0]}, {"$set": {"a": 0}}, upsert=True), + ReplaceOne({"a": 1}, {"_id": uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({"_id": uuids[2]}, {"_id": uuids[2]}, upsert=True), + ] + ) + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 3, + "nInserted": 0, + "nRemoved": 0, + "upserted": [ + {"index": 0, "_id": uuids[0]}, + {"index": 1, "_id": uuids[1]}, + {"index": 2, "_id": uuids[2]}, + ], + }, + result.bulk_api_result, + ) + + async def test_upsert_uuid_unspecified(self): + options = CodecOptions(uuid_representation=UuidRepresentation.UNSPECIFIED) + coll = self.coll.with_options(codec_options=options) + uuids = [Binary.from_uuid(uuid.uuid4()) for _ in range(3)] + result = await coll.bulk_write( + [ + UpdateOne({"_id": uuids[0]}, {"$set": {"a": 0}}, upsert=True), + ReplaceOne({"a": 1}, {"_id": uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({"_id": uuids[2]}, {"_id": uuids[2]}, upsert=True), + ] + ) + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 3, + "nInserted": 0, + "nRemoved": 0, + "upserted": [ + {"index": 0, "_id": uuids[0]}, + {"index": 1, "_id": uuids[1]}, + {"index": 2, "_id": uuids[2]}, + ], + }, + result.bulk_api_result, + ) + + async def test_upsert_uuid_standard_subdocuments(self): + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + coll = self.coll.with_options(codec_options=options) + ids: list = [{"f": Binary(bytes(i)), "f2": uuid.uuid4()} for i in range(3)] + + result = await coll.bulk_write( + [ + UpdateOne({"_id": ids[0]}, {"$set": {"a": 0}}, upsert=True), + ReplaceOne({"a": 1}, {"_id": ids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({"_id": ids[2]}, {"_id": ids[2]}, upsert=True), + ] + ) + + # The `Binary` values are returned as `bytes` objects. + for _id in ids: + _id["f"] = bytes(_id["f"]) + + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 3, + "nInserted": 0, + "nRemoved": 0, + "upserted": [ + {"index": 0, "_id": ids[0]}, + {"index": 1, "_id": ids[1]}, + {"index": 2, "_id": ids[2]}, + ], + }, + result.bulk_api_result, + ) + + async def test_single_ordered_batch(self): + result = await self.coll.bulk_write( + [ + InsertOne({"a": 1}), + UpdateOne({"a": 1}, {"$set": {"b": 1}}), + UpdateOne({"a": 2}, {"$set": {"b": 2}}, upsert=True), + InsertOne({"a": 3}), + DeleteOne({"a": 3}), + ] + ) + self.assertEqualResponse( + { + "nMatched": 1, + "nModified": 1, + "nUpserted": 1, + "nInserted": 2, + "nRemoved": 1, + "upserted": [{"index": 2, "_id": "..."}], + }, + result.bulk_api_result, + ) + + async def test_single_error_ordered_batch(self): + await self.coll.create_index("a", unique=True) + self.addCleanup(self.coll.drop_index, [("a", 1)]) + requests: list = [ + InsertOne({"b": 1, "a": 1}), + UpdateOne({"b": 2}, {"$set": {"a": 1}}, upsert=True), + InsertOne({"b": 3, "a": 2}), + ] + try: + await self.coll.bulk_write(requests) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 1, + "nRemoved": 0, + "upserted": [], + "writeConcernErrors": [], + "writeErrors": [ + { + "index": 1, + "code": 11000, + "errmsg": "...", + "op": { + "q": {"b": 2}, + "u": {"$set": {"a": 1}}, + "multi": False, + "upsert": True, + }, + } + ], + }, + result, + ) + + async def test_multiple_error_ordered_batch(self): + await self.coll.create_index("a", unique=True) + self.addCleanup(self.coll.drop_index, [("a", 1)]) + requests: list = [ + InsertOne({"b": 1, "a": 1}), + UpdateOne({"b": 2}, {"$set": {"a": 1}}, upsert=True), + UpdateOne({"b": 3}, {"$set": {"a": 2}}, upsert=True), + UpdateOne({"b": 2}, {"$set": {"a": 1}}, upsert=True), + InsertOne({"b": 4, "a": 3}), + InsertOne({"b": 5, "a": 1}), + ] + + try: + await self.coll.bulk_write(requests) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 1, + "nRemoved": 0, + "upserted": [], + "writeConcernErrors": [], + "writeErrors": [ + { + "index": 1, + "code": 11000, + "errmsg": "...", + "op": { + "q": {"b": 2}, + "u": {"$set": {"a": 1}}, + "multi": False, + "upsert": True, + }, + } + ], + }, + result, + ) + + async def test_single_unordered_batch(self): + requests: list = [ + InsertOne({"a": 1}), + UpdateOne({"a": 1}, {"$set": {"b": 1}}), + UpdateOne({"a": 2}, {"$set": {"b": 2}}, upsert=True), + InsertOne({"a": 3}), + DeleteOne({"a": 3}), + ] + result = await self.coll.bulk_write(requests, ordered=False) + self.assertEqualResponse( + { + "nMatched": 1, + "nModified": 1, + "nUpserted": 1, + "nInserted": 2, + "nRemoved": 1, + "upserted": [{"index": 2, "_id": "..."}], + "writeErrors": [], + "writeConcernErrors": [], + }, + result.bulk_api_result, + ) + + async def test_single_error_unordered_batch(self): + await self.coll.create_index("a", unique=True) + self.addCleanup(self.coll.drop_index, [("a", 1)]) + requests: list = [ + InsertOne({"b": 1, "a": 1}), + UpdateOne({"b": 2}, {"$set": {"a": 1}}, upsert=True), + InsertOne({"b": 3, "a": 2}), + ] + + try: + await self.coll.bulk_write(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 2, + "nRemoved": 0, + "upserted": [], + "writeConcernErrors": [], + "writeErrors": [ + { + "index": 1, + "code": 11000, + "errmsg": "...", + "op": { + "q": {"b": 2}, + "u": {"$set": {"a": 1}}, + "multi": False, + "upsert": True, + }, + } + ], + }, + result, + ) + + async def test_multiple_error_unordered_batch(self): + await self.coll.create_index("a", unique=True) + self.addCleanup(self.coll.drop_index, [("a", 1)]) + requests: list = [ + InsertOne({"b": 1, "a": 1}), + UpdateOne({"b": 2}, {"$set": {"a": 3}}, upsert=True), + UpdateOne({"b": 3}, {"$set": {"a": 4}}, upsert=True), + UpdateOne({"b": 4}, {"$set": {"a": 3}}, upsert=True), + InsertOne({"b": 5, "a": 2}), + InsertOne({"b": 6, "a": 1}), + ] + + try: + await self.coll.bulk_write(requests, ordered=False) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + # Assume the update at index 1 runs before the update at index 3, + # although the spec does not require it. Same for inserts. + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 2, + "nInserted": 2, + "nRemoved": 0, + "upserted": [{"index": 1, "_id": "..."}, {"index": 2, "_id": "..."}], + "writeConcernErrors": [], + "writeErrors": [ + { + "index": 3, + "code": 11000, + "errmsg": "...", + "op": { + "q": {"b": 4}, + "u": {"$set": {"a": 3}}, + "multi": False, + "upsert": True, + }, + }, + { + "index": 5, + "code": 11000, + "errmsg": "...", + "op": {"_id": "...", "b": 6, "a": 1}, + }, + ], + }, + result, + ) + + async def test_large_inserts_ordered(self): + big = "x" * await async_client_context.max_bson_size + requests = [ + InsertOne({"b": 1, "a": 1}), + InsertOne({"big": big}), + InsertOne({"b": 2, "a": 2}), + ] + + try: + await self.coll.bulk_write(requests) + except BulkWriteError as exc: + result = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(1, result["nInserted"]) + + await self.coll.delete_many({}) + + big = "x" * (1024 * 1024 * 4) + write_result = await self.coll.bulk_write( + [ + InsertOne({"a": 1, "big": big}), + InsertOne({"a": 2, "big": big}), + InsertOne({"a": 3, "big": big}), + InsertOne({"a": 4, "big": big}), + InsertOne({"a": 5, "big": big}), + InsertOne({"a": 6, "big": big}), + ] + ) + + self.assertEqual(6, write_result.inserted_count) + self.assertEqual(6, await self.coll.count_documents({})) + + async def test_large_inserts_unordered(self): + big = "x" * await async_client_context.max_bson_size + requests = [ + InsertOne({"b": 1, "a": 1}), + InsertOne({"big": big}), + InsertOne({"b": 2, "a": 2}), + ] + + try: + await self.coll.bulk_write(requests, ordered=False) + except BulkWriteError as exc: + details = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, details["nInserted"]) + + await self.coll.delete_many({}) + + big = "x" * (1024 * 1024 * 4) + result = await self.coll.bulk_write( + [ + InsertOne({"a": 1, "big": big}), + InsertOne({"a": 2, "big": big}), + InsertOne({"a": 3, "big": big}), + InsertOne({"a": 4, "big": big}), + InsertOne({"a": 5, "big": big}), + InsertOne({"a": 6, "big": big}), + ], + ordered=False, + ) + + self.assertEqual(6, result.inserted_count) + self.assertEqual(6, await self.coll.count_documents({})) + + +class AsyncBulkAuthorizationTestBase(AsyncBulkTestBase): + @classmethod + @async_client_context.require_auth + @async_client_context.require_no_api_version + async def _setup_class(cls): + await super()._setup_class() + + async def asyncSetUp(self): + await super().setUp() + await async_client_context.create_user(self.db.name, "readonly", "pw", ["read"]) + await self.db.command( + "createRole", + "noremove", + privileges=[ + { + "actions": ["insert", "update", "find"], + "resource": {"db": "pymongo_test", "collection": "test"}, + } + ], + roles=[], + ) + + async_client_context.create_user(self.db.name, "noremove", "pw", ["noremove"]) + + async def asyncTearDown(self): + await self.db.command("dropRole", "noremove") + await remove_all_users(self.db) + + +class AsyncTestBulkUnacknowledged(AsyncBulkTestBase): + async def asyncTearDown(self): + await self.coll.delete_many({}) + + async def test_no_results_ordered_success(self): + requests: list = [ + InsertOne({"a": 1}), + UpdateOne({"a": 3}, {"$set": {"b": 1}}, upsert=True), + InsertOne({"a": 2}), + DeleteOne({"a": 1}), + ] + result = await self.coll_w0.bulk_write(requests) + self.assertFalse(result.acknowledged) + + async def predicate(): + return await self.coll.count_documents({}) == 2 + + await async_wait_until(predicate, "insert 2 documents") + + async def predicate(): + return await self.coll.find_one({"_id": 1}) is None + + await async_wait_until(predicate, 'removed {"_id": 1}') + + async def test_no_results_ordered_failure(self): + requests: list = [ + InsertOne({"_id": 1}), + UpdateOne({"_id": 3}, {"$set": {"b": 1}}, upsert=True), + InsertOne({"_id": 2}), + # Fails with duplicate key error. + InsertOne({"_id": 1}), + # Should not be executed since the batch is ordered. + DeleteOne({"_id": 1}), + ] + result = await self.coll_w0.bulk_write(requests) + self.assertFalse(result.acknowledged) + + async def predicate(): + return await self.coll.count_documents({}) == 3 + + await async_wait_until(predicate, "insert 3 documents") + self.assertEqual({"_id": 1}, await self.coll.find_one({"_id": 1})) + + async def test_no_results_unordered_success(self): + requests: list = [ + InsertOne({"a": 1}), + UpdateOne({"a": 3}, {"$set": {"b": 1}}, upsert=True), + InsertOne({"a": 2}), + DeleteOne({"a": 1}), + ] + result = await self.coll_w0.bulk_write(requests, ordered=False) + self.assertFalse(result.acknowledged) + + async def predicate(): + return await self.coll.count_documents({}) == 2 + + await async_wait_until(predicate, "insert 2 documents") + + async def predicate(): + return await self.coll.find_one({"_id": 1}) is None + + await async_wait_until(predicate, 'removed {"_id": 1}') + + async def test_no_results_unordered_failure(self): + requests: list = [ + InsertOne({"_id": 1}), + UpdateOne({"_id": 3}, {"$set": {"b": 1}}, upsert=True), + InsertOne({"_id": 2}), + # Fails with duplicate key error. + InsertOne({"_id": 1}), + # Should be executed since the batch is unordered. + DeleteOne({"_id": 1}), + ] + result = await self.coll_w0.bulk_write(requests, ordered=False) + self.assertFalse(result.acknowledged) + + async def predicate(): + return await self.coll.count_documents({}) == 2 + + await async_wait_until(predicate, "insert 2 documents") + + async def predicate(): + return await self.coll.find_one({"_id": 1}) is None + + await async_wait_until(predicate, 'removed {"_id": 1}') + + +class AsyncTestBulkAuthorization(AsyncBulkAuthorizationTestBase): + async def test_readonly(self): + # We test that an authorization failure aborts the batch and is raised + # as OperationFailure. + cli = rs_or_single_client_noauth( + username="readonly", password="pw", authSource="pymongo_test" + ) + coll = cli.pymongo_test.test + coll.find_one() + with self.assertRaises(OperationFailure): + await coll.bulk_write([InsertOne({"x": 1})]) + + async def test_no_remove(self): + # We test that an authorization failure aborts the batch and is raised + # as OperationFailure. + cli = rs_or_single_client_noauth( + username="noremove", password="pw", authSource="pymongo_test" + ) + coll = cli.pymongo_test.test + coll.find_one() + requests = [ + InsertOne({"x": 1}), + ReplaceOne({"x": 2}, {"x": 2}, upsert=True), + DeleteMany({}), # Prohibited. + InsertOne({"x": 3}), # Never attempted. + ] + with self.assertRaises(OperationFailure): + await coll.bulk_write(requests) + self.assertEqual({1, 2}, set(await self.coll.distinct("x"))) + + +class AsyncTestBulkWriteConcern(AsyncBulkTestBase): + w: Optional[int] + secondary: AsyncMongoClient + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.w = async_client_context.w + cls.secondary = None + if cls.w is not None and cls.w > 1: + for member in async_client_context.hello["hosts"]: + if member != async_client_context.hello["primary"]: + cls.secondary = single_client(*partition_node(member)) + break + + @classmethod + async def async_tearDownClass(cls): + if cls.secondary: + await cls.secondary.close() + + async def cause_wtimeout(self, requests, ordered): + if not async_client_context.test_commands_enabled: + self.skipTest("Test commands must be enabled.") + + # Use the rsSyncApplyStop failpoint to pause replication on a + # secondary which will cause a wtimeout error. + await self.secondary.admin.command("configureFailPoint", "rsSyncApplyStop", mode="alwaysOn") + + try: + coll = self.coll.with_options(write_concern=WriteConcern(w=self.w, wtimeout=1)) + return await coll.bulk_write(requests, ordered=ordered) + finally: + await self.secondary.admin.command("configureFailPoint", "rsSyncApplyStop", mode="off") + + @async_client_context.require_replica_set + @async_client_context.require_secondaries_count(1) + async def test_write_concern_failure_ordered(self): + # Ensure we don't raise on wnote. + coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w)) + result = await coll_ww.bulk_write([DeleteOne({"something": "that does no exist"})]) + self.assertTrue(result.acknowledged) + + requests: list[Any] = [InsertOne({"a": 1}), InsertOne({"a": 2})] + # Replication wtimeout is a 'soft' error. + # It shouldn't stop batch processing. + try: + await self.cause_wtimeout(requests, ordered=True) + except BulkWriteError as exc: + details = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 0, + "nInserted": 2, + "nRemoved": 0, + "upserted": [], + "writeErrors": [], + }, + details, + ) + + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(details["writeConcernErrors"]) > 0) + + failed = details["writeConcernErrors"][0] + self.assertEqual(64, failed["code"]) + self.assertTrue(isinstance(failed["errmsg"], str)) + + await self.coll.delete_many({}) + await self.coll.create_index("a", unique=True) + self.addCleanup(self.coll.drop_index, [("a", 1)]) + + # Fail due to write concern support as well + # as duplicate key error on ordered batch. + requests = [ + InsertOne({"a": 1}), + ReplaceOne({"a": 3}, {"b": 1}, upsert=True), + InsertOne({"a": 1}), + InsertOne({"a": 2}), + ] + try: + await self.cause_wtimeout(requests, ordered=True) + except BulkWriteError as exc: + details = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqualResponse( + { + "nMatched": 0, + "nModified": 0, + "nUpserted": 1, + "nInserted": 1, + "nRemoved": 0, + "upserted": [{"index": 1, "_id": "..."}], + "writeErrors": [ + {"index": 2, "code": 11000, "errmsg": "...", "op": {"_id": "...", "a": 1}} + ], + }, + details, + ) + + self.assertTrue(len(details["writeConcernErrors"]) > 1) + failed = details["writeErrors"][0] + self.assertTrue("duplicate" in failed["errmsg"]) + + @async_client_context.require_replica_set + @async_client_context.require_secondaries_count(1) + async def test_write_concern_failure_unordered(self): + # Ensure we don't raise on wnote. + coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w)) + result = await coll_ww.bulk_write([DeleteOne({"something": "that does no exist"})], ordered=False) + self.assertTrue(result.acknowledged) + + requests = [ + InsertOne({"a": 1}), + UpdateOne({"a": 3}, {"$set": {"a": 3, "b": 1}}, upsert=True), + InsertOne({"a": 2}), + ] + # Replication wtimeout is a 'soft' error. + # It shouldn't stop batch processing. + try: + await self.cause_wtimeout(requests, ordered=False) + except BulkWriteError as exc: + details = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, details["nInserted"]) + self.assertEqual(1, details["nUpserted"]) + self.assertEqual(0, len(details["writeErrors"])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(details["writeConcernErrors"]) > 1) + + await self.coll.delete_many({}) + await self.coll.create_index("a", unique=True) + self.addCleanup(self.coll.drop_index, [("a", 1)]) + + # Fail due to write concern support as well + # as duplicate key error on unordered batch. + requests: list = [ + InsertOne({"a": 1}), + UpdateOne({"a": 3}, {"$set": {"a": 3, "b": 1}}, upsert=True), + InsertOne({"a": 1}), + InsertOne({"a": 2}), + ] + try: + await self.cause_wtimeout(requests, ordered=False) + except BulkWriteError as exc: + details = exc.details + self.assertEqual(exc.code, 65) + else: + self.fail("Error not raised") + + self.assertEqual(2, details["nInserted"]) + self.assertEqual(1, details["nUpserted"]) + self.assertEqual(1, len(details["writeErrors"])) + # When talking to legacy servers there will be a + # write concern error for each operation. + self.assertTrue(len(details["writeConcernErrors"]) > 1) + + failed = details["writeErrors"][0] + self.assertEqual(2, failed["index"]) + self.assertEqual(11000, failed["code"]) + self.assertTrue(isinstance(failed["errmsg"], str)) + self.assertEqual(1, failed["op"]["a"]) + + failed = details["writeConcernErrors"][0] + self.assertEqual(64, failed["code"]) + self.assertTrue(isinstance(failed["errmsg"], str)) + + upserts = details["upserted"] + self.assertEqual(1, len(upserts)) + self.assertEqual(1, upserts[0]["index"]) + self.assertTrue(upserts[0].get("_id")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/synchro.py b/tools/synchro.py index 6fb7116747..758f20cbc7 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -154,6 +154,7 @@ "conftest.py", "pymongo_mocks.py", "utils_spec_runner.py", + "test_bulk.py", "test_client.py", "test_client_bulk_write.py", "test_collection.py", From 47fe209ec2760a675c96dafc0feccd48135f4bd8 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 10:24:20 -0700 Subject: [PATCH 02/12] forgot to run pre-commit earlier --- test/asynchronous/test_bulk.py | 23 +++++---- test/test_bulk.py | 85 ++++++++++++++++++++++++---------- 2 files changed, 75 insertions(+), 33 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index 39b815b9d9..3987a70d10 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -34,6 +34,7 @@ from bson.binary import Binary, UuidRepresentation from bson.codec_options import CodecOptions from bson.objectid import ObjectId +from pymongo.asynchronous.collection import AsyncCollection from pymongo.common import partition_node from pymongo.errors import ( BulkWriteError, @@ -42,11 +43,11 @@ OperationFailure, ) from pymongo.operations import * -from pymongo.asynchronous.collection import AsyncCollection from pymongo.write_concern import WriteConcern _IS_SYNC = False + class AsyncBulkTestBase(AsyncIntegrationTest): coll: AsyncCollection coll_w0: AsyncCollection @@ -362,7 +363,9 @@ async def test_bulk_write_invalid_arguments(self): async def test_upsert_large(self): big = "a" * (await async_client_context.max_bson_size - 37) - result = await self.coll.bulk_write([UpdateOne({"x": 1}, {"$set": {"s": big}}, upsert=True)]) + result = await self.coll.bulk_write( + [UpdateOne({"x": 1}, {"$set": {"s": big}}, upsert=True)] + ) self.assertEqualResponse( { "nMatched": 0, @@ -843,7 +846,7 @@ async def predicate(): async def predicate(): return await self.coll.find_one({"_id": 1}) is None - + await async_wait_until(predicate, 'removed {"_id": 1}') async def test_no_results_ordered_failure(self): @@ -861,7 +864,7 @@ async def test_no_results_ordered_failure(self): async def predicate(): return await self.coll.count_documents({}) == 3 - + await async_wait_until(predicate, "insert 3 documents") self.assertEqual({"_id": 1}, await self.coll.find_one({"_id": 1})) @@ -877,12 +880,12 @@ async def test_no_results_unordered_success(self): async def predicate(): return await self.coll.count_documents({}) == 2 - + await async_wait_until(predicate, "insert 2 documents") async def predicate(): return await self.coll.find_one({"_id": 1}) is None - + await async_wait_until(predicate, 'removed {"_id": 1}') async def test_no_results_unordered_failure(self): @@ -900,12 +903,12 @@ async def test_no_results_unordered_failure(self): async def predicate(): return await self.coll.count_documents({}) == 2 - + await async_wait_until(predicate, "insert 2 documents") async def predicate(): return await self.coll.find_one({"_id": 1}) is None - + await async_wait_until(predicate, 'removed {"_id": 1}') @@ -1058,7 +1061,9 @@ async def test_write_concern_failure_ordered(self): async def test_write_concern_failure_unordered(self): # Ensure we don't raise on wnote. coll_ww = self.coll.with_options(write_concern=WriteConcern(w=self.w)) - result = await coll_ww.bulk_write([DeleteOne({"something": "that does no exist"})], ordered=False) + result = await coll_ww.bulk_write( + [DeleteOne({"something": "that does no exist"})], ordered=False + ) self.assertTrue(result.acknowledged) requests = [ diff --git a/test/test_bulk.py b/test/test_bulk.py index 663dfaf19c..6e4367fe10 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -44,6 +44,8 @@ from pymongo.synchronous.collection import Collection from pymongo.write_concern import WriteConcern +_IS_SYNC = True + class BulkTestBase(IntegrationTest): coll: Collection @@ -133,9 +135,10 @@ def assertEqualWriteError(self, expected, actual): self.assertEqual(expected_op, actual_op) -class TestBulk(BulkTestBase): +class SyncTestBulk(SyncBulkTestBase): def test_empty(self): - self.assertRaises(InvalidOperation, self.coll.bulk_write, []) + with self.assertRaises(InvalidOperation): + self.coll.bulk_write([]) def test_insert(self): expected = { @@ -180,15 +183,19 @@ def test_update_many_pipeline(self): self._test_update_many([{"$set": {"foo": "bar"}}]) def test_array_filters_validation(self): - self.assertRaises(TypeError, UpdateMany, {}, {}, array_filters={}) - self.assertRaises(TypeError, UpdateOne, {}, {}, array_filters={}) + with self.assertRaises(TypeError): + UpdateMany({}, {}, array_filters={}) + with self.assertRaises(TypeError): + UpdateOne({}, {}, array_filters={}) def test_array_filters_unacknowledged(self): coll = self.coll_w0 update_one = UpdateOne({}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}]) update_many = UpdateMany({}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}]) - self.assertRaises(ConfigurationError, coll.bulk_write, [update_one]) - self.assertRaises(ConfigurationError, coll.bulk_write, [update_many]) + with self.assertRaises(ConfigurationError): + coll.bulk_write([update_one]) + with self.assertRaises(ConfigurationError): + coll.bulk_write([update_many]) def _test_update_one(self, update): expected = { @@ -786,12 +793,12 @@ def test_large_inserts_unordered(self): self.assertEqual(6, self.coll.count_documents({})) -class BulkAuthorizationTestBase(BulkTestBase): +class SyncBulkAuthorizationTestBase(SyncBulkTestBase): @classmethod @client_context.require_auth @client_context.require_no_api_version - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() def setUp(self): super().setUp() @@ -815,7 +822,7 @@ def tearDown(self): remove_all_users(self.db) -class TestBulkUnacknowledged(BulkTestBase): +class SyncTestBulkUnacknowledged(SyncBulkTestBase): def tearDown(self): self.coll.delete_many({}) @@ -828,8 +835,16 @@ def test_no_results_ordered_success(self): ] result = self.coll_w0.bulk_write(requests) self.assertFalse(result.acknowledged) - wait_until(lambda: self.coll.count_documents({}) == 2, "insert 2 documents") - wait_until(lambda: self.coll.find_one({"_id": 1}) is None, 'removed {"_id": 1}') + + def predicate(): + return self.coll.count_documents({}) == 2 + + wait_until(predicate, "insert 2 documents") + + def predicate(): + return self.coll.find_one({"_id": 1}) is None + + wait_until(predicate, 'removed {"_id": 1}') def test_no_results_ordered_failure(self): requests: list = [ @@ -843,7 +858,11 @@ def test_no_results_ordered_failure(self): ] result = self.coll_w0.bulk_write(requests) self.assertFalse(result.acknowledged) - wait_until(lambda: self.coll.count_documents({}) == 3, "insert 3 documents") + + def predicate(): + return self.coll.count_documents({}) == 3 + + wait_until(predicate, "insert 3 documents") self.assertEqual({"_id": 1}, self.coll.find_one({"_id": 1})) def test_no_results_unordered_success(self): @@ -855,8 +874,16 @@ def test_no_results_unordered_success(self): ] result = self.coll_w0.bulk_write(requests, ordered=False) self.assertFalse(result.acknowledged) - wait_until(lambda: self.coll.count_documents({}) == 2, "insert 2 documents") - wait_until(lambda: self.coll.find_one({"_id": 1}) is None, 'removed {"_id": 1}') + + def predicate(): + return self.coll.count_documents({}) == 2 + + wait_until(predicate, "insert 2 documents") + + def predicate(): + return self.coll.find_one({"_id": 1}) is None + + wait_until(predicate, 'removed {"_id": 1}') def test_no_results_unordered_failure(self): requests: list = [ @@ -870,11 +897,19 @@ def test_no_results_unordered_failure(self): ] result = self.coll_w0.bulk_write(requests, ordered=False) self.assertFalse(result.acknowledged) - wait_until(lambda: self.coll.count_documents({}) == 2, "insert 2 documents") - wait_until(lambda: self.coll.find_one({"_id": 1}) is None, 'removed {"_id": 1}') + + def predicate(): + return self.coll.count_documents({}) == 2 + + wait_until(predicate, "insert 2 documents") + + def predicate(): + return self.coll.find_one({"_id": 1}) is None + + wait_until(predicate, 'removed {"_id": 1}') -class TestBulkAuthorization(BulkAuthorizationTestBase): +class SyncTestBulkAuthorization(SyncBulkAuthorizationTestBase): def test_readonly(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. @@ -883,7 +918,8 @@ def test_readonly(self): ) coll = cli.pymongo_test.test coll.find_one() - self.assertRaises(OperationFailure, coll.bulk_write, [InsertOne({"x": 1})]) + with self.assertRaises(OperationFailure): + coll.bulk_write([InsertOne({"x": 1})]) def test_no_remove(self): # We test that an authorization failure aborts the batch and is raised @@ -899,11 +935,12 @@ def test_no_remove(self): DeleteMany({}), # Prohibited. InsertOne({"x": 3}), # Never attempted. ] - self.assertRaises(OperationFailure, coll.bulk_write, requests) + with self.assertRaises(OperationFailure): + coll.bulk_write(requests) self.assertEqual({1, 2}, set(self.coll.distinct("x"))) -class TestBulkWriteConcern(BulkTestBase): +class SyncTestBulkWriteConcern(SyncBulkTestBase): w: Optional[int] secondary: MongoClient @@ -919,7 +956,7 @@ def setUpClass(cls): break @classmethod - def tearDownClass(cls): + def async_tearDownClass(cls): if cls.secondary: cls.secondary.close() @@ -929,13 +966,13 @@ def cause_wtimeout(self, requests, ordered): # Use the rsSyncApplyStop failpoint to pause replication on a # secondary which will cause a wtimeout error. - self.secondary.admin.command("configureFailPoint", "rsSyncApplyStop", mode="alwaysOn") + self.secondary.admin.command("configureFailPoint", "rsApplyStop", mode="alwaysOn") try: coll = self.coll.with_options(write_concern=WriteConcern(w=self.w, wtimeout=1)) return coll.bulk_write(requests, ordered=ordered) finally: - self.secondary.admin.command("configureFailPoint", "rsSyncApplyStop", mode="off") + self.secondary.admin.command("configureFailPoint", "rsApplyStop", mode="off") @client_context.require_replica_set @client_context.require_secondaries_count(1) From af0611a93bb4f74434435d08227dfd09c46115eb Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 10:51:16 -0700 Subject: [PATCH 03/12] modified syncro.py to account for class names in test_bulk.py --- test/test_bulk.py | 10 +++++----- tools/synchro.py | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/test/test_bulk.py b/test/test_bulk.py index 6e4367fe10..6dfbc102e2 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -135,7 +135,7 @@ def assertEqualWriteError(self, expected, actual): self.assertEqual(expected_op, actual_op) -class SyncTestBulk(SyncBulkTestBase): +class TestBulk(BulkTestBase): def test_empty(self): with self.assertRaises(InvalidOperation): self.coll.bulk_write([]) @@ -793,7 +793,7 @@ def test_large_inserts_unordered(self): self.assertEqual(6, self.coll.count_documents({})) -class SyncBulkAuthorizationTestBase(SyncBulkTestBase): +class BulkAuthorizationTestBase(BulkTestBase): @classmethod @client_context.require_auth @client_context.require_no_api_version @@ -822,7 +822,7 @@ def tearDown(self): remove_all_users(self.db) -class SyncTestBulkUnacknowledged(SyncBulkTestBase): +class TestBulkUnacknowledged(BulkTestBase): def tearDown(self): self.coll.delete_many({}) @@ -909,7 +909,7 @@ def predicate(): wait_until(predicate, 'removed {"_id": 1}') -class SyncTestBulkAuthorization(SyncBulkAuthorizationTestBase): +class TestBulkAuthorization(BulkAuthorizationTestBase): def test_readonly(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. @@ -940,7 +940,7 @@ def test_no_remove(self): self.assertEqual({1, 2}, set(self.coll.distinct("x"))) -class SyncTestBulkWriteConcern(SyncBulkTestBase): +class TestBulkWriteConcern(BulkTestBase): w: Optional[int] secondary: MongoClient diff --git a/tools/synchro.py b/tools/synchro.py index 758f20cbc7..cc2ebec9af 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -46,6 +46,8 @@ "async_sendall": "sendall", "asynchronous": "synchronous", "Asynchronous": "Synchronous", + "AsyncBulkTestBase": "BulkTestBase", + "AsyncBulkAuthorizationTestBase": "BulkAuthorizationTestBase", "anext": "next", "_ALock": "_Lock", "_ACondition": "_Condition", From b5ded746f31bcee52bf8a14c001a71770a730b77 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 11:40:22 -0700 Subject: [PATCH 04/12] fixed typing errors --- test/asynchronous/test_bulk.py | 6 +++--- test/test_bulk.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index 3987a70d10..fc5efc9de3 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -185,9 +185,9 @@ async def test_update_many_pipeline(self): async def test_array_filters_validation(self): with self.assertRaises(TypeError): - await UpdateMany({}, {}, array_filters={}) + await UpdateMany({}, {}, array_filters={}) # type: ignore[arg-type] with self.assertRaises(TypeError): - await UpdateOne({}, {}, array_filters={}) + await UpdateOne({}, {}, array_filters={}) # type: ignore[arg-type] async def test_array_filters_unacknowledged(self): coll = self.coll_w0 @@ -939,7 +939,7 @@ async def test_no_remove(self): InsertOne({"x": 3}), # Never attempted. ] with self.assertRaises(OperationFailure): - await coll.bulk_write(requests) + await coll.bulk_write(requests) # type: ignore[arg-type] self.assertEqual({1, 2}, set(await self.coll.distinct("x"))) diff --git a/test/test_bulk.py b/test/test_bulk.py index 6dfbc102e2..e079e38d4a 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -184,9 +184,9 @@ def test_update_many_pipeline(self): def test_array_filters_validation(self): with self.assertRaises(TypeError): - UpdateMany({}, {}, array_filters={}) + UpdateMany({}, {}, array_filters={}) # type: ignore[arg-type] with self.assertRaises(TypeError): - UpdateOne({}, {}, array_filters={}) + UpdateOne({}, {}, array_filters={}) # type: ignore[arg-type] def test_array_filters_unacknowledged(self): coll = self.coll_w0 @@ -936,7 +936,7 @@ def test_no_remove(self): InsertOne({"x": 3}), # Never attempted. ] with self.assertRaises(OperationFailure): - coll.bulk_write(requests) + coll.bulk_write(requests) # type: ignore[arg-type] self.assertEqual({1, 2}, set(self.coll.distinct("x"))) From 26e6057417610d8a8bd200d5c09cd8560ed98279 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 15:05:38 -0700 Subject: [PATCH 05/12] fix rsSyncApplyStop by hardcoding it into synchro.py --- test/asynchronous/test_bulk.py | 2 +- test/test_bulk.py | 4 ++-- tools/synchro.py | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index fc5efc9de3..2dfa4c66a6 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -804,7 +804,7 @@ async def _setup_class(cls): await super()._setup_class() async def asyncSetUp(self): - await super().setUp() + super().setUp() await async_client_context.create_user(self.db.name, "readonly", "pw", ["read"]) await self.db.command( "createRole", diff --git a/test/test_bulk.py b/test/test_bulk.py index e079e38d4a..a6c7f75a37 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -966,13 +966,13 @@ def cause_wtimeout(self, requests, ordered): # Use the rsSyncApplyStop failpoint to pause replication on a # secondary which will cause a wtimeout error. - self.secondary.admin.command("configureFailPoint", "rsApplyStop", mode="alwaysOn") + self.secondary.admin.command("configureFailPoint", "rsSyncApplyStop", mode="alwaysOn") try: coll = self.coll.with_options(write_concern=WriteConcern(w=self.w, wtimeout=1)) return coll.bulk_write(requests, ordered=ordered) finally: - self.secondary.admin.command("configureFailPoint", "rsApplyStop", mode="off") + self.secondary.admin.command("configureFailPoint", "rsSyncApplyStop", mode="off") @client_context.require_replica_set @client_context.require_secondaries_count(1) diff --git a/tools/synchro.py b/tools/synchro.py index cc2ebec9af..d590710044 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -297,6 +297,8 @@ def translate_docstrings(lines: list[str]) -> list[str]: lines[i] = lines[i].replace(k, replacements[k]) if "Sync" in lines[i] and "Synchronous" not in lines[i] and replacements[k] in lines[i]: lines[i] = lines[i].replace("Sync", "") + if "rsApplyStop" in lines[i]: + lines[i] = lines[i].replace("rsApplyStop", "rsSyncApplyStop") if "async for" in lines[i] or "async with" in lines[i] or "async def" in lines[i]: lines[i] = lines[i].replace("async ", "") if "await " in lines[i] and "tailable" not in lines[i]: From 4a8a0a7b165025fe081a82874ccf15dfdcb3a4a0 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 15:30:10 -0700 Subject: [PATCH 06/12] change AsyncTestBulkWriteConcern's setupClass to async --- test/asynchronous/test_bulk.py | 8 ++++---- test/test_bulk.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index 2dfa4c66a6..10dd92ef4d 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -948,13 +948,13 @@ class AsyncTestBulkWriteConcern(AsyncBulkTestBase): secondary: AsyncMongoClient @classmethod - def setUpClass(cls): - super().setUpClass() + async def _setup_class(cls): + await super()._setup_class() cls.w = async_client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: - for member in async_client_context.hello["hosts"]: - if member != async_client_context.hello["primary"]: + for member in await async_client_context.hello["hosts"]: + if member != await async_client_context.hello["primary"]: cls.secondary = single_client(*partition_node(member)) break diff --git a/test/test_bulk.py b/test/test_bulk.py index a6c7f75a37..ac01c831fc 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -945,8 +945,8 @@ class TestBulkWriteConcern(BulkTestBase): secondary: MongoClient @classmethod - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.w = client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: From 4c74f3e2a406499655d9c6b9528949693da8ff63 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 16:23:12 -0700 Subject: [PATCH 07/12] changed AsyncBulkTestBase's setup class to async --- test/asynchronous/test_bulk.py | 4 ++-- test/test_bulk.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index 10dd92ef4d..b8e49d4bca 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -53,8 +53,8 @@ class AsyncBulkTestBase(AsyncIntegrationTest): coll_w0: AsyncCollection @classmethod - def setUpClass(cls): - super().setUpClass() + async def _setup_class(cls): + await super()._setup_class() cls.coll = cls.db.test cls.coll_w0 = cls.coll.with_options(write_concern=WriteConcern(w=0)) diff --git a/test/test_bulk.py b/test/test_bulk.py index ac01c831fc..bfc35fc049 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -52,8 +52,8 @@ class BulkTestBase(IntegrationTest): coll_w0: Collection @classmethod - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.coll = cls.db.test cls.coll_w0 = cls.coll.with_options(write_concern=WriteConcern(w=0)) From e6e443828839ce14d0c20f1315894785ee226626 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 16:55:40 -0700 Subject: [PATCH 08/12] fix that hello isn't properly awaited attempt 2? --- test/asynchronous/test_bulk.py | 4 ++-- test/test_bulk.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index b8e49d4bca..f29e677bcc 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -953,8 +953,8 @@ async def _setup_class(cls): cls.w = async_client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: - for member in await async_client_context.hello["hosts"]: - if member != await async_client_context.hello["primary"]: + for member in (await async_client_context).hello["hosts"]: + if member != (await async_client_context).hello["primary"]: cls.secondary = single_client(*partition_node(member)) break diff --git a/test/test_bulk.py b/test/test_bulk.py index bfc35fc049..895ac07a89 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -950,8 +950,8 @@ def _setup_class(cls): cls.w = client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: - for member in client_context.hello["hosts"]: - if member != client_context.hello["primary"]: + for member in (client_context).hello["hosts"]: + if member != (client_context).hello["primary"]: cls.secondary = single_client(*partition_node(member)) break From 0001955e934ad610c1c0bd3a80804b454b402293 Mon Sep 17 00:00:00 2001 From: Iris Date: Tue, 3 Sep 2024 17:37:45 -0700 Subject: [PATCH 09/12] fix - hello isn't properly awaited attempt 3? --- test/asynchronous/test_bulk.py | 4 ++-- test/test_bulk.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index f29e677bcc..c4faa9be0b 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -953,8 +953,8 @@ async def _setup_class(cls): cls.w = async_client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: - for member in (await async_client_context).hello["hosts"]: - if member != (await async_client_context).hello["primary"]: + for member in (await async_client_context.hello)["hosts"]: + if member != (await async_client_context.hello)["primary"]: cls.secondary = single_client(*partition_node(member)) break diff --git a/test/test_bulk.py b/test/test_bulk.py index 895ac07a89..9069109cfa 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -950,8 +950,8 @@ def _setup_class(cls): cls.w = client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: - for member in (client_context).hello["hosts"]: - if member != (client_context).hello["primary"]: + for member in (client_context.hello)["hosts"]: + if member != (client_context.hello)["primary"]: cls.secondary = single_client(*partition_node(member)) break From 5759fbef539046e5223f10ffcedaeb783d0fd53f Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 4 Sep 2024 07:59:15 -0700 Subject: [PATCH 10/12] fix - hello isn't properly awaited attempt 4? --- test/asynchronous/test_bulk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index c4faa9be0b..49db9aedeb 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -950,7 +950,7 @@ class AsyncTestBulkWriteConcern(AsyncBulkTestBase): @classmethod async def _setup_class(cls): await super()._setup_class() - cls.w = async_client_context.w + cls.w = await async_client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: for member in (await async_client_context.hello)["hosts"]: From 2470a8e4b5f171ebce7c760e6af4ab297c59f65f Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 4 Sep 2024 08:24:04 -0700 Subject: [PATCH 11/12] fix - hello isn't properly awaited attempt 5? --- test/__init__.py | 5 ++++- test/asynchronous/__init__.py | 5 ++++- test/asynchronous/test_bulk.py | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/test/__init__.py b/test/__init__.py index 2a23ae0fd3..c3a63dd20d 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -569,7 +569,10 @@ def require_secondaries_count(self, count): def sec_count(): return 0 if not self.client else len(self.client.secondaries) - return self._require(lambda: sec_count() >= count, "Not enough secondaries available") + def check(): + return sec_count() >= count + + return self._require(check, "Not enough secondaries available") @property def supports_secondary_read_pref(self): diff --git a/test/asynchronous/__init__.py b/test/asynchronous/__init__.py index 3d22b5ff76..95be18cda6 100644 --- a/test/asynchronous/__init__.py +++ b/test/asynchronous/__init__.py @@ -571,7 +571,10 @@ def require_secondaries_count(self, count): async def sec_count(): return 0 if not self.client else len(await self.client.secondaries) - return self._require(lambda: sec_count() >= count, "Not enough secondaries available") + async def check(): + return await sec_count() >= count + + return self._require(check, "Not enough secondaries available") @property async def supports_secondary_read_pref(self): diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index 49db9aedeb..c4faa9be0b 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -950,7 +950,7 @@ class AsyncTestBulkWriteConcern(AsyncBulkTestBase): @classmethod async def _setup_class(cls): await super()._setup_class() - cls.w = await async_client_context.w + cls.w = async_client_context.w cls.secondary = None if cls.w is not None and cls.w > 1: for member in (await async_client_context.hello)["hosts"]: From dad61db68f4005ab19b5d753dfe9b685175a218d Mon Sep 17 00:00:00 2001 From: Iris Date: Wed, 4 Sep 2024 10:00:32 -0700 Subject: [PATCH 12/12] fix - hello isn't properly awaited attempt 6? --- test/asynchronous/test_bulk.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/asynchronous/test_bulk.py b/test/asynchronous/test_bulk.py index c4faa9be0b..24111ad7c0 100644 --- a/test/asynchronous/test_bulk.py +++ b/test/asynchronous/test_bulk.py @@ -25,10 +25,9 @@ from test.asynchronous import AsyncIntegrationTest, async_client_context, remove_all_users, unittest from test.utils import ( + async_rs_or_single_client_noauth, async_wait_until, - rs_or_single_client_noauth, single_client, - wait_until, ) from bson.binary import Binary, UuidRepresentation @@ -916,7 +915,7 @@ class AsyncTestBulkAuthorization(AsyncBulkAuthorizationTestBase): async def test_readonly(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. - cli = rs_or_single_client_noauth( + cli = await async_rs_or_single_client_noauth( username="readonly", password="pw", authSource="pymongo_test" ) coll = cli.pymongo_test.test @@ -927,7 +926,7 @@ async def test_readonly(self): async def test_no_remove(self): # We test that an authorization failure aborts the batch and is raised # as OperationFailure. - cli = rs_or_single_client_noauth( + cli = await async_rs_or_single_client_noauth( username="noremove", password="pw", authSource="pymongo_test" ) coll = cli.pymongo_test.test