Skip to content

Commit ea2a524

Browse files
feat: support native asyncpg connection pools (#409)
This changed is analogous to CloudSQL's change: GoogleCloudPlatform/cloud-sql-python-connector#1182
1 parent aa2dedb commit ea2a524

File tree

3 files changed

+174
-15
lines changed

3 files changed

+174
-15
lines changed

README.md

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,42 @@ currently supports the following asyncio database drivers:
251251

252252
[asyncio]: https://docs.python.org/3/library/asyncio.html
253253

254+
#### Asyncpg Connection Pool
255+
256+
```python
257+
import asyncpg
258+
from google.cloud.alloydb.connector import AsyncConnector
259+
260+
async def main():
261+
# initialize Connector object for connections to AlloyDB
262+
connector = AsyncConnector()
263+
264+
# creation function to generate asyncpg connections as the 'connect' arg
265+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
266+
return await connector.connect(
267+
instance_connection_name,
268+
"asyncpg",
269+
user="my-user",
270+
password="my-password",
271+
db="my-db",
272+
)
273+
274+
# initialize connection pool
275+
pool = await asyncpg.create_pool(
276+
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
277+
connect=getconn,
278+
)
279+
280+
# acquire connection and query AlloyDB database
281+
async with pool.acquire() as conn:
282+
res = await conn.fetch("SELECT NOW()")
283+
284+
# close Connector
285+
await connector.close()
286+
```
287+
288+
#### SQLAlchemy Async Engine
289+
254290
```python
255291
import asyncpg
256292

@@ -260,7 +296,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
260296
from google.cloud.alloydb.connector import AsyncConnector
261297

262298
async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
263-
# initialize Connector object for connections to AlloyDB
299+
# creation function to generate asyncpg connections as 'async_creator' arg
264300
async def getconn() -> asyncpg.Connection:
265301
conn: asyncpg.Connection = await connector.connect(
266302
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
@@ -311,6 +347,39 @@ visit the [official documentation][asyncpg-docs].
311347
The `AsyncConnector` also may be used as an async context manager, removing the
312348
need for explicit calls to `connector.close()` to cleanup resources.
313349

350+
#### Asyncpg Connection Pool
351+
352+
```python
353+
import asyncpg
354+
from google.cloud.alloydb.connector import AsyncConnector
355+
356+
async def main():
357+
# initialize AsyncConnector object for connections to AlloyDB
358+
async with AsyncConnector() as connector:
359+
360+
# creation function to generate asyncpg connections as the 'connect' arg
361+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
362+
return await connector.connect(
363+
instance_connection_name,
364+
"asyncpg",
365+
user="my-user",
366+
password="my-password",
367+
db="my-db",
368+
)
369+
370+
# create connection pool
371+
pool = await asyncpg.create_pool(
372+
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
373+
connect=getconn,
374+
)
375+
376+
# acquire connection and query AlloyDB database
377+
async with pool.acquire() as conn:
378+
res = await conn.fetch("SELECT NOW()")
379+
```
380+
381+
#### SQLAlchemy Async Engine
382+
314383
```python
315384
import asyncio
316385
import asyncpg
@@ -321,17 +390,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
321390
from google.cloud.alloydb.connector import AsyncConnector
322391

323392
async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
324-
# initialize Connector object for connections to AlloyDB
393+
# creation function to generate asyncpg connections as 'async_creator' arg
325394
async def getconn() -> asyncpg.Connection:
326-
conn: asyncpg.Connection = await connector.connect(
327-
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
328-
"asyncpg",
329-
user="my-user",
330-
password="my-password",
331-
db="my-db-name"
332-
# ... additional database driver args
333-
)
334-
return conn
395+
conn: asyncpg.Connection = await connector.connect(
396+
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
397+
"asyncpg",
398+
user="my-user",
399+
password="my-password",
400+
db="my-db-name"
401+
# ... additional database driver args
402+
)
403+
return conn
335404

336405
# The AlloyDB Python Connector can be used along with SQLAlchemy using the
337406
# 'async_creator' argument to 'create_async_engine'

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
install_requires=dependencies,
7878
extras_require={
7979
"pg8000": ["pg8000>=1.31.1"],
80-
"asyncpg": ["asyncpg>=0.29.0"],
80+
"asyncpg": ["asyncpg>=0.30.0"],
8181
},
8282
python_requires=">=3.9",
8383
include_package_data=True,

tests/system/test_asyncpg_connection.py

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import os
16-
from typing import Tuple
16+
from typing import Any, Tuple
1717

1818
# [START alloydb_sqlalchemy_connect_async_connector]
1919
import asyncpg
@@ -88,7 +88,65 @@ async def getconn() -> asyncpg.Connection:
8888
# [END alloydb_sqlalchemy_connect_async_connector]
8989

9090

91-
async def test_connection_with_asyncpg() -> None:
91+
async def create_asyncpg_pool(
92+
instance_connection_name: str,
93+
user: str,
94+
password: str,
95+
db: str,
96+
refresh_strategy: str = "background",
97+
) -> tuple[asyncpg.Pool, AsyncConnector]:
98+
"""Creates a native asyncpg connection pool for an AlloyDB instance and
99+
returns the pool and the connector. Callers are responsible for closing the
100+
pool and the connector.
101+
102+
A sample invocation looks like:
103+
104+
pool, connector = await create_asyncpg_pool(
105+
inst_conn_name,
106+
user,
107+
password,
108+
db,
109+
)
110+
async with pool.acquire() as conn:
111+
hello = await conn.fetch("SELECT 'Hello World!'")
112+
# do something with query result
113+
await connector.close()
114+
115+
Args:
116+
instance_connection_name (str):
117+
The instance connection name specifies the instance relative to the
118+
project and region. For example: "my-project:my-region:my-instance"
119+
user (str):
120+
The database user name, e.g., postgres
121+
password (str):
122+
The database user's password, e.g., secret-password
123+
db (str):
124+
The name of the database, e.g., mydb
125+
refresh_strategy (Optional[str]):
126+
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
127+
or "background". For serverless environments use "lazy" to avoid
128+
errors resulting from CPU being throttled.
129+
"""
130+
connector = AsyncConnector(refresh_strategy=refresh_strategy)
131+
132+
async def getconn(
133+
instance_connection_name: str, **kwargs: Any
134+
) -> asyncpg.Connection:
135+
conn: asyncpg.Connection = await connector.connect(
136+
instance_connection_name,
137+
"asyncpg",
138+
user=user,
139+
password=password,
140+
db=db,
141+
)
142+
return conn
143+
144+
# create native asyncpg pool (requires asyncpg version >=0.30.0)
145+
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
146+
return pool, connector
147+
148+
149+
async def test_sqlalchemy_connection_with_asyncpg() -> None:
92150
"""Basic test to get time from database."""
93151
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
94152
user = os.environ["ALLOYDB_USER"]
@@ -104,7 +162,7 @@ async def test_connection_with_asyncpg() -> None:
104162
await connector.close()
105163

106164

107-
async def test_lazy_connection_with_asyncpg() -> None:
165+
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
108166
"""Basic test to get time from database."""
109167
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
110168
user = os.environ["ALLOYDB_USER"]
@@ -120,3 +178,35 @@ async def test_lazy_connection_with_asyncpg() -> None:
120178
assert res[0] == 1
121179

122180
await connector.close()
181+
182+
183+
async def test_connection_with_asyncpg() -> None:
184+
"""Basic test to get time from database."""
185+
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
186+
user = os.environ["ALLOYDB_USER"]
187+
password = os.environ["ALLOYDB_PASS"]
188+
db = os.environ["ALLOYDB_DB"]
189+
190+
pool, connector = await create_asyncpg_pool(inst_uri, user, password, db)
191+
192+
async with pool.acquire() as conn:
193+
res = await conn.fetch("SELECT 1")
194+
assert res[0][0] == 1
195+
196+
await connector.close()
197+
198+
199+
async def test_lazy_connection_with_asyncpg() -> None:
200+
"""Basic test to get time from database."""
201+
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
202+
user = os.environ["ALLOYDB_USER"]
203+
password = os.environ["ALLOYDB_PASS"]
204+
db = os.environ["ALLOYDB_DB"]
205+
206+
pool, connector = await create_asyncpg_pool(inst_uri, user, password, db, "lazy")
207+
208+
async with pool.acquire() as conn:
209+
res = await conn.fetch("SELECT 1")
210+
assert res[0][0] == 1
211+
212+
await connector.close()

0 commit comments

Comments
 (0)