1616
1717import  asyncio 
1818import  os 
19- from  typing  import  Tuple 
19+ from  typing  import  Any ,  Tuple 
2020
2121import  asyncpg 
2222import  sqlalchemy 
@@ -88,7 +88,68 @@ async def getconn() -> asyncpg.Connection:
8888    return  engine , connector 
8989
9090
91- async  def  test_connection_with_asyncpg () ->  None :
91+ async  def  create_asyncpg_pool (
92+     instance_connection_name : str ,
93+     user : str ,
94+     password : str ,
95+     db : str ,
96+     refresh_strategy : str  =  "background" ,
97+ ) ->  Tuple [asyncpg .Pool , Connector ]:
98+     """Creates a native asyncpg connection pool for a Cloud SQL instance and 
99+     returns the pool and the connector. Callers are responsible for closing the 
100+     pool and the connector. 
101+ 
102+     A sample invocation looks like: 
103+ 
104+         pool, connector = await create_asyncpg_pool( 
105+             inst_conn_name, 
106+             user, 
107+             password, 
108+             db, 
109+         ) 
110+         async with pool.acquire() as conn: 
111+             hello = await conn.fetch("SELECT 'Hello World!'") 
112+             # do something with query result 
113+             await connector.close_async() 
114+ 
115+     Args: 
116+         instance_connection_name (str): 
117+             The instance connection name specifies the instance relative to the 
118+             project and region. For example: "my-project:my-region:my-instance" 
119+         user (str): 
120+             The database user name, e.g., postgres 
121+         password (str): 
122+             The database user's password, e.g., secret-password 
123+         db (str): 
124+             The name of the database, e.g., mydb 
125+         refresh_strategy (Optional[str]): 
126+             Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" 
127+             or "background". For serverless environments use "lazy" to avoid 
128+             errors resulting from CPU being throttled. 
129+     """ 
130+     loop  =  asyncio .get_running_loop ()
131+     connector  =  Connector (loop = loop , refresh_strategy = refresh_strategy )
132+ 
133+     async  def  getconn (
134+         instance_connection_name : str , ** kwargs : Any 
135+     ) ->  asyncpg .Connection :
136+         conn : asyncpg .Connection  =  await  connector .connect_async (
137+             instance_connection_name ,
138+             "asyncpg" ,
139+             user = user ,
140+             password = password ,
141+             db = db ,
142+             ip_type = "public" ,  # can also be "private" or "psc", 
143+             ** kwargs 
144+         )
145+         return  conn 
146+ 
147+     # create native asyncpg pool (requires asyncpg version >=0.30.0) 
148+     pool  =  await  asyncpg .create_pool (instance_connection_name , connect = getconn )
149+     return  pool , connector 
150+ 
151+ 
152+ async  def  test_sqlalchemy_connection_with_asyncpg () ->  None :
92153    """Basic test to get time from database.""" 
93154    inst_conn_name  =  os .environ ["POSTGRES_CONNECTION_NAME" ]
94155    user  =  os .environ ["POSTGRES_USER" ]
@@ -104,7 +165,7 @@ async def test_connection_with_asyncpg() -> None:
104165    await  connector .close_async ()
105166
106167
107- async  def  test_lazy_connection_with_asyncpg () ->  None :
168+ async  def  test_lazy_sqlalchemy_connection_with_asyncpg () ->  None :
108169    """Basic test to get time from database.""" 
109170    inst_conn_name  =  os .environ ["POSTGRES_CONNECTION_NAME" ]
110171    user  =  os .environ ["POSTGRES_USER" ]
@@ -120,3 +181,37 @@ async def test_lazy_connection_with_asyncpg() -> None:
120181        assert  res [0 ] ==  1 
121182
122183    await  connector .close_async ()
184+ 
185+ 
186+ async  def  test_connection_with_asyncpg () ->  None :
187+     """Basic test to get time from database.""" 
188+     inst_conn_name  =  os .environ ["POSTGRES_CONNECTION_NAME" ]
189+     user  =  os .environ ["POSTGRES_USER" ]
190+     password  =  os .environ ["POSTGRES_PASS" ]
191+     db  =  os .environ ["POSTGRES_DB" ]
192+ 
193+     pool , connector  =  await  create_asyncpg_pool (inst_conn_name , user , password , db )
194+ 
195+     async  with  pool .acquire () as  conn :
196+         res  =  await  conn .fetch ("SELECT 1" )
197+         assert  res [0 ][0 ] ==  1 
198+ 
199+     await  connector .close_async ()
200+ 
201+ 
202+ async  def  test_lazy_connection_with_asyncpg () ->  None :
203+     """Basic test to get time from database.""" 
204+     inst_conn_name  =  os .environ ["POSTGRES_CONNECTION_NAME" ]
205+     user  =  os .environ ["POSTGRES_USER" ]
206+     password  =  os .environ ["POSTGRES_PASS" ]
207+     db  =  os .environ ["POSTGRES_DB" ]
208+ 
209+     pool , connector  =  await  create_asyncpg_pool (
210+         inst_conn_name , user , password , db , "lazy" 
211+     )
212+ 
213+     async  with  pool .acquire () as  conn :
214+         res  =  await  conn .fetch ("SELECT 1" )
215+         assert  res [0 ][0 ] ==  1 
216+ 
217+     await  connector .close_async ()
0 commit comments