From dded00863359546c888c322081ca550575d2e452 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Sat, 13 Sep 2025 16:13:15 +0800 Subject: [PATCH 01/12] wait for Dapr health check asynchronously Switch Dapr health check from blocking call to async call to avoid blocking the event loop in async environments Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/subscription.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index 9aabf8b2..980eef45 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -51,7 +51,8 @@ async def outgoing_request_iterator(): async def reconnect_stream(self): await self.close() - DaprHealth.wait_until_ready() + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, DaprHealth.wait_until_ready) print('Attempting to reconnect...') await self.start() From ef96101526677f4cf0817ba2ae83700d807971f8 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Sat, 13 Sep 2025 16:38:08 +0800 Subject: [PATCH 02/12] add StatusCode.UNKNOWN branch Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/subscription.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index 980eef45..c965fb9a 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -67,7 +67,7 @@ async def next_message(self): return None return SubscriptionMessage(message.event_message) except AioRpcError as e: - if e.code() == StatusCode.UNAVAILABLE: + if e.code() == StatusCode.UNAVAILABLE or e.code() == StatusCode.UNKNOWN: print( f'gRPC error while reading from stream: {e.details()}, ' f'Status Code: {e.code()}. ' From a91578dedbc5cd51f698dcdab6fd624248ea620c Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Tue, 14 Oct 2025 17:37:00 +0800 Subject: [PATCH 03/12] aio dapr health Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/subscription.py | 5 +-- dapr/aio/clients/health.py | 55 +++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 dapr/aio/clients/health.py diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index c965fb9a..2d28cab7 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -3,7 +3,7 @@ from grpc.aio import AioRpcError from dapr.clients.grpc._response import TopicEventResponse -from dapr.clients.health import DaprHealth +from dapr.aio.clients.health import DaprHealth from dapr.common.pubsub.subscription import ( StreamInactiveError, SubscriptionMessage, @@ -51,8 +51,7 @@ async def outgoing_request_iterator(): async def reconnect_stream(self): await self.close() - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, DaprHealth.wait_until_ready) + await DaprHealth.wait_until_ready() print('Attempting to reconnect...') await self.start() diff --git a/dapr/aio/clients/health.py b/dapr/aio/clients/health.py new file mode 100644 index 00000000..cb0d3b07 --- /dev/null +++ b/dapr/aio/clients/health.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import asyncio +import urllib.request +import urllib.error +import time + +from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT +from dapr.clients.http.helpers import get_api_url +from dapr.conf import settings + + +class DaprHealth: + @staticmethod + async def wait_until_ready(): + health_url = f'{get_api_url()}/healthz/outbound' + headers = {USER_AGENT_HEADER: DAPR_USER_AGENT} + if settings.DAPR_API_TOKEN is not None: + headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN + timeout = float(settings.DAPR_HEALTH_TIMEOUT) + + start = time.time() + while True: + try: + req = urllib.request.Request(health_url, headers=headers) + with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response: + if 200 <= response.status < 300: + break + except urllib.error.URLError as e: + print(f'Health check on {health_url} failed: {e.reason}') + except Exception as e: + print(f'Unexpected error during health check: {e}') + + remaining = (start + timeout) - time.time() + if remaining <= 0: + raise TimeoutError(f'Dapr health check timed out, after {timeout}.') + await asyncio.sleep(min(1.0, remaining)) + + @staticmethod + def get_ssl_context(): + # This method is used (overwritten) from tests + # to return context for self-signed certificates + return None From afe00eeff59009b364738351cd963f83377b4c96 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Tue, 14 Oct 2025 18:04:06 +0800 Subject: [PATCH 04/12] add healthcheck test Signed-off-by: mingsing <107218803@qq.com> --- ...est_heatlhcheck.py => test_healthcheck.py} | 0 tests/clients/test_healthcheck_async.py | 180 ++++++++++++++++++ 2 files changed, 180 insertions(+) rename tests/clients/{test_heatlhcheck.py => test_healthcheck.py} (100%) create mode 100644 tests/clients/test_healthcheck_async.py diff --git a/tests/clients/test_heatlhcheck.py b/tests/clients/test_healthcheck.py similarity index 100% rename from tests/clients/test_heatlhcheck.py rename to tests/clients/test_healthcheck.py diff --git a/tests/clients/test_healthcheck_async.py b/tests/clients/test_healthcheck_async.py new file mode 100644 index 00000000..9502fec7 --- /dev/null +++ b/tests/clients/test_healthcheck_async.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import asyncio +import time +import unittest +from unittest.mock import patch, MagicMock + +from dapr.aio.clients.health import DaprHealth +from dapr.conf import settings +from dapr.version import __version__ + + +class DaprHealthCheckAsyncTests(unittest.IsolatedAsyncioTestCase): + @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') + @patch('urllib.request.urlopen') + async def test_wait_until_ready_success(self, mock_urlopen): + mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) + + try: + await DaprHealth.wait_until_ready() + except Exception as e: + self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') + + mock_urlopen.assert_called_once() + + called_url = mock_urlopen.call_args[0][0].full_url + self.assertEqual(called_url, 'http://domain.com:3500/v1.0/healthz/outbound') + + # Check headers are properly set + headers = mock_urlopen.call_args[0][0].headers + self.assertIn('User-agent', headers) + self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') + + @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') + @patch.object(settings, 'DAPR_API_TOKEN', 'mytoken') + @patch('urllib.request.urlopen') + async def test_wait_until_ready_success_with_api_token(self, mock_urlopen): + mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) + + try: + await DaprHealth.wait_until_ready() + except Exception as e: + self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') + + mock_urlopen.assert_called_once() + + # Check headers are properly set + headers = mock_urlopen.call_args[0][0].headers + self.assertIn('User-agent', headers) + self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') + self.assertIn('Dapr-api-token', headers) + self.assertEqual(headers['Dapr-api-token'], 'mytoken') + + @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5') + @patch('urllib.request.urlopen') + async def test_wait_until_ready_timeout(self, mock_urlopen): + mock_urlopen.return_value.__enter__.return_value = MagicMock(status=500) + + start = time.time() + + with self.assertRaises(TimeoutError): + await DaprHealth.wait_until_ready() + + self.assertGreaterEqual(time.time() - start, 2.5) + self.assertGreater(mock_urlopen.call_count, 1) + + @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') + @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '5.0') + @patch('urllib.request.urlopen') + async def test_health_check_does_not_block(self, mock_urlopen): + """Test that health check doesn't block other async tasks from running""" + # Mock health check to retry several times before succeeding + call_count = [0] # Use list to allow modification in nested function + + class MockResponse: + def __init__(self, status): + self.status = status + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return None + + def side_effect(*args, **kwargs): + call_count[0] += 1 + # First 2 calls fail with URLError, then succeed + # This will cause ~2 seconds of retries (1 second sleep after each failure) + if call_count[0] <= 2: + import urllib.error + raise urllib.error.URLError('Connection refused') + else: + return MockResponse(status=200) + + mock_urlopen.side_effect = side_effect + + # Counter that will be incremented by background task + counter = [0] # Use list to allow modification in nested function + is_running = [True] + + async def increment_counter(): + """Background task that increments counter every 0.5 seconds""" + while is_running[0]: + await asyncio.sleep(0.5) + counter[0] += 1 + + # Start the background task + counter_task = asyncio.create_task(increment_counter()) + + try: + # Run health check (will take ~2 seconds with retries) + await DaprHealth.wait_until_ready() + + # Stop the background task + is_running[0] = False + await asyncio.sleep(0.1) # Give it time to finish current iteration + + # Verify the counter was incremented during health check + # In 2 seconds with 0.5s intervals, we expect at least 3 increments + self.assertGreaterEqual( + counter[0], + 3, + f'Expected counter to increment at least 3 times during health check, ' + f'but got {counter[0]}. This indicates health check may be blocking.', + ) + + # Verify health check made multiple attempts + self.assertGreaterEqual(call_count[0], 2) + + finally: + # Clean up + is_running[0] = False + counter_task.cancel() + try: + await counter_task + except asyncio.CancelledError: + pass + + @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') + @patch('urllib.request.urlopen') + async def test_multiple_health_checks_concurrent(self, mock_urlopen): + """Test that multiple health check calls can run concurrently""" + mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) + + # Run multiple health checks concurrently + start_time = time.time() + results = await asyncio.gather( + DaprHealth.wait_until_ready(), + DaprHealth.wait_until_ready(), + DaprHealth.wait_until_ready(), + ) + elapsed = time.time() - start_time + + # All should complete successfully + self.assertEqual(len(results), 3) + self.assertIsNone(results[0]) + self.assertIsNone(results[1]) + self.assertIsNone(results[2]) + + # Should complete quickly since they run concurrently + self.assertLess(elapsed, 1.0) + + # Verify multiple calls were made + self.assertGreaterEqual(mock_urlopen.call_count, 3) + + +if __name__ == '__main__': + unittest.main() From d9cfc36fdccc1e62cde291c851967c3bcc1c12c4 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Thu, 16 Oct 2025 17:23:26 +0800 Subject: [PATCH 05/12] ruff pass Signed-off-by: mingsing <107218803@qq.com> --- tests/clients/test_healthcheck_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/clients/test_healthcheck_async.py b/tests/clients/test_healthcheck_async.py index 9502fec7..510a2e0c 100644 --- a/tests/clients/test_healthcheck_async.py +++ b/tests/clients/test_healthcheck_async.py @@ -100,6 +100,7 @@ def side_effect(*args, **kwargs): # This will cause ~2 seconds of retries (1 second sleep after each failure) if call_count[0] <= 2: import urllib.error + raise urllib.error.URLError('Connection refused') else: return MockResponse(status=200) From dde4988a18b1ff81526655642e78c1420d41878f Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Wed, 22 Oct 2025 11:02:28 +0800 Subject: [PATCH 06/12] fix async health check Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/client.py | 2 +- tests/clients/test_dapr_grpc_client_async_secure.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 995b8268..2e801330 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -1912,7 +1912,7 @@ async def wait(self, timeout_s: float): remaining = (start + timeout_s) - time.time() if remaining < 0: raise e - asyncio.sleep(min(1, remaining)) + await asyncio.sleep(min(1, remaining)) async def get_metadata(self) -> GetMetadataResponse: """Returns information about the sidecar allowing for runtime diff --git a/tests/clients/test_dapr_grpc_client_async_secure.py b/tests/clients/test_dapr_grpc_client_async_secure.py index 652feac2..4898284a 100644 --- a/tests/clients/test_dapr_grpc_client_async_secure.py +++ b/tests/clients/test_dapr_grpc_client_async_secure.py @@ -18,7 +18,8 @@ from unittest.mock import patch from dapr.aio.clients.grpc.client import DaprGrpcClientAsync -from dapr.clients.health import DaprHealth +from dapr.aio.clients.health import DaprHealth as DaprHealthAsync +from dapr.clients.health import DaprHealth as DaprHealth from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context from tests.clients.test_dapr_grpc_client_async import DaprGrpcClientAsyncTests from .fake_dapr_server import FakeDaprSidecar @@ -26,6 +27,7 @@ DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func +DaprHealthAsync.get_ssl_context = replacement_get_health_context DaprHealth.get_ssl_context = replacement_get_health_context From 45ce7d7c56b43951468c231cdd98467f3b0c5c54 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Sat, 29 Nov 2025 16:17:46 +0800 Subject: [PATCH 07/12] use aiohttp Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/health.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/dapr/aio/clients/health.py b/dapr/aio/clients/health.py index cb0d3b07..4b4c1188 100644 --- a/dapr/aio/clients/health.py +++ b/dapr/aio/clients/health.py @@ -12,9 +12,8 @@ See the License for the specific language governing permissions and limitations under the License. """ +import aiohttp import asyncio -import urllib.request -import urllib.error import time from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT @@ -32,21 +31,24 @@ async def wait_until_ready(): timeout = float(settings.DAPR_HEALTH_TIMEOUT) start = time.time() - while True: - try: - req = urllib.request.Request(health_url, headers=headers) - with urllib.request.urlopen(req, context=DaprHealth.get_ssl_context()) as response: - if 200 <= response.status < 300: - break - except urllib.error.URLError as e: - print(f'Health check on {health_url} failed: {e.reason}') - except Exception as e: - print(f'Unexpected error during health check: {e}') - - remaining = (start + timeout) - time.time() - if remaining <= 0: - raise TimeoutError(f'Dapr health check timed out, after {timeout}.') - await asyncio.sleep(min(1.0, remaining)) + ssl_context = DaprHealth.get_ssl_context() + + connector = aiohttp.TCPConnector(ssl=ssl_context) + async with aiohttp.ClientSession(connector=connector) as session: + while True: + try: + async with session.get(health_url, headers=headers) as response: + if 200 <= response.status < 300: + break + except aiohttp.ClientError as e: + print(f'Health check on {health_url} failed: {e}') + except Exception as e: + print(f'Unexpected error during health check: {e}') + + remaining = (start + timeout) - time.time() + if remaining <= 0: + raise TimeoutError(f'Dapr health check timed out, after {timeout}.') + await asyncio.sleep(min(1.0, remaining)) @staticmethod def get_ssl_context(): From d72b22ebdf6f8c4b221c0f639a4e2b17d6efd87a Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Mon, 1 Dec 2025 14:35:50 +0800 Subject: [PATCH 08/12] use aiohttp for asynchronous health check Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/health.py | 15 ++- tests/clients/test_healthcheck_async.py | 120 ++++++++++++++---------- 2 files changed, 81 insertions(+), 54 deletions(-) diff --git a/dapr/aio/clients/health.py b/dapr/aio/clients/health.py index 4b4c1188..f151c1b4 100644 --- a/dapr/aio/clients/health.py +++ b/dapr/aio/clients/health.py @@ -12,11 +12,13 @@ See the License for the specific language governing permissions and limitations under the License. """ + import aiohttp import asyncio import time +from warnings import warn -from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT +from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, DAPR_USER_AGENT, USER_AGENT_HEADER from dapr.clients.http.helpers import get_api_url from dapr.conf import settings @@ -24,6 +26,15 @@ class DaprHealth: @staticmethod async def wait_until_ready(): + warn( + 'This method is deprecated. Use DaprHealth.wait_for_sidecar instead.', + DeprecationWarning, + stacklevel=2, + ) + await DaprHealth.wait_for_sidecar() + + @staticmethod + async def wait_for_sidecar(): health_url = f'{get_api_url()}/healthz/outbound' headers = {USER_AGENT_HEADER: DAPR_USER_AGENT} if settings.DAPR_API_TOKEN is not None: @@ -48,7 +59,7 @@ async def wait_until_ready(): remaining = (start + timeout) - time.time() if remaining <= 0: raise TimeoutError(f'Dapr health check timed out, after {timeout}.') - await asyncio.sleep(min(1.0, remaining)) + await asyncio.sleep(min(1, remaining)) @staticmethod def get_ssl_context(): diff --git a/tests/clients/test_healthcheck_async.py b/tests/clients/test_healthcheck_async.py index 510a2e0c..66876873 100644 --- a/tests/clients/test_healthcheck_async.py +++ b/tests/clients/test_healthcheck_async.py @@ -12,10 +12,11 @@ See the License for the specific language governing permissions and limitations under the License. """ + import asyncio import time import unittest -from unittest.mock import patch, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch from dapr.aio.clients.health import DaprHealth from dapr.conf import settings @@ -24,88 +25,98 @@ class DaprHealthCheckAsyncTests(unittest.IsolatedAsyncioTestCase): @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') - @patch('urllib.request.urlopen') - async def test_wait_until_ready_success(self, mock_urlopen): - mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) + @patch('aiohttp.ClientSession.get') + async def test_wait_for_sidecar_success(self, mock_get): + # Create mock response + mock_response = MagicMock() + mock_response.status = 200 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + mock_get.return_value = mock_response try: - await DaprHealth.wait_until_ready() + await DaprHealth.wait_for_sidecar() except Exception as e: - self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') + self.fail(f'wait_for_sidecar() raised an exception unexpectedly: {e}') - mock_urlopen.assert_called_once() + mock_get.assert_called_once() - called_url = mock_urlopen.call_args[0][0].full_url + # Check URL + called_url = mock_get.call_args[0][0] self.assertEqual(called_url, 'http://domain.com:3500/v1.0/healthz/outbound') # Check headers are properly set - headers = mock_urlopen.call_args[0][0].headers - self.assertIn('User-agent', headers) - self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') + headers = mock_get.call_args[1]['headers'] + self.assertIn('User-Agent', headers) + self.assertEqual(headers['User-Agent'], f'dapr-sdk-python/{__version__}') @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') @patch.object(settings, 'DAPR_API_TOKEN', 'mytoken') - @patch('urllib.request.urlopen') - async def test_wait_until_ready_success_with_api_token(self, mock_urlopen): - mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) + @patch('aiohttp.ClientSession.get') + async def test_wait_for_sidecar_success_with_api_token(self, mock_get): + # Create mock response + mock_response = MagicMock() + mock_response.status = 200 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + mock_get.return_value = mock_response try: - await DaprHealth.wait_until_ready() + await DaprHealth.wait_for_sidecar() except Exception as e: - self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') + self.fail(f'wait_for_sidecar() raised an exception unexpectedly: {e}') - mock_urlopen.assert_called_once() + mock_get.assert_called_once() # Check headers are properly set - headers = mock_urlopen.call_args[0][0].headers - self.assertIn('User-agent', headers) - self.assertEqual(headers['User-agent'], f'dapr-sdk-python/{__version__}') - self.assertIn('Dapr-api-token', headers) - self.assertEqual(headers['Dapr-api-token'], 'mytoken') + headers = mock_get.call_args[1]['headers'] + self.assertIn('User-Agent', headers) + self.assertEqual(headers['User-Agent'], f'dapr-sdk-python/{__version__}') + self.assertIn('dapr-api-token', headers) + self.assertEqual(headers['dapr-api-token'], 'mytoken') @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5') - @patch('urllib.request.urlopen') - async def test_wait_until_ready_timeout(self, mock_urlopen): - mock_urlopen.return_value.__enter__.return_value = MagicMock(status=500) + @patch('aiohttp.ClientSession.get') + async def test_wait_for_sidecar_timeout(self, mock_get): + # Create mock response that always returns 500 + mock_response = MagicMock() + mock_response.status = 500 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + mock_get.return_value = mock_response start = time.time() with self.assertRaises(TimeoutError): - await DaprHealth.wait_until_ready() + await DaprHealth.wait_for_sidecar() self.assertGreaterEqual(time.time() - start, 2.5) - self.assertGreater(mock_urlopen.call_count, 1) + self.assertGreater(mock_get.call_count, 1) @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '5.0') - @patch('urllib.request.urlopen') - async def test_health_check_does_not_block(self, mock_urlopen): + @patch('aiohttp.ClientSession.get') + async def test_health_check_does_not_block(self, mock_get): """Test that health check doesn't block other async tasks from running""" # Mock health check to retry several times before succeeding call_count = [0] # Use list to allow modification in nested function - class MockResponse: - def __init__(self, status): - self.status = status - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - return None - def side_effect(*args, **kwargs): call_count[0] += 1 - # First 2 calls fail with URLError, then succeed + # First 2 calls fail with ClientError, then succeed # This will cause ~2 seconds of retries (1 second sleep after each failure) if call_count[0] <= 2: - import urllib.error + import aiohttp - raise urllib.error.URLError('Connection refused') + raise aiohttp.ClientError('Connection refused') else: - return MockResponse(status=200) + mock_response = MagicMock() + mock_response.status = 200 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + return mock_response - mock_urlopen.side_effect = side_effect + mock_get.side_effect = side_effect # Counter that will be incremented by background task counter = [0] # Use list to allow modification in nested function @@ -122,7 +133,7 @@ async def increment_counter(): try: # Run health check (will take ~2 seconds with retries) - await DaprHealth.wait_until_ready() + await DaprHealth.wait_for_sidecar() # Stop the background task is_running[0] = False @@ -150,17 +161,22 @@ async def increment_counter(): pass @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') - @patch('urllib.request.urlopen') - async def test_multiple_health_checks_concurrent(self, mock_urlopen): + @patch('aiohttp.ClientSession.get') + async def test_multiple_health_checks_concurrent(self, mock_get): """Test that multiple health check calls can run concurrently""" - mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) + # Create mock response + mock_response = MagicMock() + mock_response.status = 200 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + mock_get.return_value = mock_response # Run multiple health checks concurrently start_time = time.time() results = await asyncio.gather( - DaprHealth.wait_until_ready(), - DaprHealth.wait_until_ready(), - DaprHealth.wait_until_ready(), + DaprHealth.wait_for_sidecar(), + DaprHealth.wait_for_sidecar(), + DaprHealth.wait_for_sidecar(), ) elapsed = time.time() - start_time @@ -174,7 +190,7 @@ async def test_multiple_health_checks_concurrent(self, mock_urlopen): self.assertLess(elapsed, 1.0) # Verify multiple calls were made - self.assertGreaterEqual(mock_urlopen.call_count, 3) + self.assertGreaterEqual(mock_get.call_count, 3) if __name__ == '__main__': From 17c330282a373ea5f4103678170ffb40815e5f74 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Mon, 1 Dec 2025 19:50:54 +0800 Subject: [PATCH 09/12] remove deprecated wait_until_ready in async DaprHealth Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/subscription.py | 2 +- dapr/aio/clients/health.py | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index 358224ad..32c544a2 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -52,7 +52,7 @@ async def outgoing_request_iterator(): async def reconnect_stream(self): await self.close() - await DaprHealth.wait_until_ready() + await DaprHealth.wait_for_sidecar() print('Attempting to reconnect...') await self.start() diff --git a/dapr/aio/clients/health.py b/dapr/aio/clients/health.py index f151c1b4..b9994140 100644 --- a/dapr/aio/clients/health.py +++ b/dapr/aio/clients/health.py @@ -24,15 +24,6 @@ class DaprHealth: - @staticmethod - async def wait_until_ready(): - warn( - 'This method is deprecated. Use DaprHealth.wait_for_sidecar instead.', - DeprecationWarning, - stacklevel=2, - ) - await DaprHealth.wait_for_sidecar() - @staticmethod async def wait_for_sidecar(): health_url = f'{get_api_url()}/healthz/outbound' From 10774295a8304c7b255495e6c92eb854a24046a0 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Mon, 1 Dec 2025 20:10:45 +0800 Subject: [PATCH 10/12] rm DaprHealth.get_ssl_context in test_dapr_grpc_client_async_secure Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/client.py | 2 +- tests/clients/test_dapr_grpc_client_async_secure.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index d363775f..d459d62a 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -90,7 +90,7 @@ UnlockResponseStatus, ) from dapr.clients.grpc._state import StateItem, StateOptions -from dapr.clients.health import DaprHealth +from dapr.aio.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy from dapr.common.pubsub.subscription import StreamInactiveError from dapr.conf import settings diff --git a/tests/clients/test_dapr_grpc_client_async_secure.py b/tests/clients/test_dapr_grpc_client_async_secure.py index 4898284a..fc14d00e 100644 --- a/tests/clients/test_dapr_grpc_client_async_secure.py +++ b/tests/clients/test_dapr_grpc_client_async_secure.py @@ -19,7 +19,6 @@ from dapr.aio.clients.grpc.client import DaprGrpcClientAsync from dapr.aio.clients.health import DaprHealth as DaprHealthAsync -from dapr.clients.health import DaprHealth as DaprHealth from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context from tests.clients.test_dapr_grpc_client_async import DaprGrpcClientAsyncTests from .fake_dapr_server import FakeDaprSidecar @@ -28,7 +27,6 @@ DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func DaprHealthAsync.get_ssl_context = replacement_get_health_context -DaprHealth.get_ssl_context = replacement_get_health_context class DaprSecureGrpcClientAsyncTests(DaprGrpcClientAsyncTests): From 0c8371c11b9d081c1b0d6d158bfdb25089a95d67 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Mon, 1 Dec 2025 20:28:20 +0800 Subject: [PATCH 11/12] format Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/health.py | 4 ++-- tests/clients/test_dapr_grpc_client_async_secure.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dapr/aio/clients/health.py b/dapr/aio/clients/health.py index b9994140..9ab66ebb 100644 --- a/dapr/aio/clients/health.py +++ b/dapr/aio/clients/health.py @@ -13,10 +13,10 @@ limitations under the License. """ -import aiohttp import asyncio import time -from warnings import warn + +import aiohttp from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, DAPR_USER_AGENT, USER_AGENT_HEADER from dapr.clients.http.helpers import get_api_url diff --git a/tests/clients/test_dapr_grpc_client_async_secure.py b/tests/clients/test_dapr_grpc_client_async_secure.py index fc14d00e..d28aaa20 100644 --- a/tests/clients/test_dapr_grpc_client_async_secure.py +++ b/tests/clients/test_dapr_grpc_client_async_secure.py @@ -14,16 +14,15 @@ """ import unittest - from unittest.mock import patch from dapr.aio.clients.grpc.client import DaprGrpcClientAsync from dapr.aio.clients.health import DaprHealth as DaprHealthAsync +from dapr.conf import settings from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context from tests.clients.test_dapr_grpc_client_async import DaprGrpcClientAsyncTests -from .fake_dapr_server import FakeDaprSidecar -from dapr.conf import settings +from .fake_dapr_server import FakeDaprSidecar DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func DaprHealthAsync.get_ssl_context = replacement_get_health_context From 00222e12aee047056fb48085665afe6f4e790623 Mon Sep 17 00:00:00 2001 From: mingsing <107218803@qq.com> Date: Tue, 2 Dec 2025 10:08:06 +0800 Subject: [PATCH 12/12] Revert "rm DaprHealth.get_ssl_context in test_dapr_grpc_client_async_secure" Signed-off-by: mingsing <107218803@qq.com> --- dapr/aio/clients/grpc/client.py | 2 +- tests/clients/test_dapr_grpc_client_async_secure.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index d459d62a..d363775f 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -90,7 +90,7 @@ UnlockResponseStatus, ) from dapr.clients.grpc._state import StateItem, StateOptions -from dapr.aio.clients.health import DaprHealth +from dapr.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy from dapr.common.pubsub.subscription import StreamInactiveError from dapr.conf import settings diff --git a/tests/clients/test_dapr_grpc_client_async_secure.py b/tests/clients/test_dapr_grpc_client_async_secure.py index d28aaa20..1d685287 100644 --- a/tests/clients/test_dapr_grpc_client_async_secure.py +++ b/tests/clients/test_dapr_grpc_client_async_secure.py @@ -18,14 +18,15 @@ from dapr.aio.clients.grpc.client import DaprGrpcClientAsync from dapr.aio.clients.health import DaprHealth as DaprHealthAsync +from dapr.clients.health import DaprHealth from dapr.conf import settings from tests.clients.certs import replacement_get_credentials_func, replacement_get_health_context from tests.clients.test_dapr_grpc_client_async import DaprGrpcClientAsyncTests - from .fake_dapr_server import FakeDaprSidecar DaprGrpcClientAsync.get_credentials = replacement_get_credentials_func DaprHealthAsync.get_ssl_context = replacement_get_health_context +DaprHealth.get_ssl_context = replacement_get_health_context class DaprSecureGrpcClientAsyncTests(DaprGrpcClientAsyncTests):