Skip to content

Commit e4b21cd

Browse files
feat: support native asyncpg connection pools
1 parent 5dedce7 commit e4b21cd

File tree

3 files changed

+180
-15
lines changed

3 files changed

+180
-15
lines changed

README.md

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,8 @@ The `create_async_connector` allows all the same input arguments as the
502502
Once a `Connector` object is returned by `create_async_connector` you can call
503503
its `connect_async` method, just as you would the `connect` method:
504504

505+
#### SQLAlchemy Async Engine
506+
505507
```python
506508
import asyncpg
507509

@@ -511,7 +513,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
511513
from google.cloud.sql.connector import Connector, create_async_connector
512514

513515
async def init_connection_pool(connector: Connector) -> AsyncEngine:
514-
# initialize Connector object for connections to Cloud SQL
516+
# creation function to generate asyncpg connections as 'async_creator' arg
515517
async def getconn() -> asyncpg.Connection:
516518
conn: asyncpg.Connection = await connector.connect_async(
517519
"project:region:instance", # Cloud SQL instance connection name
@@ -549,6 +551,40 @@ async def main():
549551
await pool.dispose()
550552
```
551553

554+
#### Asyncpg Connection Pool
555+
556+
```python
557+
import asyncpg
558+
from google.cloud.sql.connector import Connector, create_async_connector
559+
560+
async def main():
561+
# initialize Connector object for connections to Cloud SQL
562+
connector = create_async_connector()
563+
564+
# creation function to generate asyncpg connections as the 'connect' arg
565+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
566+
return await connector.connect_async(
567+
instance_connection_name,
568+
"asyncpg",
569+
user="my-user",
570+
password="my-password",
571+
db="my-db",
572+
**kwargs, # ... additional asyncpg args
573+
)
574+
575+
# initialize connection pool
576+
pool = await asyncpg.create_pool(
577+
"my-project:my-region:my-instance", connect=getconn
578+
)
579+
580+
# acquire connection and query Cloud SQL database
581+
async with pool.acquire() as conn:
582+
res = await conn.fetch("SELECT NOW()")
583+
584+
# close Connector
585+
await connector.close_async()
586+
```
587+
552588
For more details on additional database arguments with an `asyncpg.Connection`
553589
, please visit the
554590
[official documentation](https://magicstack.github.io/asyncpg/current/api/index.html).
@@ -564,6 +600,8 @@ calls to `connector.close_async()` to cleanup resources.
564600
> This alternative requires that the running event loop be
565601
> passed in as the `loop` argument to `Connector()`.
566602
603+
#### SQLAlchemy Async Engine
604+
567605
```python
568606
import asyncio
569607
import asyncpg
@@ -574,17 +612,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
574612
from google.cloud.sql.connector import Connector
575613

576614
async def init_connection_pool(connector: Connector) -> AsyncEngine:
577-
# initialize Connector object for connections to Cloud SQL
615+
# creation function to generate asyncpg connections as 'async_creator' arg
578616
async def getconn() -> asyncpg.Connection:
579-
conn: asyncpg.Connection = await connector.connect_async(
580-
"project:region:instance", # Cloud SQL instance connection name
581-
"asyncpg",
582-
user="my-user",
583-
password="my-password",
584-
db="my-db-name"
585-
# ... additional database driver args
586-
)
587-
return conn
617+
conn: asyncpg.Connection = await connector.connect_async(
618+
"project:region:instance", # Cloud SQL instance connection name
619+
"asyncpg",
620+
user="my-user",
621+
password="my-password",
622+
db="my-db-name"
623+
# ... additional database driver args
624+
)
625+
return conn
588626

589627
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
590628
# 'async_creator' argument to 'create_async_engine'
@@ -609,6 +647,38 @@ async def main():
609647
await pool.dispose()
610648
```
611649

650+
#### Asyncpg Connection Pool
651+
652+
```python
653+
import asyncpg
654+
from google.cloud.sql.connector import Connector, create_async_connector
655+
656+
async def main():
657+
# initialize Connector object for connections to Cloud SQL
658+
loop = asyncio.get_running_loop()
659+
async with Connector(loop=loop) as connector:
660+
661+
# creation function to generate asyncpg connections as the 'connect' arg
662+
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
663+
return await connector.connect_async(
664+
instance_connection_name,
665+
"asyncpg",
666+
user="my-user",
667+
password="my-password",
668+
db="my-db",
669+
**kwargs, # ... additional asyncpg args
670+
)
671+
672+
# create connection pool
673+
pool = await asyncpg.create_pool(
674+
"my-project:my-region:my-instance", connect=getconn
675+
)
676+
677+
# acquire connection and query Cloud SQL database
678+
async with pool.acquire() as conn:
679+
res = await conn.fetch("SELECT NOW()")
680+
```
681+
612682
### Debug Logging
613683

614684
The Cloud SQL Python Connector uses the standard [Python logging module][python-logging]

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080
"pymysql": ["PyMySQL>=1.1.0"],
8181
"pg8000": ["pg8000>=1.31.1"],
8282
"pytds": ["python-tds>=1.15.0"],
83-
"asyncpg": ["asyncpg>=0.29.0"],
83+
"asyncpg": ["asyncpg>=0.30.0"],
8484
},
8585
python_requires=">=3.9",
8686
include_package_data=True,

