From 94e50baf8854cfb0aac5e570f1c3e5acf4baefee Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Tue, 4 Jun 2024 22:21:19 +0000 Subject: [PATCH 1/2] fix: error on connect_async() event loop mismatch --- google/cloud/sql/connector/connector.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/google/cloud/sql/connector/connector.py b/google/cloud/sql/connector/connector.py index c76092a40..929d998e9 100755 --- a/google/cloud/sql/connector/connector.py +++ b/google/cloud/sql/connector/connector.py @@ -281,6 +281,15 @@ async def connect_async( KeyError: Unsupported database driver Must be one of pymysql, asyncpg, pg8000, and pytds. """ + # check if event loop is running in current thread + if self._loop != asyncio.get_running_loop(): + raise ConnectorLoopError( + "Running event loop does not match 'connector._loop'. " + "Connector.connect_async() must be called from the event loop " + "the Connector was initialized with. If you need to connect " + "across event loops/threads please use a new Connector object." + ) + if self._keys is None: self._keys = asyncio.create_task(generate_keys()) if self._client is None: From dc2212a00d8617a028dd98de392b6c4f86ea51fe Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Tue, 1 Oct 2024 17:24:02 +0000 Subject: [PATCH 2/2] chore: add test and update message --- google/cloud/sql/connector/connector.py | 3 ++- tests/unit/test_connector.py | 34 +++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/google/cloud/sql/connector/connector.py b/google/cloud/sql/connector/connector.py index 929d998e9..0229b7283 100755 --- a/google/cloud/sql/connector/connector.py +++ b/google/cloud/sql/connector/connector.py @@ -34,6 +34,7 @@ from google.cloud.sql.connector.enums import DriverMapping from google.cloud.sql.connector.enums import IPTypes from google.cloud.sql.connector.enums import RefreshStrategy +from google.cloud.sql.connector.exceptions import ConnectorLoopError from google.cloud.sql.connector.instance import RefreshAheadCache from google.cloud.sql.connector.lazy import LazyRefreshCache from google.cloud.sql.connector.monitored_cache import MonitoredCache @@ -287,7 +288,7 @@ async def connect_async( "Running event loop does not match 'connector._loop'. " "Connector.connect_async() must be called from the event loop " "the Connector was initialized with. If you need to connect " - "across event loops/threads please use a new Connector object." + "across event loops, please use a new Connector object." ) if self._keys is None: diff --git a/tests/unit/test_connector.py b/tests/unit/test_connector.py index 498c947cc..1bcb42616 100644 --- a/tests/unit/test_connector.py +++ b/tests/unit/test_connector.py @@ -16,6 +16,7 @@ import asyncio import os +from threading import Thread from typing import Union from aiohttp import ClientResponseError @@ -29,6 +30,7 @@ from google.cloud.sql.connector.client import CloudSQLClient from google.cloud.sql.connector.connection_name import ConnectionName from google.cloud.sql.connector.exceptions import CloudSQLIPTypeError +from google.cloud.sql.connector.exceptions import ConnectorLoopError from google.cloud.sql.connector.exceptions import IncompatibleDriverError from google.cloud.sql.connector.instance import RefreshAheadCache @@ -280,6 +282,38 @@ async def test_Connector_connect_async( assert connection is True +@pytest.mark.asyncio +async def test_Connector_connect_async_multiple_event_loops( + fake_credentials: Credentials, fake_client: CloudSQLClient +) -> None: + """Test that Connector.connect_async errors when run on wrong event loop.""" + + new_loop = asyncio.new_event_loop() + thread = Thread(target=new_loop.run_forever, daemon=True) + thread.start() + + async with Connector( + credentials=fake_credentials, loop=asyncio.get_running_loop() + ) as connector: + connector._client = fake_client + with pytest.raises(ConnectorLoopError) as exc_info: + future = asyncio.run_coroutine_threadsafe( + connector.connect_async( + "test-project:test-region:test-instance", "asyncpg" + ), + loop=new_loop, + ) + future.result() + assert ( + exc_info.value.args[0] == "Running event loop does not match " + "'connector._loop'. Connector.connect_async() must be called from " + "the event loop the Connector was initialized with. If you need to " + "connect across event loops, please use a new Connector object." + ) + new_loop.call_soon_threadsafe(new_loop.stop) + thread.join() + + @pytest.mark.asyncio async def test_create_async_connector(fake_credentials: Credentials) -> None: """Test that create_async_connector properly initializes connector