diff --git a/test/asynchronous/test_monitoring.py b/test/asynchronous/test_monitoring.py new file mode 100644 index 0000000000..3f6563ee56 --- /dev/null +++ b/test/asynchronous/test_monitoring.py @@ -0,0 +1,1280 @@ +# Copyright 2015-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import asyncio +import copy +import datetime +import sys +import time +from typing import Any + +sys.path[0:0] = [""] + +from test.asynchronous import ( + AsyncIntegrationTest, + async_client_context, + client_knobs, + sanitize_cmd, + unittest, +) +from test.utils import ( + EventListener, + async_rs_or_single_client, + async_single_client, + async_wait_until, +) + +from bson.int64 import Int64 +from bson.objectid import ObjectId +from bson.son import SON +from pymongo import CursorType, DeleteOne, InsertOne, UpdateOne, monitoring +from pymongo.asynchronous.command_cursor import AsyncCommandCursor +from pymongo.asynchronous.helpers import anext +from pymongo.errors import AutoReconnect, NotPrimaryError, OperationFailure +from pymongo.read_preferences import ReadPreference +from pymongo.write_concern import WriteConcern + +_IS_SYNC = False + + +class AsyncTestCommandMonitoring(AsyncIntegrationTest): + listener: EventListener + + @classmethod + @async_client_context.require_connection + async def _setup_class(cls): + await super()._setup_class() + cls.listener = EventListener() + cls.client = await async_rs_or_single_client( + event_listeners=[cls.listener], retryWrites=False + ) + + @classmethod + async def _tearDown_class(cls): + await cls.client.close() + await super()._tearDown_class() + + async def asyncTearDown(self): + self.listener.reset() + await super().asyncTearDown() + + async def test_started_simple(self): + await self.client.pymongo_test.command("ping") + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(SON([("ping", 1)]), started.command) + self.assertEqual("ping", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + + async def test_succeeded_simple(self): + await self.client.pymongo_test.command("ping") + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertEqual("ping", succeeded.command_name) + self.assertEqual(await self.client.address, succeeded.connection_id) + self.assertEqual(1, succeeded.reply.get("ok")) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + + async def test_failed_simple(self): + try: + await self.client.pymongo_test.command("oops!") + except OperationFailure: + pass + started = self.listener.started_events[0] + failed = self.listener.failed_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) + self.assertEqual("oops!", failed.command_name) + self.assertEqual(await self.client.address, failed.connection_id) + self.assertEqual(0, failed.failure.get("ok")) + self.assertTrue(isinstance(failed.request_id, int)) + self.assertTrue(isinstance(failed.duration_micros, int)) + + async def test_find_one(self): + await self.client.pymongo_test.test.find_one() + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("find", "test"), ("filter", {}), ("limit", 1), ("singleBatch", True)]), + started.command, + ) + self.assertEqual("find", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + + async def test_find_and_get_more(self): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{} for _ in range(10)]) + self.listener.reset() + cursor = self.client.pymongo_test.test.find(projection={"_id": False}, batch_size=4) + for _ in range(4): + await anext(cursor) + cursor_id = cursor.cursor_id + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON( + [("find", "test"), ("filter", {}), ("projection", {"_id": False}), ("batchSize", 4)] + ), + started.command, + ) + self.assertEqual("find", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("find", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + csr = succeeded.reply["cursor"] + self.assertEqual(csr["id"], cursor_id) + self.assertEqual(csr["ns"], "pymongo_test.test") + self.assertEqual(csr["firstBatch"], [{} for _ in range(4)]) + + self.listener.reset() + # Next batch. Exhausting the cursor could cause a getMore + # that returns id of 0 and no results. + await anext(cursor) + try: + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]), + started.command, + ) + self.assertEqual("getMore", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("getMore", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + csr = succeeded.reply["cursor"] + self.assertEqual(csr["id"], cursor_id) + self.assertEqual(csr["ns"], "pymongo_test.test") + self.assertEqual(csr["nextBatch"], [{} for _ in range(4)]) + finally: + # Exhaust the cursor to avoid kill cursors. + tuple(await cursor.to_list()) + + async def test_find_with_explain(self): + cmd = SON([("explain", SON([("find", "test"), ("filter", {})]))]) + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_one({}) + self.listener.reset() + coll = self.client.pymongo_test.test + # Test that we publish the unwrapped command. + if await self.client.is_mongos: + coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) + res = await coll.find().explain() + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(cmd, started.command) + self.assertEqual("explain", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("explain", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(await self.client.address, succeeded.connection_id) + self.assertEqual(res, succeeded.reply) + + async def _test_find_options(self, query, expected_cmd): + coll = self.client.pymongo_test.test + await coll.drop() + await coll.create_index("x") + await coll.insert_many([{"x": i} for i in range(5)]) + + # Test that we publish the unwrapped command. + self.listener.reset() + if await self.client.is_mongos: + coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) + + cursor = coll.find(**query) + + await anext(cursor) + try: + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(expected_cmd, started.command) + self.assertEqual("find", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("find", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(await self.client.address, succeeded.connection_id) + finally: + # Exhaust the cursor to avoid kill cursors. + tuple(await cursor.to_list()) + + async def test_find_options(self): + query = { + "filter": {}, + "hint": [("x", 1)], + "max_time_ms": 10000, + "max": {"x": 10}, + "min": {"x": -10}, + "return_key": True, + "show_record_id": True, + "projection": {"x": False}, + "skip": 1, + "no_cursor_timeout": True, + "sort": [("_id", 1)], + "allow_partial_results": True, + "comment": "this is a test", + "batch_size": 2, + } + + cmd = { + "find": "test", + "filter": {}, + "hint": SON([("x", 1)]), + "comment": "this is a test", + "maxTimeMS": 10000, + "max": {"x": 10}, + "min": {"x": -10}, + "returnKey": True, + "showRecordId": True, + "sort": SON([("_id", 1)]), + "projection": {"x": False}, + "skip": 1, + "batchSize": 2, + "noCursorTimeout": True, + "allowPartialResults": True, + } + + if async_client_context.version < (4, 1, 0, -1): + query["max_scan"] = 10 + cmd["maxScan"] = 10 + + await self._test_find_options(query, cmd) + + @async_client_context.require_version_max(3, 7, 2) + async def test_find_snapshot(self): + # Test "snapshot" parameter separately, can't combine with "sort". + query = {"filter": {}, "snapshot": True} + + cmd = {"find": "test", "filter": {}, "snapshot": True} + + await self._test_find_options(query, cmd) + + async def test_command_and_get_more(self): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{"x": 1} for _ in range(10)]) + self.listener.reset() + coll = self.client.pymongo_test.test + # Test that we publish the unwrapped command. + if await self.client.is_mongos: + coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) + cursor = await coll.aggregate([{"$project": {"_id": False, "x": 1}}], batchSize=4) + for _ in range(4): + await anext(cursor) + cursor_id = cursor.cursor_id + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON( + [ + ("aggregate", "test"), + ("pipeline", [{"$project": {"_id": False, "x": 1}}]), + ("cursor", {"batchSize": 4}), + ] + ), + started.command, + ) + self.assertEqual("aggregate", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("aggregate", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + expected_cursor = { + "id": cursor_id, + "ns": "pymongo_test.test", + "firstBatch": [{"x": 1} for _ in range(4)], + } + self.assertEqualCommand(expected_cursor, succeeded.reply.get("cursor")) + + self.listener.reset() + await anext(cursor) + try: + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]), + started.command, + ) + self.assertEqual("getMore", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("getMore", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + expected_result = { + "cursor": { + "id": cursor_id, + "ns": "pymongo_test.test", + "nextBatch": [{"x": 1} for _ in range(4)], + }, + "ok": 1.0, + } + self.assertEqualReply(expected_result, succeeded.reply) + finally: + # Exhaust the cursor to avoid kill cursors. + tuple(await cursor.to_list()) + + async def test_get_more_failure(self): + address = await self.client.address + coll = self.client.pymongo_test.test + cursor_id = Int64(12345) + cursor_doc = {"id": cursor_id, "firstBatch": [], "ns": coll.full_name} + cursor = AsyncCommandCursor(coll, cursor_doc, address) + try: + await anext(cursor) + except Exception: + pass + started = self.listener.started_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + failed = self.listener.failed_events[0] + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test")]), started.command + ) + self.assertEqual("getMore", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) + self.assertTrue(isinstance(failed.duration_micros, int)) + self.assertEqual("getMore", failed.command_name) + self.assertTrue(isinstance(failed.request_id, int)) + self.assertEqual(cursor.address, failed.connection_id) + self.assertEqual(0, failed.failure.get("ok")) + + @async_client_context.require_replica_set + @async_client_context.require_secondaries_count(1) + async def test_not_primary_error(self): + address = next(iter(await async_client_context.client.secondaries)) + client = await async_single_client(*address, event_listeners=[self.listener]) + # Clear authentication command results from the listener. + await client.admin.command("ping") + self.listener.reset() + error = None + try: + await client.pymongo_test.test.find_one_and_delete({}) + except NotPrimaryError as exc: + error = exc.errors + started = self.listener.started_events[0] + failed = self.listener.failed_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) + self.assertEqual("findAndModify", failed.command_name) + self.assertEqual(address, failed.connection_id) + self.assertEqual(0, failed.failure.get("ok")) + self.assertTrue(isinstance(failed.request_id, int)) + self.assertTrue(isinstance(failed.duration_micros, int)) + self.assertEqual(error, failed.failure) + + @async_client_context.require_no_mongos + async def test_exhaust(self): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{} for _ in range(11)]) + self.listener.reset() + cursor = self.client.pymongo_test.test.find( + projection={"_id": False}, batch_size=5, cursor_type=CursorType.EXHAUST + ) + await anext(cursor) + cursor_id = cursor.cursor_id + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON( + [("find", "test"), ("filter", {}), ("projection", {"_id": False}), ("batchSize", 5)] + ), + started.command, + ) + self.assertEqual("find", started.command_name) + self.assertEqual(cursor.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("find", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertEqual(cursor.address, succeeded.connection_id) + expected_result = { + "cursor": { + "id": cursor_id, + "ns": "pymongo_test.test", + "firstBatch": [{} for _ in range(5)], + }, + "ok": 1, + } + self.assertEqualReply(expected_result, succeeded.reply) + + self.listener.reset() + tuple(await cursor.to_list()) + self.assertEqual(0, len(self.listener.failed_events)) + for event in self.listener.started_events: + self.assertTrue(isinstance(event, monitoring.CommandStartedEvent)) + self.assertEqualCommand( + SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 5)]), + event.command, + ) + self.assertEqual("getMore", event.command_name) + self.assertEqual(cursor.address, event.connection_id) + self.assertEqual("pymongo_test", event.database_name) + self.assertTrue(isinstance(event.request_id, int)) + for event in self.listener.succeeded_events: + self.assertTrue(isinstance(event, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(event.duration_micros, int)) + self.assertEqual("getMore", event.command_name) + self.assertTrue(isinstance(event.request_id, int)) + self.assertEqual(cursor.address, event.connection_id) + # Last getMore receives a response with cursor id 0. + self.assertEqual(0, self.listener.succeeded_events[-1].reply["cursor"]["id"]) + + async def test_kill_cursors(self): + with client_knobs(kill_cursor_frequency=0.01): + await self.client.pymongo_test.test.drop() + await self.client.pymongo_test.test.insert_many([{} for _ in range(10)]) + cursor = self.client.pymongo_test.test.find().batch_size(5) + await anext(cursor) + cursor_id = cursor.cursor_id + self.listener.reset() + await cursor.close() + await asyncio.sleep(2) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + # There could be more than one cursor_id here depending on + # when the thread last ran. + self.assertIn(cursor_id, started.command["cursors"]) + self.assertEqual("killCursors", started.command_name) + self.assertIs(type(started.connection_id), tuple) + self.assertEqual(cursor.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(succeeded.duration_micros, int)) + self.assertEqual("killCursors", succeeded.command_name) + self.assertTrue(isinstance(succeeded.request_id, int)) + self.assertIs(type(succeeded.connection_id), tuple) + self.assertEqual(cursor.address, succeeded.connection_id) + # There could be more than one cursor_id here depending on + # when the thread last ran. + self.assertTrue( + cursor_id in succeeded.reply["cursorsUnknown"] + or cursor_id in succeeded.reply["cursorsKilled"] + ) + + async def test_non_bulk_writes(self): + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + # Implied write concern insert_one + res = await coll.insert_one({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": res.inserted_id, "x": 1}]), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # Unacknowledged insert_one + self.listener.reset() + coll = coll.with_options(write_concern=WriteConcern(w=0)) + res = await coll.insert_one({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": res.inserted_id, "x": 1}]), + ("writeConcern", {"w": 0}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + self.assertEqualReply(succeeded.reply, {"ok": 1}) + + # Explicit write concern insert_one + self.listener.reset() + coll = coll.with_options(write_concern=WriteConcern(w=1)) + res = await coll.insert_one({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": res.inserted_id, "x": 1}]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # delete_many + self.listener.reset() + res = await coll.delete_many({"x": 1}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("delete", coll.name), + ("ordered", True), + ("deletes", [SON([("q", {"x": 1}), ("limit", 0)])]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("delete", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(res.deleted_count, reply.get("n")) + + # replace_one + self.listener.reset() + oid = ObjectId() + res = await coll.replace_one({"_id": oid}, {"_id": oid, "x": 1}, upsert=True) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"_id": oid}), + ("u", {"_id": oid, "x": 1}), + ("multi", False), + ("upsert", True), + ] + ) + ], + ), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("update", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + self.assertEqual([{"index": 0, "_id": oid}], reply.get("upserted")) + + # update_one + self.listener.reset() + res = await coll.update_one({"x": 1}, {"$inc": {"x": 1}}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"x": 1}), + ("u", {"$inc": {"x": 1}}), + ("multi", False), + ("upsert", False), + ] + ) + ], + ), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("update", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # update_many + self.listener.reset() + res = await coll.update_many({"x": 2}, {"$inc": {"x": 1}}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"x": 2}), + ("u", {"$inc": {"x": 1}}), + ("multi", True), + ("upsert", False), + ] + ) + ], + ), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("update", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + # delete_one + self.listener.reset() + _ = await coll.delete_one({"x": 3}) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("delete", coll.name), + ("ordered", True), + ("deletes", [SON([("q", {"x": 3}), ("limit", 1)])]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("delete", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(1, reply.get("n")) + + self.assertEqual(0, await coll.count_documents({})) + + # write errors + await coll.insert_one({"_id": 1}) + try: + self.listener.reset() + await coll.insert_one({"_id": 1}) + except OperationFailure: + pass + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON( + [ + ("insert", coll.name), + ("ordered", True), + ("documents", [{"_id": 1}]), + ("writeConcern", {"w": 1}), + ] + ) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("insert", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + reply = succeeded.reply + self.assertEqual(1, reply.get("ok")) + self.assertEqual(0, reply.get("n")) + errors = reply.get("writeErrors") + self.assertIsInstance(errors, list) + error = errors[0] + self.assertEqual(0, error.get("index")) + self.assertIsInstance(error.get("code"), int) + self.assertIsInstance(error.get("errmsg"), str) + + async def test_insert_many(self): + # This always uses the bulk API. + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + big = "x" * (1024 * 1024 * 4) + docs = [{"_id": i, "big": big} for i in range(6)] + await coll.insert_many(docs) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + documents = [] + count = 0 + operation_id = started[0].operation_id + self.assertIsInstance(operation_id, int) + for start, succeed in zip(started, succeeded): + self.assertIsInstance(start, monitoring.CommandStartedEvent) + cmd = sanitize_cmd(start.command) + self.assertEqual(["insert", "ordered", "documents"], list(cmd.keys())) + self.assertEqual(coll.name, cmd["insert"]) + self.assertIs(True, cmd["ordered"]) + documents.extend(cmd["documents"]) + self.assertEqual("pymongo_test", start.database_name) + self.assertEqual("insert", start.command_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + reply = succeed.reply + self.assertEqual(1, reply.get("ok")) + count += reply.get("n", 0) + self.assertEqual(documents, docs) + self.assertEqual(6, count) + + async def test_insert_many_unacknowledged(self): + coll = self.client.pymongo_test.test + await coll.drop() + unack_coll = coll.with_options(write_concern=WriteConcern(w=0)) + self.listener.reset() + + # Force two batches on legacy servers. + big = "x" * (1024 * 1024 * 12) + docs = [{"_id": i, "big": big} for i in range(6)] + await unack_coll.insert_many(docs) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + documents = [] + operation_id = started[0].operation_id + self.assertIsInstance(operation_id, int) + for start, succeed in zip(started, succeeded): + self.assertIsInstance(start, monitoring.CommandStartedEvent) + cmd = sanitize_cmd(start.command) + cmd.pop("writeConcern", None) + self.assertEqual(["insert", "ordered", "documents"], list(cmd.keys())) + self.assertEqual(coll.name, cmd["insert"]) + self.assertIs(True, cmd["ordered"]) + documents.extend(cmd["documents"]) + self.assertEqual("pymongo_test", start.database_name) + self.assertEqual("insert", start.command_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + self.assertEqual(1, succeed.reply.get("ok")) + self.assertEqual(documents, docs) + + async def check(): + return await coll.count_documents({}) == 6 + + await async_wait_until(check, "insert documents with w=0") + + async def test_bulk_write(self): + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + await coll.bulk_write( + [ + InsertOne({"_id": 1}), + UpdateOne({"_id": 1}, {"$set": {"x": 1}}), + DeleteOne({"_id": 1}), + ] + ) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + operation_id = started[0].operation_id + pairs = list(zip(started, succeeded)) + self.assertEqual(3, len(pairs)) + for start, succeed in pairs: + self.assertIsInstance(start, monitoring.CommandStartedEvent) + self.assertEqual("pymongo_test", start.database_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + + expected = SON([("insert", coll.name), ("ordered", True), ("documents", [{"_id": 1}])]) + self.assertEqualCommand(expected, started[0].command) + expected = SON( + [ + ("update", coll.name), + ("ordered", True), + ( + "updates", + [ + SON( + [ + ("q", {"_id": 1}), + ("u", {"$set": {"x": 1}}), + ("multi", False), + ("upsert", False), + ] + ) + ], + ), + ] + ) + self.assertEqualCommand(expected, started[1].command) + expected = SON( + [ + ("delete", coll.name), + ("ordered", True), + ("deletes", [SON([("q", {"_id": 1}), ("limit", 1)])]), + ] + ) + self.assertEqualCommand(expected, started[2].command) + + @async_client_context.require_failCommand_fail_point + async def test_bulk_write_command_network_error(self): + coll = self.client.pymongo_test.test + self.listener.reset() + + insert_network_error = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "closeConnection": True, + }, + } + async with self.fail_point(insert_network_error): + with self.assertRaises(AutoReconnect): + await coll.bulk_write([InsertOne({"_id": 1})]) + failed = self.listener.failed_events + self.assertEqual(1, len(failed)) + event = failed[0] + self.assertEqual(event.command_name, "insert") + self.assertIsInstance(event.failure, dict) + self.assertEqual(event.failure["errtype"], "AutoReconnect") + self.assertTrue(event.failure["errmsg"]) + + @async_client_context.require_failCommand_fail_point + async def test_bulk_write_command_error(self): + coll = self.client.pymongo_test.test + self.listener.reset() + + insert_command_error = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["insert"], + "await acloseAsyncConnection": False, + "errorCode": 10107, # Not primary + }, + } + async with self.fail_point(insert_command_error): + with self.assertRaises(NotPrimaryError): + await coll.bulk_write([InsertOne({"_id": 1})]) + failed = self.listener.failed_events + self.assertEqual(1, len(failed)) + event = failed[0] + self.assertEqual(event.command_name, "insert") + self.assertIsInstance(event.failure, dict) + self.assertEqual(event.failure["code"], 10107) + self.assertTrue(event.failure["errmsg"]) + + async def test_write_errors(self): + coll = self.client.pymongo_test.test + await coll.drop() + self.listener.reset() + + try: + await coll.bulk_write( + [ + InsertOne({"_id": 1}), + InsertOne({"_id": 1}), + InsertOne({"_id": 1}), + DeleteOne({"_id": 1}), + ], + ordered=False, + ) + except OperationFailure: + pass + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) + operation_id = started[0].operation_id + pairs = list(zip(started, succeeded)) + errors = [] + for start, succeed in pairs: + self.assertIsInstance(start, monitoring.CommandStartedEvent) + self.assertEqual("pymongo_test", start.database_name) + self.assertIsInstance(start.request_id, int) + self.assertEqual(await self.client.address, start.connection_id) + self.assertIsInstance(succeed, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeed.duration_micros, int) + self.assertEqual(start.command_name, succeed.command_name) + self.assertEqual(start.request_id, succeed.request_id) + self.assertEqual(start.connection_id, succeed.connection_id) + self.assertEqual(start.operation_id, operation_id) + self.assertEqual(succeed.operation_id, operation_id) + if "writeErrors" in succeed.reply: + errors.extend(succeed.reply["writeErrors"]) + + self.assertEqual(2, len(errors)) + fields = {"index", "code", "errmsg"} + for error in errors: + self.assertTrue(fields.issubset(set(error))) + + async def test_first_batch_helper(self): + # Regardless of server version and use of helpers._first_batch + # this test should still pass. + self.listener.reset() + tuple(await (await self.client.pymongo_test.test.list_indexes()).to_list()) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + expected = SON([("listIndexes", "test"), ("cursor", {})]) + self.assertEqualCommand(expected, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("listIndexes", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertIsInstance(succeeded.duration_micros, int) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + self.assertTrue("cursor" in succeeded.reply) + self.assertTrue("ok" in succeeded.reply) + + self.listener.reset() + + @async_client_context.require_version_max(6, 1, 99) + async def test_sensitive_commands(self): + listeners = self.client._event_listeners + + self.listener.reset() + cmd = SON([("getnonce", 1)]) + listeners.publish_command_start(cmd, "pymongo_test", 12345, await self.client.address, None) # type: ignore[arg-type] + delta = datetime.timedelta(milliseconds=100) + listeners.publish_command_success( + delta, + {"nonce": "e474f4561c5eb40b", "ok": 1.0}, + "getnonce", + 12345, + await self.client.address, # type: ignore[arg-type] + None, + database_name="pymongo_test", + ) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertIsInstance(started, monitoring.CommandStartedEvent) + self.assertEqual({}, started.command) + self.assertEqual("pymongo_test", started.database_name) + self.assertEqual("getnonce", started.command_name) + self.assertIsInstance(started.request_id, int) + self.assertEqual(await self.client.address, started.connection_id) + self.assertIsInstance(succeeded, monitoring.CommandSucceededEvent) + self.assertEqual(succeeded.duration_micros, 100000) + self.assertEqual(started.command_name, succeeded.command_name) + self.assertEqual(started.request_id, succeeded.request_id) + self.assertEqual(started.connection_id, succeeded.connection_id) + self.assertEqual({}, succeeded.reply) + + +class AsyncTestGlobalListener(AsyncIntegrationTest): + listener: EventListener + saved_listeners: Any + + @classmethod + @async_client_context.require_connection + async def _setup_class(cls): + await super()._setup_class() + cls.listener = EventListener() + # We plan to call register(), which internally modifies _LISTENERS. + cls.saved_listeners = copy.deepcopy(monitoring._LISTENERS) + monitoring.register(cls.listener) + cls.client = await async_single_client() + # Get one (authenticated) socket in the pool. + await cls.client.pymongo_test.command("ping") + + @classmethod + async def _tearDown_class(cls): + monitoring._LISTENERS = cls.saved_listeners + await cls.client.close() + await super()._tearDown_class() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.listener.reset() + + async def test_simple(self): + await self.client.pymongo_test.command("ping") + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) + self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) + self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) + self.assertEqualCommand(SON([("ping", 1)]), started.command) + self.assertEqual("ping", started.command_name) + self.assertEqual(await self.client.address, started.connection_id) + self.assertEqual("pymongo_test", started.database_name) + self.assertTrue(isinstance(started.request_id, int)) + + +class AsyncTestEventClasses(unittest.IsolatedAsyncioTestCase): + def test_command_event_repr(self): + request_id, connection_id, operation_id, db_name = 1, ("localhost", 27017), 2, "admin" + event = monitoring.CommandStartedEvent( + {"ping": 1}, db_name, request_id, connection_id, operation_id + ) + self.assertEqual( + repr(event), + "", + ) + delta = datetime.timedelta(milliseconds=100) + event = monitoring.CommandSucceededEvent( + delta, {"ok": 1}, "ping", request_id, connection_id, operation_id, database_name=db_name + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.CommandFailedEvent( + delta, {"ok": 0}, "ping", request_id, connection_id, operation_id, database_name=db_name + ) + self.assertEqual( + repr(event), + "", + ) + + def test_server_heartbeat_event_repr(self): + connection_id = ("localhost", 27017) + event = monitoring.ServerHeartbeatStartedEvent(connection_id) + self.assertEqual( + repr(event), "" + ) + delta = 0.1 + event = monitoring.ServerHeartbeatSucceededEvent( + delta, + {"ok": 1}, # type: ignore[arg-type] + connection_id, + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.ServerHeartbeatFailedEvent( + delta, + "ERROR", # type: ignore[arg-type] + connection_id, + ) + self.assertEqual( + repr(event), + "", + ) + + def test_server_event_repr(self): + server_address = ("localhost", 27017) + topology_id = ObjectId("000000000000000000000001") + event = monitoring.ServerOpeningEvent(server_address, topology_id) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.ServerDescriptionChangedEvent( + "PREV", # type: ignore[arg-type] + "NEW", # type: ignore[arg-type] + server_address, + topology_id, + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.ServerClosedEvent(server_address, topology_id) + self.assertEqual( + repr(event), + "", + ) + + def test_topology_event_repr(self): + topology_id = ObjectId("000000000000000000000001") + event = monitoring.TopologyOpenedEvent(topology_id) + self.assertEqual(repr(event), "") + event = monitoring.TopologyDescriptionChangedEvent( + "PREV", # type: ignore[arg-type] + "NEW", # type: ignore[arg-type] + topology_id, + ) + self.assertEqual( + repr(event), + "", + ) + event = monitoring.TopologyClosedEvent(topology_id) + self.assertEqual(repr(event), "") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_monitoring.py b/test/test_monitoring.py index ed6a3d0bc2..8322e29918 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import annotations +import asyncio import copy import datetime import sys @@ -21,8 +22,19 @@ sys.path[0:0] = [""] -from test import IntegrationTest, client_context, client_knobs, sanitize_cmd, unittest -from test.utils import EventListener, rs_or_single_client, single_client, wait_until +from test import ( + IntegrationTest, + client_context, + client_knobs, + sanitize_cmd, + unittest, +) +from test.utils import ( + EventListener, + rs_or_single_client, + single_client, + wait_until, +) from bson.int64 import Int64 from bson.objectid import ObjectId @@ -31,23 +43,26 @@ from pymongo.errors import AutoReconnect, NotPrimaryError, OperationFailure from pymongo.read_preferences import ReadPreference from pymongo.synchronous.command_cursor import CommandCursor +from pymongo.synchronous.helpers import next from pymongo.write_concern import WriteConcern +_IS_SYNC = True + class TestCommandMonitoring(IntegrationTest): listener: EventListener @classmethod @client_context.require_connection - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.listener = EventListener() cls.client = rs_or_single_client(event_listeners=[cls.listener], retryWrites=False) @classmethod - def tearDownClass(cls): + def _tearDown_class(cls): cls.client.close() - super().tearDownClass() + super()._tearDown_class() def tearDown(self): self.listener.reset() @@ -171,7 +186,7 @@ def test_find_and_get_more(self): self.assertEqual(csr["nextBatch"], [{} for _ in range(4)]) finally: # Exhaust the cursor to avoid kill cursors. - tuple(cursor) + tuple(cursor.to_list()) def test_find_with_explain(self): cmd = SON([("explain", SON([("find", "test"), ("filter", {})]))]) @@ -230,7 +245,7 @@ def _test_find_options(self, query, expected_cmd): self.assertEqual(self.client.address, succeeded.connection_id) finally: # Exhaust the cursor to avoid kill cursors. - tuple(cursor) + tuple(cursor.to_list()) def test_find_options(self): query = { @@ -356,7 +371,7 @@ def test_command_and_get_more(self): self.assertEqualReply(expected_result, succeeded.reply) finally: # Exhaust the cursor to avoid kill cursors. - tuple(cursor) + tuple(cursor.to_list()) def test_get_more_failure(self): address = self.client.address @@ -451,7 +466,7 @@ def test_exhaust(self): self.assertEqualReply(expected_result, succeeded.reply) self.listener.reset() - tuple(cursor) + tuple(cursor.to_list()) self.assertEqual(0, len(self.listener.failed_events)) for event in self.listener.started_events: self.assertTrue(isinstance(event, monitoring.CommandStartedEvent)) @@ -898,7 +913,11 @@ def test_insert_many_unacknowledged(self): self.assertEqual(succeed.operation_id, operation_id) self.assertEqual(1, succeed.reply.get("ok")) self.assertEqual(documents, docs) - wait_until(lambda: coll.count_documents({}) == 6, "insert documents with w=0") + + def check(): + return coll.count_documents({}) == 6 + + wait_until(check, "insert documents with w=0") def test_bulk_write(self): coll = self.client.pymongo_test.test @@ -1058,7 +1077,7 @@ def test_first_batch_helper(self): # Regardless of server version and use of helpers._first_batch # this test should still pass. self.listener.reset() - tuple(self.client.pymongo_test.test.list_indexes()) + tuple((self.client.pymongo_test.test.list_indexes()).to_list()) started = self.listener.started_events[0] succeeded = self.listener.succeeded_events[0] self.assertEqual(0, len(self.listener.failed_events)) @@ -1119,8 +1138,8 @@ class TestGlobalListener(IntegrationTest): @classmethod @client_context.require_connection - def setUpClass(cls): - super().setUpClass() + def _setup_class(cls): + super()._setup_class() cls.listener = EventListener() # We plan to call register(), which internally modifies _LISTENERS. cls.saved_listeners = copy.deepcopy(monitoring._LISTENERS) @@ -1130,10 +1149,10 @@ def setUpClass(cls): cls.client.pymongo_test.command("ping") @classmethod - def tearDownClass(cls): + def _tearDown_class(cls): monitoring._LISTENERS = cls.saved_listeners cls.client.close() - super().tearDownClass() + super()._tearDown_class() def setUp(self): super().setUp() diff --git a/tools/synchro.py b/tools/synchro.py index dfe3854e22..f38a83f128 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -172,6 +172,7 @@ "test_session.py", "test_transactions.py", "test_client_context.py", + "test_monitoring.py", ] sync_test_files = [