1818import logging
1919from typing import Optional , TYPE_CHECKING
2020
21- import aiohttp
2221from cryptography import x509
22+ from google .api_core .client_options import ClientOptions
23+ from google .api_core .gapic_v1 .client_info import ClientInfo
2324from google .auth .credentials import TokenState
2425from google .auth .transport import requests
2526
27+ from google .cloud import alloydb_v1beta
2628from google .cloud .alloydb .connector .connection_info import ConnectionInfo
2729from google .cloud .alloydb .connector .version import __version__ as version
30+ from google .protobuf import duration_pb2
2831
2932if TYPE_CHECKING :
3033 from google .auth .credentials import Credentials
@@ -55,7 +58,7 @@ def __init__(
5558 alloydb_api_endpoint : str ,
5659 quota_project : Optional [str ],
5760 credentials : Credentials ,
58- client : Optional [aiohttp . ClientSession ] = None ,
61+ client : Optional [alloydb_v1beta . AlloyDBAdminAsyncClient ] = None ,
5962 driver : Optional [str ] = None ,
6063 user_agent : Optional [str ] = None ,
6164 ) -> None :
@@ -72,21 +75,23 @@ def __init__(
7275 A credentials object created from the google-auth Python library.
7376 Must have the AlloyDB Admin scopes. For more info check out
7477 https://google-auth.readthedocs.io/en/latest/.
75- client (aiohttp.ClientSession ): Async client used to make requests to
76- AlloyDB APIs.
78+ client (alloydb_v1.AlloyDBAdminAsyncClient ): Async client used to
79+ make requests to AlloyDB APIs.
7780 Optional, defaults to None and creates new client.
7881 driver (str): Database driver to be used by the client.
7982 """
8083 user_agent = _format_user_agent (driver , user_agent )
81- headers = {
82- "x-goog-api-client" : user_agent ,
83- "User-Agent" : user_agent ,
84- "Content-Type" : "application/json" ,
85- }
86- if quota_project :
87- headers ["x-goog-user-project" ] = quota_project
8884
89- self ._client = client if client else aiohttp .ClientSession (headers = headers )
85+ self ._client = client if client else alloydb_v1beta .AlloyDBAdminAsyncClient (
86+ credentials = credentials ,
87+ client_options = ClientOptions (
88+ api_endpoint = alloydb_api_endpoint ,
89+ quota_project_id = quota_project ,
90+ ),
91+ client_info = ClientInfo (
92+ user_agent = user_agent ,
93+ ),
94+ )
9095 self ._credentials = credentials
9196 self ._alloydb_api_endpoint = alloydb_api_endpoint
9297 # asyncpg does not currently support using metadata exchange
@@ -118,35 +123,33 @@ async def _get_metadata(
118123 Returns:
119124 dict: IP addresses of the AlloyDB instance.
120125 """
121- headers = {
122- "Authorization" : f"Bearer { self ._credentials .token } " ,
123- }
124-
125- url = f"{ self ._alloydb_api_endpoint } /{ API_VERSION } /projects/{ project } /locations/{ region } /clusters/{ cluster } /instances/{ name } /connectionInfo"
126-
127- resp = await self ._client .get (url , headers = headers )
128- # try to get response json for better error message
129- try :
130- resp_dict = await resp .json ()
131- if resp .status >= 400 :
132- # if detailed error message is in json response, use as error message
133- message = resp_dict .get ("error" , {}).get ("message" )
134- if message :
135- resp .reason = message
136- # skip, raise_for_status will catch all errors in finally block
137- except Exception :
138- pass
139- finally :
140- resp .raise_for_status ()
126+ parent = f"{ self ._alloydb_api_endpoint } /{ API_VERSION } /projects/{ project } /locations/{ region } /clusters/{ cluster } /instances/{ name } "
127+
128+ req = alloydb_v1beta .GetConnectionInfoRequest (parent = parent )
129+ resp = await self ._client .get_connection_info (request = req )
130+ resp = await resp
131+ # # try to get response json for better error message
132+ # try:
133+ # resp_dict = await resp.json()
134+ # if resp.status >= 400:
135+ # # if detailed error message is in json response, use as error message
136+ # message = resp_dict.get("error", {}).get("message")
137+ # if message:
138+ # resp.reason = message
139+ # # skip, raise_for_status will catch all errors in finally block
140+ # except Exception:
141+ # pass
142+ # finally:
143+ # resp.raise_for_status()
141144
142145 # Remove trailing period from PSC DNS name.
143- psc_dns = resp_dict . get ( "pscDnsName" )
146+ psc_dns = resp . psc_dns_name
144147 if psc_dns :
145148 psc_dns = psc_dns .rstrip ("." )
146149
147150 return {
148- "PRIVATE" : resp_dict . get ( "ipAddress" ) ,
149- "PUBLIC" : resp_dict . get ( "publicIpAddress" ) ,
151+ "PRIVATE" : resp . ip_address ,
152+ "PUBLIC" : resp . public_ip_address ,
150153 "PSC" : psc_dns ,
151154 }
152155
@@ -175,34 +178,32 @@ async def _get_client_certificate(
175178 tuple[str, list[str]]: tuple containing the CA certificate
176179 and certificate chain for the AlloyDB instance.
177180 """
178- headers = {
179- "Authorization" : f"Bearer { self ._credentials .token } " ,
180- }
181-
182- url = f"{ self ._alloydb_api_endpoint } /{ API_VERSION } /projects/{ project } /locations/{ region } /clusters/{ cluster } :generateClientCertificate"
183-
184- data = {
185- "publicKey" : pub_key ,
186- "certDuration" : "3600s" ,
187- "useMetadataExchange" : self ._use_metadata ,
188- }
189-
190- resp = await self ._client .post (url , headers = headers , json = data )
191- # try to get response json for better error message
192- try :
193- resp_dict = await resp .json ()
194- if resp .status >= 400 :
195- # if detailed error message is in json response, use as error message
196- message = resp_dict .get ("error" , {}).get ("message" )
197- if message :
198- resp .reason = message
199- # skip, raise_for_status will catch all errors in finally block
200- except Exception :
201- pass
202- finally :
203- resp .raise_for_status ()
204-
205- return (resp_dict ["caCert" ], resp_dict ["pemCertificateChain" ])
181+ parent = f"{ self ._alloydb_api_endpoint } /{ API_VERSION } /projects/{ project } /locations/{ region } /clusters/{ cluster } "
182+ dur = duration_pb2 .Duration ()
183+ dur .seconds = 3600
184+ req = alloydb_v1beta .GenerateClientCertificateRequest (
185+ parent = parent ,
186+ cert_duration = dur ,
187+ public_key = pub_key ,
188+ use_metadata_exchange = self ._use_metadata ,
189+ )
190+ resp = await self ._client .generate_client_certificate (request = req )
191+ resp = await resp
192+ # # try to get response json for better error message
193+ # try:
194+ # resp_dict = await resp.json()
195+ # if resp.status >= 400:
196+ # # if detailed error message is in json response, use as error message
197+ # message = resp_dict.get("error", {}).get("message")
198+ # if message:
199+ # resp.reason = message
200+ # # skip, raise_for_status will catch all errors in finally block
201+ # except Exception:
202+ # pass
203+ # finally:
204+ # resp.raise_for_status()
205+
206+ return (resp .ca_cert , resp .pem_certificate_chain )
206207
207208 async def get_connection_info (
208209 self ,
@@ -271,5 +272,4 @@ async def get_connection_info(
271272 async def close (self ) -> None :
272273 """Close AlloyDBClient gracefully."""
273274 logger .debug ("Waiting for connector's http client to close" )
274- await self ._client .close ()
275275 logger .debug ("Closed connector's http client" )
0 commit comments