Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [1.18.3](https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/compare/v1.18.2...v1.18.3) (2025-07-11)


### Bug Fixes

* suppress lint check for _scopes property ([#1308](https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/issues/1308)) ([821245c](https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/commit/821245c1911fb970e3409b3e249698937a8b7867))

## [1.18.2](https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/compare/v1.18.1...v1.18.2) (2025-05-20)


Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The Cloud SQL Python Connector is a package to be used alongside a database driv
Currently supported drivers are:
- [`pymysql`](https://github.com/PyMySQL/PyMySQL) (MySQL)
- [`pg8000`](https://github.com/tlocke/pg8000) (PostgreSQL)
- [`psycopg`](https://github.com/psycopg/psycopg) (PostgreSQL)
- [`asyncpg`](https://github.com/MagicStack/asyncpg) (PostgreSQL)
- [`pytds`](https://github.com/denisenkom/pytds) (SQL Server)

Expand Down Expand Up @@ -587,7 +588,7 @@ async def main():
# acquire connection and query Cloud SQL database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")

# close Connector
await connector.close_async()
```
Expand Down
34 changes: 29 additions & 5 deletions google/cloud/sql/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@
from google.cloud.sql.connector.lazy import LazyRefreshCache
from google.cloud.sql.connector.monitored_cache import MonitoredCache
import google.cloud.sql.connector.pg8000 as pg8000
import google.cloud.sql.connector.psycopg as psycopg
import google.cloud.sql.connector.pymysql as pymysql
import google.cloud.sql.connector.pytds as pytds
from google.cloud.sql.connector.resolver import DefaultResolver
from google.cloud.sql.connector.resolver import DnsResolver
from google.cloud.sql.connector.utils import format_database_user
from google.cloud.sql.connector.utils import generate_keys
from google.cloud.sql.connector.proxy import start_local_proxy

logger = logging.getLogger(name=__name__)

ASYNC_DRIVERS = ["asyncpg"]
LOCAL_PROXY_DRIVERS = ["psycopg"]
SERVER_PROXY_PORT = 3307
_DEFAULT_SCHEME = "https://"
_DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
Expand Down Expand Up @@ -230,7 +233,7 @@ def connect(
Example: "my-project:us-central1:my-instance"

driver (str): A string representing the database driver to connect
with. Supported drivers are pymysql, pg8000, and pytds.
with. Supported drivers are pymysql, pg8000, psycopg, and pytds.

**kwargs: Any driver-specific arguments to pass to the underlying
driver .connect call.
Expand Down Expand Up @@ -266,7 +269,8 @@ async def connect_async(
Example: "my-project:us-central1:my-instance"

driver (str): A string representing the database driver to connect
with. Supported drivers are pymysql, asyncpg, pg8000, and pytds.
with. Supported drivers are pymysql, asyncpg, pg8000, psycopg, and
pytds.

**kwargs: Any driver-specific arguments to pass to the underlying
driver .connect call.
Expand All @@ -278,7 +282,7 @@ async def connect_async(
ValueError: Connection attempt with built-in database authentication
and then subsequent attempt with IAM database authentication.
KeyError: Unsupported database driver Must be one of pymysql, asyncpg,
pg8000, and pytds.
pg8000, psycopg, and pytds.
"""
if self._keys is None:
self._keys = asyncio.create_task(generate_keys())
Expand Down Expand Up @@ -332,6 +336,7 @@ async def connect_async(
connect_func = {
"pymysql": pymysql.connect,
"pg8000": pg8000.connect,
"psycopg": psycopg.connect,
"asyncpg": asyncpg.connect,
"pytds": pytds.connect,
}
Expand Down Expand Up @@ -380,7 +385,7 @@ async def connect_async(
# async drivers are unblocking and can be awaited directly
if driver in ASYNC_DRIVERS:
return await connector(
ip_address,
host,
await conn_info.create_ssl_context(enable_iam_auth),
**kwargs,
)
Expand All @@ -390,14 +395,26 @@ async def connect_async(
socket.create_connection((ip_address, SERVER_PROXY_PORT)),
server_hostname=ip_address,
)

host = ip_address
# start local proxy if driver needs it
if driver in LOCAL_PROXY_DRIVERS:
local_socket_path = kwargs.pop("local_socket_path", "/tmp/connector-socket")
host = local_socket_path
self._proxy = start_local_proxy(
sock,
socket_path=f"{local_socket_path}/.s.PGSQL.{SERVER_PROXY_PORT}",
loop=self._loop
)

# If this connection was opened using a domain name, then store it
# for later in case we need to forcibly close it on failover.
if conn_info.conn_name.domain_name:
monitored_cache.sockets.append(sock)
# Synchronous drivers are blocking and run using executor
connect_partial = partial(
connector,
ip_address,
host,
sock,
**kwargs,
)
Expand Down Expand Up @@ -469,6 +486,13 @@ async def close_async(self) -> None:
await asyncio.gather(*[cache.close() for cache in self._cache.values()])
if self._client:
await self._client.close()
if self._proxy:
proxy_task = asyncio.gather(self._proxy)
try:
await asyncio.wait_for(proxy_task, timeout=0.1)
except TimeoutError:
pass # This task runs forever so it is expected to throw this exception



async def create_async_connector(
Expand Down
1 change: 1 addition & 0 deletions google/cloud/sql/connector/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class DriverMapping(Enum):

ASYNCPG = "POSTGRES"
PG8000 = "POSTGRES"
PSYCOPG = "POSTGRES"
PYMYSQL = "MYSQL"
PYTDS = "SQLSERVER"

Expand Down
6 changes: 6 additions & 0 deletions google/cloud/sql/connector/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ class CacheClosedError(Exception):
Exception to be raised when a ConnectionInfoCache can not be accessed after
it is closed.
"""


class LocalProxyStartupError(Exception):
"""
Exception to be raised when a the local UNIX-socket based proxy can not be started.
"""
93 changes: 93 additions & 0 deletions google/cloud/sql/connector/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Copyright 2025 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import socket
import os
import ssl
import asyncio
from pathlib import Path
from typing import Optional

from google.cloud.sql.connector.exceptions import LocalProxyStartupError

SERVER_PROXY_PORT = 3307
LOCAL_PROXY_MAX_MESSAGE_SIZE = 10485760

def start_local_proxy(
ssl_sock: ssl.SSLSocket,
socket_path: Optional[str] = "/tmp/connector-socket",
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> asyncio.Task:
"""Helper function to start a UNIX based local proxy for
transport messages through the SSL Socket.

Args:
ssl_sock (ssl.SSLSocket): An SSLSocket object created from the Cloud SQL
server CA cert and ephemeral cert.
socket_path: A system path that is going to be used to store the socket.
loop (asyncio.AbstractEventLoop): Event loop to run asyncio tasks.

Returns:
asyncio.Task: The asyncio task containing the proxy server process.

Raises:
LocalProxyStartupError: Local UNIX socket based proxy was not able to
get started.
"""
unix_socket = None

try:
path_parts = socket_path.rsplit('/', 1)
parent_directory = '/'.join(path_parts[:-1])

desired_path = Path(parent_directory)
desired_path.mkdir(parents=True, exist_ok=True)

if os.path.exists(socket_path):
os.remove(socket_path)
unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

unix_socket.bind(socket_path)
unix_socket.listen(1)
unix_socket.setblocking(False)
os.chmod(socket_path, 0o600)
except Exception:
raise LocalProxyStartupError(
'Local UNIX socket based proxy was not able to get started.'
)

return loop.create_task(local_communication(unix_socket, ssl_sock, socket_path, loop))


async def local_communication(
unix_socket, ssl_sock, socket_path, loop
):
try:
client, _ = await loop.sock_accept(unix_socket)

while True:
data = await loop.sock_recv(client, LOCAL_PROXY_MAX_MESSAGE_SIZE)
if not data:
client.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the code for sending/receiving data based on best practices described in Socket Programming in Python

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference suggests to create our own event loop. It is probably better to just use asyncio in a way to allow the proxy to receive multiple collections (working on it), but I won't be following 100% what is said in the ref since we are already using asyncio in other parts of our codebase.

break
ssl_sock.sendall(data)
response = ssl_sock.recv(LOCAL_PROXY_MAX_MESSAGE_SIZE)
await loop.sock_sendall(client, response)
except Exception:
pass
finally:
client.close()
os.remove(socket_path) # Clean up the socket file
63 changes: 63 additions & 0 deletions google/cloud/sql/connector/psycopg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
Copyright 2025 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import ssl
from typing import Any, TYPE_CHECKING
import threading

SERVER_PROXY_PORT = 3307

if TYPE_CHECKING:
import psycopg


def connect(
host: str, sock: ssl.SSLSocket, **kwargs: Any
) -> "psycopg.Connection":
"""Helper function to create a psycopg DB-API connection object.
Args:
host (str): A string containing the socket path used by the local proxy.
sock (ssl.SSLSocket): An SSLSocket object created from the Cloud SQL
server CA cert and ephemeral cert.
kwargs: Additional arguments to pass to the psycopg connect method.
Returns:
psycopg.Connection: A psycopg connection to the Cloud SQL
instance.
Raises:
ImportError: The psycopg module cannot be imported.
"""
try:
from psycopg import Connection
except ImportError:
raise ImportError(
'Unable to import module "psycopg." Please install and try again.'
)

user = kwargs.pop("user")
db = kwargs.pop("db")
passwd = kwargs.pop("password", None)

kwargs.pop("timeout", None)

conn = Connection.connect(
f"host={host} port={SERVER_PROXY_PORT} dbname={db} user={user} password={passwd} sslmode=require",
**kwargs
)

return conn
2 changes: 1 addition & 1 deletion google/cloud/sql/connector/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "1.18.2"
__version__ = "1.18.3"
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Changelog = "https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/b
[project.optional-dependencies]
pymysql = ["PyMySQL>=1.1.0"]
pg8000 = ["pg8000>=1.31.1"]
psycopg = ["psycopg>=3.2.9"]
pytds = ["python-tds>=1.15.0"]
asyncpg = ["asyncpg>=0.30.0"]

Expand Down
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ sqlalchemy-pytds==1.0.2
sqlalchemy-stubs==0.4
PyMySQL==1.1.1
pg8000==1.31.2
psycopg[binary]==3.2.9
asyncpg==0.30.0
python-tds==1.16.1
aioresponses==0.7.8
Expand Down
Loading
Loading