Skip to content

Commit 0bed674

Browse files
feat: support native asyncpg connection pools
This changed is analogous to CloudSQL's change: GoogleCloudPlatform/cloud-sql-python-connector#1182
1 parent 40bada7 commit 0bed674

File tree

3 files changed

+176
-14
lines changed

3 files changed

+176
-14
lines changed

README.md

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,42 @@ currently supports the following asyncio database drivers:
251251

252252
[asyncio]: https://docs.python.org/3/library/asyncio.html
253253

254+
#### Asyncpg Connection Pool
255+
256+
```python
257+
import asyncpg
258+
from google.cloud.alloydb.connector import AsyncConnector
259+
260+
async def main():
261+
# initialize Connector object for connections to AlloyDB
262+
connector = AsyncConnector()
263+
264+
# creation function to generate asyncpg connections as the 'connect' arg
265+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
266+
return await connector.connect(
267+
instance_connection_name,
268+
"asyncpg",
269+
user="my-user",
270+
password="my-password",
271+
db="my-db",
272+
)
273+
274+
# initialize connection pool
275+
pool = await asyncpg.create_pool(
276+
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
277+
connect=getconn,
278+
)
279+
280+
# acquire connection and query AlloyDB database
281+
async with pool.acquire() as conn:
282+
res = await conn.fetch("SELECT NOW()")
283+
284+
# close Connector
285+
await connector.close()
286+
```
287+
288+
#### SQLAlchemy Async Engine
289+
254290
```python
255291
import asyncpg
256292

@@ -260,7 +296,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
260296
from google.cloud.alloydb.connector import AsyncConnector
261297

262298
async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
263-
# initialize Connector object for connections to AlloyDB
299+
# creation function to generate asyncpg connections as 'async_creator' arg
264300
async def getconn() -> asyncpg.Connection:
265301
conn: asyncpg.Connection = await connector.connect(
266302
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
@@ -311,6 +347,39 @@ visit the [official documentation][asyncpg-docs].
311347
The `AsyncConnector` also may be used as an async context manager, removing the
312348
need for explicit calls to `connector.close()` to cleanup resources.
313349

350+
#### Asyncpg Connection Pool
351+
352+
```python
353+
import asyncpg
354+
from google.cloud.alloydb.connector import AsyncConnector
355+
356+
async def main():
357+
# initialize AsyncConnector object for connections to AlloyDB
358+
async with AsyncConnector() as connector:
359+
360+
# creation function to generate asyncpg connections as the 'connect' arg
361+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
362+
return await connector.connect(
363+
instance_connection_name,
364+
"asyncpg",
365+
user="my-user",
366+
password="my-password",
367+
db="my-db",
368+
)
369+
370+
# create connection pool
371+
pool = await asyncpg.create_pool(
372+
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
373+
connect=getconn,
374+
)
375+
376+
# acquire connection and query AlloyDB database
377+
async with pool.acquire() as conn:
378+
res = await conn.fetch("SELECT NOW()")
379+
```
380+
381+
#### SQLAlchemy Async Engine
382+
314383
```python
315384
import asyncio
316385
import asyncpg
@@ -321,17 +390,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
321390
from google.cloud.alloydb.connector import AsyncConnector
322391

323392
async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
324-
# initialize Connector object for connections to AlloyDB
393+
# creation function to generate asyncpg connections as 'async_creator' arg
325394
async def getconn() -> asyncpg.Connection:
326-
conn: asyncpg.Connection = await connector.connect(
327-
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
328-
"asyncpg",
329-
user="my-user",
330-
password="my-password",
331-
db="my-db-name"
332-
# ... additional database driver args
333-
)
334-
return conn
395+
conn: asyncpg.Connection = await connector.connect(
396+
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
397+
"asyncpg",
398+
user="my-user",
399+
password="my-password",
400+
db="my-db-name"
401+
# ... additional database driver args
402+
)
403+
return conn
335404

336405
# The AlloyDB Python Connector can be used along with SQLAlchemy using the
337406
# 'async_creator' argument to 'create_async_engine'

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
install_requires=dependencies,
7878
extras_require={
7979
"pg8000": ["pg8000>=1.31.1"],
80-
"asyncpg": ["asyncpg>=0.29.0"],
80+
"asyncpg": ["asyncpg>=0.30.0"],
8181
},
8282
python_requires=">=3.9",
8383
include_package_data=True,

