Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions google/cloud/sql/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from google.cloud.sql.connector.enums import DriverMapping
from google.cloud.sql.connector.enums import IPTypes
from google.cloud.sql.connector.enums import RefreshStrategy
from google.cloud.sql.connector.exceptions import ConnectorLoopError
from google.cloud.sql.connector.instance import RefreshAheadCache
from google.cloud.sql.connector.lazy import LazyRefreshCache
from google.cloud.sql.connector.monitored_cache import MonitoredCache
Expand Down Expand Up @@ -281,6 +282,15 @@ async def connect_async(
KeyError: Unsupported database driver Must be one of pymysql, asyncpg,
pg8000, and pytds.
"""
# check if event loop is running in current thread
if self._loop != asyncio.get_running_loop():
raise ConnectorLoopError(
"Running event loop does not match 'connector._loop'. "
"Connector.connect_async() must be called from the event loop "
"the Connector was initialized with. If you need to connect "
"across event loops, please use a new Connector object."
)

if self._keys is None:
self._keys = asyncio.create_task(generate_keys())
if self._client is None:
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import asyncio
import os
from threading import Thread
from typing import Union

from aiohttp import ClientResponseError
Expand All @@ -29,6 +30,7 @@
from google.cloud.sql.connector.client import CloudSQLClient
from google.cloud.sql.connector.connection_name import ConnectionName
from google.cloud.sql.connector.exceptions import CloudSQLIPTypeError
from google.cloud.sql.connector.exceptions import ConnectorLoopError
from google.cloud.sql.connector.exceptions import IncompatibleDriverError
from google.cloud.sql.connector.instance import RefreshAheadCache

Expand Down Expand Up @@ -280,6 +282,38 @@ async def test_Connector_connect_async(
assert connection is True


@pytest.mark.asyncio
async def test_Connector_connect_async_multiple_event_loops(
fake_credentials: Credentials, fake_client: CloudSQLClient
) -> None:
"""Test that Connector.connect_async errors when run on wrong event loop."""

new_loop = asyncio.new_event_loop()
thread = Thread(target=new_loop.run_forever, daemon=True)
thread.start()

async with Connector(
credentials=fake_credentials, loop=asyncio.get_running_loop()
) as connector:
connector._client = fake_client
with pytest.raises(ConnectorLoopError) as exc_info:
future = asyncio.run_coroutine_threadsafe(
connector.connect_async(
"test-project:test-region:test-instance", "asyncpg"
),
loop=new_loop,
)
future.result()
assert (
exc_info.value.args[0] == "Running event loop does not match "
"'connector._loop'. Connector.connect_async() must be called from "
"the event loop the Connector was initialized with. If you need to "
"connect across event loops, please use a new Connector object."
)
new_loop.call_soon_threadsafe(new_loop.stop)
thread.join()


@pytest.mark.asyncio
async def test_create_async_connector(fake_credentials: Credentials) -> None:
"""Test that create_async_connector properly initializes connector
Expand Down
Loading