Skip to content

PYTHON-5084 - Convert test.test_heartbeat_monitoring to async #2100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions test/asynchronous/test_heartbeat_monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright 2016-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test the monitoring of the server heartbeats."""
from __future__ import annotations

import sys

sys.path[0:0] = [""]

from test.asynchronous import AsyncIntegrationTest, client_knobs, unittest
from test.utils import AsyncMockPool, HeartbeatEventListener, async_wait_until

from pymongo.asynchronous.monitor import Monitor
from pymongo.errors import ConnectionFailure
from pymongo.hello import Hello, HelloCompat

_IS_SYNC = False


class TestHeartbeatMonitoring(AsyncIntegrationTest):
async def create_mock_monitor(self, responses, uri, expected_results):
listener = HeartbeatEventListener()
with client_knobs(
heartbeat_frequency=0.1, min_heartbeat_interval=0.1, events_queue_frequency=0.1
):

class MockMonitor(Monitor):
async def _check_with_socket(self, *args, **kwargs):
if isinstance(responses[1], Exception):
raise responses[1]
return Hello(responses[1]), 99

_ = await self.async_single_client(
h=uri,
event_listeners=(listener,),
_monitor_class=MockMonitor,
_pool_class=AsyncMockPool,
connect=True,
)

expected_len = len(expected_results)
# Wait for *at least* expected_len number of results. The
# monitor thread may run multiple times during the execution
# of this test.
await async_wait_until(
lambda: len(listener.events) >= expected_len, "publish all events"
)

# zip gives us len(expected_results) pairs.
for expected, actual in zip(expected_results, listener.events):
self.assertEqual(expected, actual.__class__.__name__)
self.assertEqual(actual.connection_id, responses[0])
if expected != "ServerHeartbeatStartedEvent":
if isinstance(actual.reply, Hello):
self.assertEqual(actual.duration, 99)
self.assertEqual(actual.reply._doc, responses[1])
else:
self.assertEqual(actual.reply, responses[1])

async def test_standalone(self):
responses = (
("a", 27017),
{HelloCompat.LEGACY_CMD: True, "maxWireVersion": 4, "minWireVersion": 0, "ok": 1},
)
uri = "mongodb://a:27017"
expected_results = ["ServerHeartbeatStartedEvent", "ServerHeartbeatSucceededEvent"]

await self.create_mock_monitor(responses, uri, expected_results)

async def test_standalone_error(self):
responses = (("a", 27017), ConnectionFailure("SPECIAL MESSAGE"))
uri = "mongodb://a:27017"
# _check_with_socket failing results in a second attempt.
expected_results = [
"ServerHeartbeatStartedEvent",
"ServerHeartbeatFailedEvent",
"ServerHeartbeatStartedEvent",
"ServerHeartbeatFailedEvent",
]

await self.create_mock_monitor(responses, uri, expected_results)


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2399,7 +2399,7 @@ def test_reconnect(self):

# MongoClient discovers it's alone. The first attempt raises either
# ServerSelectionTimeoutError or AutoReconnect (from
# AsyncMockPool.get_socket).
# MockPool.get_socket).
with self.assertRaises(AutoReconnect):
c.db.collection.find_one()

Expand Down
34 changes: 18 additions & 16 deletions test/test_heartbeat_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from pymongo.hello import Hello, HelloCompat
from pymongo.synchronous.monitor import Monitor

_IS_SYNC = True


class TestHeartbeatMonitoring(IntegrationTest):
def create_mock_monitor(self, responses, uri, expected_results):
Expand All @@ -40,8 +42,12 @@ def _check_with_socket(self, *args, **kwargs):
raise responses[1]
return Hello(responses[1]), 99