tests/system/test_asyncpg_connection.py

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import asyncio
1818
import os
19-
from typing import Tuple
19+
from typing import Any, Tuple
2020

2121
import asyncpg
2222
import sqlalchemy
@@ -88,7 +88,68 @@ async def getconn() -> asyncpg.Connection:
8888
return engine, connector
8989

9090

91-
async def test_connection_with_asyncpg() -> None:
91+
async def create_asyncpg_pool(
92+
instance_connection_name: str,
93+
user: str,
94+
password: str,
95+
db: str,
96+
refresh_strategy: str = "background",
97+
) -> Tuple[asyncpg.Pool, Connector]:
98+
"""Creates a native asyncpg connection pool for a Cloud SQL instance and
99+
returns the pool and the connector. Callers are responsible for closing the
100+
pool and the connector.
101+
102+
A sample invocation looks like:
103+
104+
pool, connector = await create_asyncpg_pool(
105+
inst_conn_name,
106+
user,
107+
password,
108+
db,
109+
)
110+
async with pool.acquire() as conn:
111+
hello = await conn.fetch("SELECT 'Hello World!'")
112+
# do something with query result
113+
await connector.close_async()
114+
115+
Args:
116+
instance_connection_name (str):
117+
The instance connection name specifies the instance relative to the
118+
project and region. For example: "my-project:my-region:my-instance"
119+
user (str):
120+
The database user name, e.g., postgres
121+
password (str):
122+
The database user's password, e.g., secret-password
123+
db (str):
124+
The name of the database, e.g., mydb
125+
refresh_strategy (Optional[str]):
126+
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
127+
or "background". For serverless environments use "lazy" to avoid
128+
errors resulting from CPU being throttled.
129+
"""
130+
loop = asyncio.get_running_loop()
131+
connector = Connector(loop=loop, refresh_strategy=refresh_strategy)
132+
133+
async def getconn(
134+
instance_connection_name: str, **kwargs: Any
135+
) -> asyncpg.Connection:
136+
conn: asyncpg.Connection = await connector.connect_async(
137+
instance_connection_name,
138+
"asyncpg",
139+
user=user,
140+
password=password,
141+
db=db,
142+
ip_type="public", # can also be "private" or "psc",
143+
**kwargs
144+
)
145+
return conn
146+
147+
# create native asyncpg pool (requires asyncpg version >=0.30.0)
148+
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
149+
return pool, connector
150+
151+
152+
async def test_sqlalchemy_connection_with_asyncpg() -> None:
92153
"""Basic test to get time from database."""
93154
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
94155
user = os.environ["POSTGRES_USER"]
@@ -104,7 +165,7 @@ async def test_connection_with_asyncpg() -> None:
104165
await connector.close_async()
105166

106167

107-
async def test_lazy_connection_with_asyncpg() -> None:
168+
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
108169
"""Basic test to get time from database."""
109170
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
110171
user = os.environ["POSTGRES_USER"]
@@ -120,3 +181,37 @@ async def test_lazy_connection_with_asyncpg() -> None:
120181
assert res[0] == 1
121182

122183
await connector.close_async()
184+
185+
186+
async def test_connection_with_asyncpg() -> None:
187+
"""Basic test to get time from database."""
188+
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
189+
user = os.environ["POSTGRES_USER"]
190+
password = os.environ["POSTGRES_PASS"]
191+
db = os.environ["POSTGRES_DB"]
192+
193+
pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db)
194+
195+
async with pool.acquire() as conn:
196+
res = await conn.fetch("SELECT 1")
197+
assert res[0][0] == 1
198+
199+
await connector.close_async()
200+
201+
202+
async def test_lazy_connection_with_asyncpg() -> None:
203+
"""Basic test to get time from database."""
204+
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
205+
user = os.environ["POSTGRES_USER"]
206+
password = os.environ["POSTGRES_PASS"]
207+
db = os.environ["POSTGRES_DB"]
208+
209+
pool, connector = await create_asyncpg_pool(
210+
inst_conn_name, user, password, db, "lazy"
211+
)
212+
213+
async with pool.acquire() as conn:
214+
res = await conn.fetch("SELECT 1")
215+
assert res[0][0] == 1
216+
217+
await connector.close_async()

0 commit comments

Comments
 (0)