tests/system/test_asyncpg_connection.py

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import os
16+
from typing import Any
1617
from typing import Tuple
1718

1819
# [START alloydb_sqlalchemy_connect_async_connector]
@@ -88,7 +89,65 @@ async def getconn() -> asyncpg.Connection:
8889
# [END alloydb_sqlalchemy_connect_async_connector]
8990

9091

91-
async def test_connection_with_asyncpg() -> None:
92+
async def create_asyncpg_pool(
93+
instance_connection_name: str,
94+
user: str,
95+
password: str,
96+
db: str,
97+
refresh_strategy: str = "background",
98+
) -> tuple[asyncpg.Pool, AsyncConnector]:
99+
"""Creates a native asyncpg connection pool for an AlloyDB instance and
100+
returns the pool and the connector. Callers are responsible for closing the
101+
pool and the connector.
102+
103+
A sample invocation looks like:
104+
105+
pool, connector = await create_asyncpg_pool(
106+
inst_conn_name,
107+
user,
108+
password,
109+
db,
110+
)
111+
async with pool.acquire() as conn:
112+
hello = await conn.fetch("SELECT 'Hello World!'")
113+
# do something with query result
114+
await connector.close()
115+
116+
Args:
117+
instance_connection_name (str):
118+
The instance connection name specifies the instance relative to the
119+
project and region. For example: "my-project:my-region:my-instance"
120+
user (str):
121+
The database user name, e.g., postgres
122+
password (str):
123+
The database user's password, e.g., secret-password
124+
db (str):
125+
The name of the database, e.g., mydb
126+
refresh_strategy (Optional[str]):
127+
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
128+
or "background". For serverless environments use "lazy" to avoid
129+
errors resulting from CPU being throttled.
130+
"""
131+
connector = AsyncConnector(refresh_strategy=refresh_strategy)
132+
133+
async def getconn(
134+
instance_connection_name: str, **kwargs: Any
135+
) -> asyncpg.Connection:
136+
conn: asyncpg.Connection = await connector.connect(
137+
instance_connection_name,
138+
"asyncpg",
139+
user=user,
140+
password=password,
141+
db=db,
142+
)
143+
return conn
144+
145+
# create native asyncpg pool (requires asyncpg version >=0.30.0)
146+
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
147+
return pool, connector
148+
149+
150+
async def test_sqlalchemy_connection_with_asyncpg() -> None:
92151
"""Basic test to get time from database."""
93152
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
94153
user = os.environ["ALLOYDB_USER"]
@@ -104,7 +163,7 @@ async def test_connection_with_asyncpg() -> None:
104163
await connector.close()
105164

106165

107-
async def test_lazy_connection_with_asyncpg() -> None:
166+
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
108167
"""Basic test to get time from database."""
109168
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
110169
user = os.environ["ALLOYDB_USER"]
@@ -120,3 +179,37 @@ async def test_lazy_connection_with_asyncpg() -> None:
120179
assert res[0] == 1
121180

122181
await connector.close()
182+
183+
184+
async def test_connection_with_asyncpg() -> None:
185+
"""Basic test to get time from database."""
186+
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
187+
user = os.environ["POSTGRES_USER"]
188+
password = os.environ["POSTGRES_PASS"]
189+
db = os.environ["POSTGRES_DB"]
190+
191+
pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db)
192+
193+
async with pool.acquire() as conn:
194+
res = await conn.fetch("SELECT 1")
195+
assert res[0][0] == 1
196+
197+
await connector.close()
198+
199+
200+
async def test_lazy_connection_with_asyncpg() -> None:
201+
"""Basic test to get time from database."""
202+
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
203+
user = os.environ["POSTGRES_USER"]
204+
password = os.environ["POSTGRES_PASS"]
205+
db = os.environ["POSTGRES_DB"]
206+
207+
pool, connector = await create_asyncpg_pool(
208+
inst_conn_name, user, password, db, "lazy"
209+
)
210+
211+
async with pool.acquire() as conn:
212+
res = await conn.fetch("SELECT 1")
213+
assert res[0][0] == 1
214+
215+
await connector.close()

0 commit comments

Comments
 (0)