Skip to content

Conversation

@UzielSilva
Copy link

@UzielSilva UzielSilva commented Jul 16, 2025

See #214
Changelog:

  • Add proxy for connections that can only be made through an unix socket, to support the TLS connection
  • Add support for psycopg, using the proxy server
  • Add unit and integration tests
  • Update docs

@UzielSilva UzielSilva requested a review from a team as a code owner July 16, 2025 23:42
@google-cla
Copy link

google-cla bot commented Jul 16, 2025

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
@UzielSilva UzielSilva force-pushed the issues/214-psycopg-support branch 4 times, most recently from 9da84d8 to 0ba0fb7 Compare July 17, 2025 00:11
@UzielSilva UzielSilva changed the title Issue #214: Add support for psycopg feat(main): Add support for psycopg (Issue #214) Jul 17, 2025
@hessjcg
Copy link
Collaborator

hessjcg commented Jul 17, 2025

Thank you for the contribution! We are excited to add this to the code. I will give this careful review and get back in touch soon with some additional comments.

Copy link
Collaborator

@hessjcg hessjcg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work so far. This is going to be a very popular enhancement.

You may want to look at a similar local unix proxy implementation in the node connector.

There are a few things I'd like you to add to this PR. This will get it closer to production-worthy code:

  1. The socket path should be a configuration parameter local_socket_path on connector.connect() instead of being hard-coded. This will allow an application to open many connectors to different database instances.
  2. When the listener opens the unix socket, it should set the file permissions to be readable only by the user.
  3. Handle exceptions from creating and binding the unix socket, log an appropriate error, and raise a LocalProxyStartupError. Define this new exception in google/cloud/sql/connector/exceptions.py
  4. Consider using asyncio instead of a thread to copy data to and from the proxy.
  5. When the connector is closed, the local_communication loop must be stopped and the socket file removed.

Again, I'm grateful that you put in the time and wrote this feature. Thank you.

@enocom
Copy link
Member

enocom commented Jul 17, 2025

cc @rhatgadkar-goog who owns the AlloyDB Python Connector and will be interested to see this.

Big +1 on what @hessjcg. This will be a huge win for people wanting to use psycopg and Cloud SQL (or AlloyDB).

@UzielSilva
Copy link
Author

All your comments were addressed on my last commit, I even improve the integration test to use SQLAlchemy like other tests.
I only have this issue when I ran the integration test:

Task was destroyed but it is pending!
task: <Task pending name='Task-12' coro=<local_communication() done, defined at /home/uziel/Enterprises/TSA/cloud-sql-python-connector/google/cloud/sql/connector/proxy.py:72> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_read_done(20, handle=<Handle BaseS...0>, 10485760)>)(), Task.task_wakeup()]>>

Not sure how to handle or if it is expected, this is caused because the local_communication is still expecting your input when you close the connection. Not sure if there is a more friendly way to stop it, but oh well.
Let me know if this is good enough for production.

Uziel Silva added 2 commits July 18, 2025 13:06
Changelog:
- Add proxy for connections that can only be made through an unix socket, to support the TLS connection
- Add support for psycopg, using the proxy server
- Add unit and integration tests
- Update docs
Changelog:
- Make local_socket_path configurable
- Set right file permissions
- Handle exceptions properly
- Use asyncio and the main loop to stop the local proxy and clear the file when the connector is stopped
@UzielSilva
Copy link
Author

FYI: The last issue I mentioned is only a warning, so, actually, it is not affecting the feature.

@rhatgadkar-goog
Copy link

rhatgadkar-goog commented Jul 23, 2025

All your comments were addressed on my last commit, I even improve the integration test to use SQLAlchemy like other tests. I only have this issue when I ran the integration test:

Task was destroyed but it is pending!
task: <Task pending name='Task-12' coro=<local_communication() done, defined at /home/uziel/Enterprises/TSA/cloud-sql-python-connector/google/cloud/sql/connector/proxy.py:72> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_read_done(20, handle=<Handle BaseS...0>, 10485760)>)(), Task.task_wakeup()]>>

