Skip to content

Commit eb95a9d

Browse files
feat: support native asyncpg connection pools (#1182)
As of asyncpg v0.30.0, asyncpg native connection pools now support a creation function (callable) via its connect argument, similar to SQLAlchemy's async_creator argument to generate connections. Adding integration test and usage samples to README. Also bumping asyncpg min supported version in setup.py to 0.30.0 to force supported version for this feature.
1 parent 5dedce7 commit eb95a9d

File tree

3 files changed

+180
-15
lines changed

3 files changed

+180
-15
lines changed

README.md

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,42 @@ The `create_async_connector` allows all the same input arguments as the
502502
Once a `Connector` object is returned by `create_async_connector` you can call
503503
its `connect_async` method, just as you would the `connect` method:
504504

505+
#### Asyncpg Connection Pool
506+
507+
```python
508+
import asyncpg
509+
from google.cloud.sql.connector import Connector, create_async_connector
510+
511+
async def main():
512+
# initialize Connector object for connections to Cloud SQL
513+
connector = create_async_connector()
514+
515+
# creation function to generate asyncpg connections as the 'connect' arg
516+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
517+
return await connector.connect_async(
518+
instance_connection_name,
519+
"asyncpg",
520+
user="my-user",
521+
password="my-password",
522+
db="my-db",
523+
**kwargs, # ... additional asyncpg args
524+
)
525+
526+
# initialize connection pool
527+
pool = await asyncpg.create_pool(
528+
"my-project:my-region:my-instance", connect=getconn
529+
)
530+
531+
# acquire connection and query Cloud SQL database
532+
async with pool.acquire() as conn:
533+
res = await conn.fetch("SELECT NOW()")
534+
535+
# close Connector
536+
await connector.close_async()
537+
```
538+
539+
#### SQLAlchemy Async Engine
540+
505541
```python
506542
import asyncpg
507543

@@ -511,7 +547,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
511547
from google.cloud.sql.connector import Connector, create_async_connector
512548

513549
async def init_connection_pool(connector: Connector) -> AsyncEngine:
514-
# initialize Connector object for connections to Cloud SQL
550+
# creation function to generate asyncpg connections as 'async_creator' arg
515551
async def getconn() -> asyncpg.Connection:
516552
conn: asyncpg.Connection = await connector.connect_async(
517553
"project:region:instance", # Cloud SQL instance connection name
@@ -564,6 +600,40 @@ calls to `connector.close_async()` to cleanup resources.
564600
> This alternative requires that the running event loop be
565601
> passed in as the `loop` argument to `Connector()`.
566602
603+
#### Asyncpg Connection Pool
604+
605+
```python
606+
import asyncpg
607+
from google.cloud.sql.connector import Connector, create_async_connector
608+
609+
async def main():
610+
# initialize Connector object for connections to Cloud SQL
611+
loop = asyncio.get_running_loop()
612+
async with Connector(loop=loop) as connector:
613+
614+
# creation function to generate asyncpg connections as the 'connect' arg
615+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
616+
return await connector.connect_async(
617+
instance_connection_name,
618+
"asyncpg",
619+
user="my-user",
620+
password="my-password",
621+
db="my-db",
622+
**kwargs, # ... additional asyncpg args
623+
)
624+
625+
# create connection pool
626+
pool = await asyncpg.create_pool(
627+
"my-project:my-region:my-instance", connect=getconn
628+
)
629+
630+
# acquire connection and query Cloud SQL database
631+
async with pool.acquire() as conn:
632+
res = await conn.fetch("SELECT NOW()")
633+
```
634+
635+
#### SQLAlchemy Async Engine
636+
567637
```python
568638
import asyncio
569639
import asyncpg
@@ -574,17 +644,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
574644
from google.cloud.sql.connector import Connector
575645

576646
async def init_connection_pool(connector: Connector) -> AsyncEngine:
577-
# initialize Connector object for connections to Cloud SQL
647+
# creation function to generate asyncpg connections as 'async_creator' arg
578648
async def getconn() -> asyncpg.Connection:
579-
conn: asyncpg.Connection = await connector.connect_async(
580-
"project:region:instance", # Cloud SQL instance connection name
581-
"asyncpg",
582-
user="my-user",
583-
password="my-password",
584-
db="my-db-name"
585-
# ... additional database driver args
586-
)
587-
return conn
649+
conn: asyncpg.Connection = await connector.connect_async(
650+
"project:region:instance", # Cloud SQL instance connection name
651+
"asyncpg",
652+
user="my-user",
653+
password="my-password",
654+
db="my-db-name"
655+
# ... additional database driver args
656+
)
657+
return conn
588658

589659
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
590660
# 'async_creator' argument to 'create_async_engine'

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
"pymysql": ["PyMySQL>=1.1.0"],
8181
"pg8000": ["pg8000>=1.31.1"],
8282
"pytds": ["python-tds>=1.15.0"],
83-
"asyncpg": ["asyncpg>=0.29.0"],
83+
"asyncpg": ["asyncpg>=0.30.0"],
8484
},
8585
python_requires=">=3.9",
8686
include_package_data=True,

