Skip to content

Commit f8391aa

Browse files
author
Uziel Silva
committed
feat: replace ssl_object with sslcontext and build a socket manually
1 parent b071556 commit f8391aa

File tree

2 files changed

+51
-85
lines changed

2 files changed

+51
-85
lines changed

google/cloud/sql/connector/connector.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from functools import partial
2121
import logging
2222
import os
23+
import socket
2324
from threading import Thread
2425
from types import TracebackType
2526
from typing import Any, Callable, Optional, Union
@@ -514,7 +515,11 @@ async def connect_async(
514515
instance_connection_string, asyncio.Protocol, **kwargs
515516
)
516517
# See https://docs.python.org/3/library/asyncio-protocol.html#asyncio.BaseTransport.get_extra_info
517-
sock = tx.get_extra_info("ssl_object")
518+
ctx = tx.get_extra_info("sslcontext")
519+
sock = ctx.wrap_socket(
520+
socket.create_connection((ip_address, SERVER_PROXY_PORT)),
521+
server_hostname=ip_address,
522+
)
518523
connect_partial = partial(connector, ip_address, sock, **kwargs)
519524
return await self._loop.run_in_executor(None, connect_partial)
520525

tests/system/test_psycopg_connection.py

Lines changed: 45 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
limitations under the License.
1515
"""
1616

17+
import asyncio
1718
from datetime import datetime
1819
import os
1920

2021
# [START cloud_sql_connector_postgres_psycopg]
2122
from typing import Union
2223

2324
from psycopg import Connection
25+
import pytest
26+
import logging
2427
import sqlalchemy
2528

2629
from google.cloud.sql.connector import Connector
@@ -29,98 +32,56 @@
2932

3033
SERVER_PROXY_PORT = 3307
3134

32-
async def create_sqlalchemy_engine(
33-
instance_connection_name: str,
34-
user: str,
35-
password: str,
36-
db: str,
37-
ip_type: str = "public",
38-
refresh_strategy: str = "background",
39-
resolver: Union[type[DefaultResolver], type[DnsResolver]] = DefaultResolver,
40-
) -> tuple[sqlalchemy.engine.Engine, Connector]:
41-
"""Creates a connection pool for a Cloud SQL instance and returns the pool
42-
and the connector. Callers are responsible for closing the pool and the
43-
connector.
44-
45-
A sample invocation looks like:
46-
47-
engine, connector = create_sqlalchemy_engine(
48-
inst_conn_name,
49-
user,
50-
password,
51-
db,
52-
)
53-
with engine.connect() as conn:
54-
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
55-
conn.commit()
56-
curr_time = time[0]
57-
# do something with query result
58-
connector.close()
59-
60-
Args:
61-
instance_connection_name (str):
62-
The instance connection name specifies the instance relative to the
63-
project and region. For example: "my-project:my-region:my-instance"
64-
user (str):
65-
The database user name, e.g., root
66-
password (str):
67-
The database user's password, e.g., secret-password
68-
db (str):
69-
The name of the database, e.g., mydb
70-
ip_type (str):
71-
The IP type of the Cloud SQL instance to connect to. Can be one
72-
of "public", "private", or "psc".
73-
refresh_strategy (Optional[str]):
74-
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
75-
or "background". For serverless environments use "lazy" to avoid
76-
errors resulting from CPU being throttled.
77-
resolver (Optional[google.cloud.sql.connector.DefaultResolver]):
78-
Resolver class for resolving instance connection name. Use
79-
google.cloud.sql.connector.DnsResolver when resolving DNS domain
80-
names or google.cloud.sql.connector.DefaultResolver for regular
81-
instance connection names ("my-project:my-region:my-instance").
82-
"""
83-
connector = Connector(refresh_strategy=refresh_strategy, resolver=resolver)
84-
unix_socket_folder = "/tmp/conn"
85-
unix_socket_path = f"{unix_socket_folder}/.s.PGSQL.3307"
86-
await connector.start_unix_socket_proxy_async(
87-
instance_connection_name,
88-
unix_socket_path,
89-
ip_type=ip_type, # can be "public", "private" or "psc"
90-
)
91-
92-
# create SQLAlchemy connection pool
93-
engine = sqlalchemy.create_engine(
94-
"postgresql+psycopg://",
95-
creator=lambda: Connection.connect(
96-
f"host={unix_socket_folder} port={SERVER_PROXY_PORT} dbname={db} user={user} password={password} sslmode=require",
97-
user=user,
98-
password=password,
99-
dbname=db,
100-
autocommit=True,
101-
)
102-
)
103-
104-
return engine, connector
105-
35+
logger = logging.getLogger(name=__name__)
10636

10737
# [END cloud_sql_connector_postgres_psycopg]
10838

10939

40+
@pytest.mark.asyncio
11041
async def test_psycopg_connection() -> None:
11142
"""Basic test to get time from database."""
112-
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
43+
instance_connection_name = os.environ["POSTGRES_CONNECTION_NAME"]
11344
user = os.environ["POSTGRES_USER"]
11445
password = os.environ["POSTGRES_PASS"]
11546
db = os.environ["POSTGRES_DB"]
11647
ip_type = os.environ.get("IP_TYPE", "public")
11748

118-
engine, connector = await create_sqlalchemy_engine(
119-
inst_conn_name, user, password, db, ip_type
120-
)
121-
with engine.connect() as conn:
122-
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
123-
conn.commit()
124-
curr_time = time[0]
125-
assert type(curr_time) is datetime
126-
connector.close()
49+
unix_socket_folder = "/tmp/conn"
50+
unix_socket_path = f"{unix_socket_folder}/.s.PGSQL.3307"
51+
52+
async with Connector(
53+
refresh_strategy='lazy', resolver=DefaultResolver
54+
) as connector:
55+
# Open proxy connection
56+
# start the proxy server
57+
58+
await connector.start_unix_socket_proxy_async(
59+
instance_connection_name,
60+
unix_socket_path,
61+
driver="psycopg",
62+
user=user,
63+
password=password,
64+
db=db,
65+
ip_type=ip_type, # can be "public", "private" or "psc"
66+
)
67+
68+
# Wait for server to start
69+
await asyncio.sleep(0.5)
70+
71+
engine = sqlalchemy.create_engine(
72+
"postgresql+psycopg://",
73+
creator=lambda: Connection.connect(
74+
f"host={unix_socket_folder} port={SERVER_PROXY_PORT} dbname={db} user={user} password={password} sslmode=require",
75+
user=user,
76+
password=password,
77+
dbname=db,
78+
autocommit=True,
79+
)
80+
)
81+
82+
with engine.connect() as conn:
83+
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
84+
conn.commit()
85+
curr_time = time[0]
86+
assert type(curr_time) is datetime
87+
connector.close()

0 commit comments

Comments
 (0)