Not sure how to handle or if it is expected, this is caused because the local_communication is still expecting your input when you close the connection. Not sure if there is a more friendly way to stop it, but oh well. Let me know if this is good enough for production.

Thank you for this PR of adding support for psycopg! We really appreciate this change.

I took a look at this StackOverflow post to help understand what might be happening here: https://stackoverflow.com/questions/71542947/how-can-i-fix-task-was-destroyed-but-it-is-pending.

I think you're getting the error, because when the connector closes, you're not cancelling the local proxy server task and waiting for it to shutdown. So maybe you can try making the following changes:

  • Modify start_local_proxy() function to return the created asyncio task and save this as a member variable in the Connector object.
  • And then in the Connector class's close_async() function, you can cancel the proxy server's asyncio task and gracefully wait for it to cancel. RefreshAheadCache's close() function might be a good example for how to do this.

Changelog:
- Return the asyncio task from `start_local_proxy`
- Handle it in `close_async` to cancel it gracefully
@UzielSilva
Copy link
Author

@rhatgadkar-goog It worked!, now the warning is gone and the server is being stopped gracefully. Thanks!

So, I think the PR is ready for a review, let me know if you want me to do some other modifications. :)

@kgala2
Copy link
Collaborator

kgala2 commented Jul 24, 2025

/gcbrun

Uziel Silva added 2 commits July 24, 2025 20:35
Changelog:
- Fix linting issues
- Define `self.proxy` on the constructor
- Prevent issues with undefined variables
@UzielSilva
Copy link
Author

Also fixed a unit test, test coverage is restored to 100%

@kgala2
Copy link
Collaborator

kgala2 commented Jul 28, 2025

/gcbrun

@UzielSilva
Copy link
Author

Trying to add tests for the proxy server, to increase code coverage.

Changelog:
- Add unit tests for proxy
- Add test case to connector for drivers that require the local proxy
- Make proper adjustments to code
@UzielSilva
Copy link
Author

Tests were added, code coverage is now at 94%

@kgala2
Copy link
Collaborator

kgala2 commented Jul 29, 2025

/gcbrun

@UzielSilva
Copy link
Author

Huh, do I need to also keep support for python 3.9? let me check then the fixes I need to do for it. :)

@UzielSilva
Copy link
Author

Added support for Python 3.9, only needed to add some exceptions to handle when I close the server

@kgala2
Copy link
Collaborator

kgala2 commented Jul 29, 2025

/gcbrun

@UzielSilva
Copy link
Author

Still not able to merge it, it says changes requested

@enocom
Copy link
Member

enocom commented Jul 29, 2025

Only maintainers can merge.

@kgala2 will have to merge.

Great work on this @UzielSilva 👏.

@kgala2
Copy link
Collaborator

kgala2 commented Jul 30, 2025

Hi @UzielSilva,

The overall PR looks good, I am checking for corner cases that might not be covered, I'll comment on this PR if I find anything and have psyopg support available as soon as possible.
cc: @hessjcg

Thanks

@sahibpandori
Copy link

Thanks for working on this @UzielSilva, very excited for this change! Hope this can be merged soon @kgala2 @hessjcg

@UzielSilva
Copy link
Author

Wondering if I should specify the binary flavor for the psycopg optional dependency in the pyproject.toml. I'm using that one for the tests that are working well... not sure how it is going to behave with the purely python based installation.

Ref: https://www.psycopg.org/psycopg3/docs/basic/install.html

CC: @kgala2

Copy link
Collaborator

@hessjcg hessjcg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent work so far @UzielSilva. This will fix both #219 and #214

In reviewing the implementation, this is a good prototype, but it needs to handle sockets in a different way in order to be production-worthy. As implemented today, it can only handle a single connection. Socket Programming in Python, particularly the section on "handling multiple connections" explains how the code should handle the unix server socket.

Also, I think we should rename this feature to "Local Unix Socket" instead of "Psychopg support". We are implementing this in a way that is not database driver specific, and will work on any system that supports unix sockets.