tests/system/test_asyncpg_connection.py

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import asyncio
1818
import os
19-
from typing import Tuple
19+
from typing import Any, Tuple
2020

2121
import asyncpg
2222
import sqlalchemy
@@ -88,7 +88,68 @@ async def getconn() -> asyncpg.Connection:
8888
return engine, connector
8989

9090

91-
async def test_connection_with_asyncpg() -> None:
91+
async def create_asyncpg_pool(
92+
instance_connection_name: str,
93+
user: str,
94+
password: str,
95+
db: str,
96+
refresh_strategy: str = "background",
97+
) -> Tuple[asyncpg.Pool, Connector]:
98+
"""Creates a native asyncpg connection pool for a Cloud SQL instance and
99+
returns the pool and the connector. Callers are responsible for closing the
100+
pool and the connector.
101+
102+
A sample invocation looks like:
103+
104+
pool, connector = await create_asyncpg_pool(
105+
inst_conn_name,
106+
user,
107+
password,
108+
db,
109+
)
110+
async with pool.acquire() as conn:
111+
hello = await conn.fetch("SELECT 'Hello World!'")
112+
# do something with query result
113+
await connector.close_async()
114+
115+
Args:
116+
instance_connection_name (str):
117+
The instance connection name specifies the instance relative to the
118+
project and region. For example: "my-project:my-region:my-instance"
119+
user (str):
120+
The database user name, e.g., postgres
121+
password (str):
122+
The database user's password, e.g., secret-password
123+
db (str):
124+
The name of the database, e.g., mydb
125+
refresh_strategy (Optional[str]):
126+
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
127+
or "background". For serverless environments use "lazy" to avoid
128+
errors resulting from CPU being throttled.
129+
"""
130+
loop = asyncio.get_running_loop()
131+
connector = Connector(loop=loop, refresh_strategy=refresh_strategy)
132+
133+
async def getconn(
134+
instance_connection_name: str, **kwargs: Any
135+
) -> asyncpg.Connection:
136+
conn: asyncpg.Connection = await connector.connect_async(
137+
instance_connection_name,
138+
"asyncpg",
139+
user=user,
140+
password=password,
141+
db=db,
142+
ip_type="public", # can also be "private" or "psc",
143+
**kwargs
144+
)
145+
return conn
146+
147+
# create native asyncpg pool (requires asyncpg version >=0.30.0)
148+
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
149+
return pool, connector
150+
151+
152+
async def test_sqlalchemy_connection_with_asyncpg() -> None:
92153
"""Basic test to get time from database."""
93154
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
94155
user = os.environ["POSTGRES_USER"]
@@ -104,7 +165,7 @@ async def test_connection_with_asyncpg() -> None:
104165
await connector.close_async()
105166

106167

107-
async def test_lazy_connection_with_asyncpg() -> None:
168+
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
108169
"""Basic test to get time from database."""
109170
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
110171
user = os.environ["POSTGRES_USER"]
@@ -120,3 +181,37 @@ async def test_lazy_connection_with_asyncpg() -> None:
120181
assert res[0] == 1
121182

122183
await connector.close_async()
184+
185+
186+
async def test_connection_with_asyncpg() -> None:
187+
"""Basic test to get time from database."""
188+
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
189+
user = os.environ["POSTGRES_USER"]
190+
password = os.environ["POSTGRES_PASS"]
191+
db = os.environ["POSTGRES_DB"]
192+
193+
pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db)
194+
195+
async with pool.acquire() as conn:
196+
res = await conn.fetch("SELECT 1")
197+
assert res[0][0] == 1
198+
199+
await connector.close_async()
200+
201+
202+
async def test_lazy_connection_with_asyncpg() -> None:
203+
"""Basic test to get time from database."""
204+
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
205+
user = os.environ["POSTGRES_USER"]
206+
password = os.environ["POSTGRES_PASS"]
207+
db = os.environ["POSTGRES_DB"]
208+
209+
pool, connector = await create_asyncpg_pool(
210+
inst_conn_name, user, password, db, "lazy"
211+
)
212+
213+
async with pool.acquire() as conn:
214+
res = await conn.fetch("SELECT 1")
215+
assert res[0][0] == 1
216+
217+
await connector.close_async()

0 commit comments

Comments
 (0)