m = self.single_client(
h=uri, event_listeners=(listener,), _monitor_class=MockMonitor, _pool_class=MockPool
_ = self.single_client(
h=uri,
event_listeners=(listener,),
_monitor_class=MockMonitor,
_pool_class=MockPool,
connect=True,
)

expected_len = len(expected_results)
Expand All @@ -50,20 +56,16 @@ def _check_with_socket(self, *args, **kwargs):
# of this test.
wait_until(lambda: len(listener.events) >= expected_len, "publish all events")

try:
# zip gives us len(expected_results) pairs.
for expected, actual in zip(expected_results, listener.events):
self.assertEqual(expected, actual.__class__.__name__)
self.assertEqual(actual.connection_id, responses[0])
if expected != "ServerHeartbeatStartedEvent":
if isinstance(actual.reply, Hello):
self.assertEqual(actual.duration, 99)
self.assertEqual(actual.reply._doc, responses[1])
else:
self.assertEqual(actual.reply, responses[1])

finally:
m.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh a try finally block is...wacky HAHA i wonder how that came to be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try...finally without an except is something we've used to ensure a resource is cleaned up even if an exception occurs unexpectedly. It looks pretty goofy though yeah.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see, that makes sense

# zip gives us len(expected_results) pairs.
for expected, actual in zip(expected_results, listener.events):
self.assertEqual(expected, actual.__class__.__name__)
self.assertEqual(actual.connection_id, responses[0])
if expected != "ServerHeartbeatStartedEvent":
if isinstance(actual.reply, Hello):
self.assertEqual(actual.duration, 99)
self.assertEqual(actual.reply._doc, responses[1])
else:
self.assertEqual(actual.reply, responses[1])

def test_standalone(self):
responses = (
Expand Down
59 changes: 58 additions & 1 deletion test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from pymongo.errors import ConfigurationError, OperationFailure
from pymongo.hello import HelloCompat
from pymongo.helpers_shared import _SENSITIVE_COMMANDS
from pymongo.lock import _create_lock
from pymongo.lock import _async_create_lock, _create_lock
from pymongo.monitoring import (
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
Expand Down Expand Up @@ -312,6 +312,22 @@ def failed(self, event):
self.event_list.append("serverHeartbeatFailedEvent")


class AsyncMockConnection:
def __init__(self):
self.cancel_context = _CancellationContext()
self.more_to_come = False
self.id = random.randint(0, 100)

def close_conn(self, reason):
pass

def __aenter__(self):
return self

def __aexit__(self, exc_type, exc_val, exc_tb):
pass


class MockConnection:
def __init__(self):
self.cancel_context = _CancellationContext()
Expand All @@ -328,6 +344,47 @@ def __exit__(self, exc_type, exc_val, exc_tb):
pass


class AsyncMockPool:
def __init__(self, address, options, handshake=True, client_id=None):
self.gen = _PoolGeneration()
self._lock = _async_create_lock()
self.opts = options
self.operation_count = 0
self.conns = []

def stale_generation(self, gen, service_id):
return self.gen.stale(gen, service_id)

@contextlib.asynccontextmanager
async def checkout(self, handler=None):
yield AsyncMockConnection()

async def checkin(self, *args, **kwargs):
pass

async def _reset(self, service_id=None):
async with self._lock:
self.gen.inc(service_id)

async def ready(self):
pass

async def reset(self, service_id=None, interrupt_connections=False):
await self._reset()

async def reset_without_pause(self):
await self._reset()

async def close(self):
await self._reset()

async def update_is_writable(self, is_writable):
pass

async def remove_stale_sockets(self, *args, **kwargs):
pass


class MockPool:
def __init__(self, address, options, handshake=True, client_id=None):
self.gen = _PoolGeneration()
Expand Down
3 changes: 3 additions & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
"_async_create_lock": "_create_lock",
"_async_create_condition": "_create_condition",
"_async_cond_wait": "_cond_wait",
"AsyncMockConnection": "MockConnection",
"AsyncMockPool": "MockPool",
}

docstring_replacements: dict[tuple[str, str], str] = {
Expand Down Expand Up @@ -206,6 +208,7 @@ def async_only_test(f: str) -> bool:
"test_database.py",
"test_data_lake.py",
"test_encryption.py",
"test_heartbeat_monitoring.py",
"test_grid_file.py",
"test_logger.py",
"test_monitoring.py",
Expand Down
Loading