Skip to content

Commit e8fbbdf

Browse files
feat: invalidate cache on bad connection info and failed IP lookup (#389)
This change is analogous to CloudSQL's GoogleCloudPlatform/cloud-sql-python-connector#1118. The Connector caches connection info for future connections and schedules refresh operations. For unrecoverable errors, the cache should be invalidated to stop future bad refreshes. The cache should also be invalidated on failed IP lookups. This PR does these things.
1 parent e682102 commit e8fbbdf

File tree

4 files changed

+124
-4
lines changed

4 files changed

+124
-4
lines changed

google/cloud/alloydb/connector/async_connector.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,14 @@ async def connect(
177177
# if ip_type is str, convert to IPTypes enum
178178
if isinstance(ip_type, str):
179179
ip_type = IPTypes(ip_type.upper())
180-
conn_info = await cache.connect_info()
181-
ip_address = conn_info.get_preferred_ip(ip_type)
180+
try:
181+
conn_info = await cache.connect_info()
182+
ip_address = conn_info.get_preferred_ip(ip_type)
183+
except Exception:
184+
# with an error from AlloyDB API call or IP type, invalidate the
185+
# cache and re-raise the error
186+
await self._remove_cached(instance_uri)
187+
raise
182188
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")
183189

184190
# callable to be used for auto IAM authn
@@ -202,6 +208,15 @@ def get_authentication_token() -> str:
202208
await cache.force_refresh()
203209
raise
204210

211+
async def _remove_cached(self, instance_uri: str) -> None:
212+
"""Stops all background refreshes and deletes the connection
213+
info cache from the map of caches.
214+
"""
215+
logger.debug(f"['{instance_uri}']: Removing connection info from cache")
216+
# remove cache from stored caches and close it
217+
cache = self._cache.pop(instance_uri)
218+
await cache.close()
219+
205220
async def __aenter__(self) -> Any:
206221
"""Enter async context manager by returning Connector object"""
207222
return self

google/cloud/alloydb/connector/connector.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,14 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
206206
# if ip_type is str, convert to IPTypes enum
207207
if isinstance(ip_type, str):
208208
ip_type = IPTypes(ip_type.upper())
209-
conn_info = await cache.connect_info()
210-
ip_address = conn_info.get_preferred_ip(ip_type)
209+
try:
210+
conn_info = await cache.connect_info()
211+
ip_address = conn_info.get_preferred_ip(ip_type)
212+
except Exception:
213+
# with an error from AlloyDB API call or IP type, invalidate the
214+
# cache and re-raise the error
215+
await self._remove_cached(instance_uri)
216+
raise
211217
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")
212218

213219
# synchronous drivers are blocking and run using executor
@@ -334,6 +340,15 @@ def metadata_exchange(
334340

335341
return sock
336342

343+
async def _remove_cached(self, instance_uri: str) -> None:
344+
"""Stops all background refreshes and deletes the connection
345+
info cache from the map of caches.
346+
"""
347+
logger.debug(f"['{instance_uri}']: Removing connection info from cache")
348+
# remove cache from stored caches and close it
349+
cache = self._cache.pop(instance_uri)
350+
await cache.close()
351+
337352
def __enter__(self) -> "Connector":
338353
"""Enter context manager by returning Connector object"""
339354
return self

tests/unit/test_async_connector.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import asyncio
1616
from typing import Union
1717

18+
from aiohttp import ClientResponseError
1819
from mock import patch
1920
from mocks import FakeAlloyDBClient
2021
from mocks import FakeConnectionInfo
@@ -23,6 +24,8 @@
2324

2425
from google.cloud.alloydb.connector import AsyncConnector
2526
from google.cloud.alloydb.connector import IPTypes
27+
from google.cloud.alloydb.connector.exceptions import IPTypeNotFoundError
28+
from google.cloud.alloydb.connector.instance import RefreshAheadCache
2629

2730
ALLOYDB_API_ENDPOINT = "https://alloydb.googleapis.com"
2831

@@ -294,3 +297,39 @@ async def test_async_connect_bad_ip_type(
294297
exc_info.value.args[0]
295298
== f"Incorrect value for ip_type, got '{bad_ip_type}'. Want one of: 'PUBLIC', 'PRIVATE', 'PSC'."
296299
)
300+
301+
302+
async def test_Connector_remove_cached_bad_instance(
303+
credentials: FakeCredentials,
304+
) -> None:
305+
"""When a Connector attempts to retrieve connection info for a
306+
non-existent instance, it should delete the instance from
307+
the cache and ensure no background refresh happens (which would be
308+
wasted cycles).
309+
"""
310+
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/bad-test-instance"
311+
async with AsyncConnector(credentials=credentials) as connector:
312+
with pytest.raises(ClientResponseError):
313+
await connector.connect(instance_uri, "asyncpg")
314+
assert instance_uri not in connector._cache
315+
316+
317+
async def test_Connector_remove_cached_no_ip_type(credentials: FakeCredentials) -> None:
318+
"""When a Connector attempts to connect and preferred IP type is not present,
319+
it should delete the instance from the cache and ensure no background refresh
320+
happens (which would be wasted cycles).
321+
"""
322+
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/test-instance"
323+
# set instance to only have Public IP
324+
fake_client = FakeAlloyDBClient()
325+
fake_client.instance.ip_addrs = {"PUBLIC": "127.0.0.1"}
326+
async with AsyncConnector(credentials=credentials) as connector:
327+
connector._client = fake_client
328+
# populate cache
329+
cache = RefreshAheadCache(instance_uri, fake_client, connector._keys)
330+
connector._cache[instance_uri] = cache
331+
# test instance does not have Private IP, thus should invalidate cache
332+
with pytest.raises(IPTypeNotFoundError):
333+
await connector.connect(instance_uri, "asyncpg", ip_type="private")
334+
# check that cache has been removed from dict
335+
assert instance_uri not in connector._cache

tests/unit/test_connector.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
from threading import Thread
1717
from typing import Union
1818

19+
from aiohttp import ClientResponseError
1920
from mock import patch
2021
from mocks import FakeAlloyDBClient
2122
from mocks import FakeCredentials
2223
import pytest
2324

2425
from google.cloud.alloydb.connector import Connector
2526
from google.cloud.alloydb.connector import IPTypes
27+
from google.cloud.alloydb.connector.exceptions import IPTypeNotFoundError
28+
from google.cloud.alloydb.connector.instance import RefreshAheadCache
29+
from google.cloud.alloydb.connector.utils import generate_keys
2630

2731

2832
def test_Connector_init(credentials: FakeCredentials) -> None:
@@ -203,3 +207,50 @@ def test_Connector_close_called_multiple_times(credentials: FakeCredentials) ->
203207
assert connector._thread.is_alive() is False
204208
# call connector.close a second time
205209
connector.close()
210+
211+
212+
async def test_Connector_remove_cached_bad_instance(
213+
credentials: FakeCredentials,
214+
) -> None:
215+
"""When a Connector attempts to retrieve connection info for a
216+
non-existent instance, it should delete the instance from
217+
the cache and ensure no background refresh happens (which would be
218+
wasted cycles).
219+
"""
220+
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/bad-test-instance"
221+
with Connector(credentials) as connector:
222+
connector._keys = asyncio.wrap_future(
223+
asyncio.run_coroutine_threadsafe(
224+
generate_keys(), asyncio.get_running_loop()
225+
),
226+
loop=asyncio.get_running_loop(),
227+
)
228+
with pytest.raises(ClientResponseError):
229+
await connector.connect_async(instance_uri, "pg8000")
230+
assert instance_uri not in connector._cache
231+
232+
233+
async def test_Connector_remove_cached_no_ip_type(credentials: FakeCredentials) -> None:
234+
"""When a Connector attempts to connect and preferred IP type is not present,
235+
it should delete the instance from the cache and ensure no background refresh
236+
happens (which would be wasted cycles).
237+
"""
238+
instance_uri = "projects/test-project/locations/test-region/clusters/test-cluster/instances/test-instance"
239+
# set instance to only have Public IP
240+
fake_client = FakeAlloyDBClient()
241+
fake_client.instance.ip_addrs = {"PUBLIC": "127.0.0.1"}
242+
with Connector(credentials=credentials) as connector:
243+
connector._client = fake_client
244+
connector._keys = asyncio.wrap_future(
245+
asyncio.run_coroutine_threadsafe(
246+
generate_keys(), asyncio.get_running_loop()
247+
),
248+
loop=asyncio.get_running_loop(),
249+
)
250+
cache = RefreshAheadCache(instance_uri, fake_client, connector._keys)
251+
connector._cache[instance_uri] = cache
252+
# test instance does not have Private IP, thus should invalidate cache
253+
with pytest.raises(IPTypeNotFoundError):
254+
await connector.connect_async(instance_uri, "pg8000", ip_type="private")
255+
# check that cache has been removed from dict
256+
assert instance_uri not in connector._cache

0 commit comments

Comments
 (0)