First, lets make a new class in proxy.py called Proxy. This will encapsulate the details for the proxy server.

Create a new method on the Connector: Connector.start_unix_socket_proxy_async, which will create a new Proxy object. The Connector should allow Connector.start_unix_socket_proxy_async() to be called multiple times, opening multiple unix sockets as long as the local_socket_path parameter does not conflict with an existing socket.

When the Proxy object is created, it will start an "accept loop" async task which will open the unix server socket and listen for new connections. The Proxy should keep track of these async tasks so that they can be canceled when Connector.close() is called.

When the "accept loop" task accepts a new connection, a new client socket is opened. Then the accept loop task will create a new async task to handle the client socket. This "client socket" task will do two things:

  • call connector.connect() to establish a new socket to the server proxy
  • create async tasks to copy data between the unix socket and the database instance socket

The client socket task will also need to handle the following error cases:

  • When the connector.connect() fails, it should log an error and close the unix socket
  • When the database socket is suddenly closed, we should close the unix socket
  • When the unix client socket is suddenly closed, we should close the database socket

Cleanup: When connector.close() is called, the connector

  • cancel the "accept loop" task
  • close the unix socket server and delete the socket from the filesystem.
  • briefly wait for all in-progress async tasks handling connections to close
  • close all open sockets.

I will added additional comments in-line with the code.

Example of how to use Connector with psychopg and SqlAlchemy

from google.cloud.sql.connector import Connector
async def create_sqlalchemy_engine(
    instance_connection_name: str,
    user: str,
    password: str,
    db: str,
    ip_type: str = "public",
    refresh_strategy: str = "background",
    resolver: Union[type[DefaultResolver], type[DnsResolver]] = DefaultResolver,
) -> tuple[sqlalchemy.engine.Engine, Connector]:
    unix_socket_path="/tmp/conn"

    connector = Connector(refresh_strategy=refresh_strategy, resolver=resolver)
    await connector.start_unix_socket_proxy_async(
        instance_connection_name,
        user=user,
        password=password,
        db=db,
        local_socket_path=unix_socket_path,
        ip_type=ip_type,  # can be "public", "private" or "psc"
        autocommit=True,
    )

    # Construct the connection string
    # Note: The 'host' parameter points to the directory, not the socket file.
    connection_string = f"postgresql+psycopg2://{user}:{password}@{unix_socket_path}/{db}"
    
    # Create the SQLAlchemy engine
    engine = create_engine(connection_string)

    return engine, connector

Thank you again for your hard work on this. I look forward to seeing your next revision.
-Jonathan

while True:
data = await loop.sock_recv(client, LOCAL_PROXY_MAX_MESSAGE_SIZE)
if not data:
client.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the code for sending/receiving data based on best practices described in Socket Programming in Python

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference suggests to create our own event loop. It is probably better to just use asyncio in a way to allow the proxy to receive multiple collections (working on it), but I won't be following 100% what is said in the ref since we are already using asyncio in other parts of our codebase.

@hessjcg
Copy link
Collaborator

hessjcg commented Sep 5, 2025

Hi @UzielSilva, I was able to spend some time on the tricky IO code to set up a local proxy server. This is still a work in progress, but I wanted to share it anyway. Please check out this branch:

https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/tree/feat-local-proxy

Currently there are still some failing tests and edge cases that are not properly handled. Please feel free to continue my work.

@UzielSilva
Copy link
Author

Hi there, sorry for the lack of updates, took some mini vacations.
@hessjcg great stuff!!, I'm moving your proxy server into my branch. This is very helpful. :)

@UzielSilva
Copy link
Author

I'm having issues trying to fix one integration test, the psycopg one actually.

The issue is about a task that got Future attached to a different loop. The thing is that if I inject the running loop to the connector the issue appears to vanish but the connection now gives us a timeout error. All unit and integration tests are now passing, except for that one, and that is the only issue that is stopping me from finishing this PR. Can I get some help?

@UzielSilva
Copy link
Author

NVM, I solved it.

@hessjcg I think I'm ready for a new review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants