diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index e64bbc90c..d7602e537 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -32,6 +32,7 @@ async def create_sqlalchemy_engine( user: str, password: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", resolver: Union[type[DefaultResolver], type[DnsResolver]] = DefaultResolver, ) -> tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, Connector]: @@ -63,6 +64,9 @@ async def create_sqlalchemy_engine( The database user's password, e.g., secret-password db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -87,7 +91,7 @@ async def create_sqlalchemy_engine( user=user, password=password, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" ), execution_options={"isolation_level": "AUTOCOMMIT"}, ) @@ -99,6 +103,7 @@ async def create_asyncpg_pool( user: str, password: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", ) -> tuple[asyncpg.Pool, Connector]: """Creates a native asyncpg connection pool for a Cloud SQL instance and @@ -128,6 +133,9 @@ async def create_asyncpg_pool( The database user's password, e.g., secret-password db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -145,7 +153,7 @@ async def getconn( user=user, password=password, db=db, - ip_type="public", # can also be "private" or "psc", + ip_type=ip_type, # can be "public", "private" or "psc" **kwargs, ) return conn @@ -161,8 +169,11 @@ async def test_sqlalchemy_connection_with_asyncpg() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - pool, connector = await create_sqlalchemy_engine(inst_conn_name, user, password, db) + pool, connector = await create_sqlalchemy_engine( + inst_conn_name, user, password, db, ip_type + ) async with pool.connect() as conn: res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() @@ -177,9 +188,10 @@ async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") pool, connector = await create_sqlalchemy_engine( - inst_conn_name, user, password, db, "lazy" + inst_conn_name, user, password, db, ip_type, "lazy" ) async with pool.connect() as conn: @@ -195,9 +207,10 @@ async def test_custom_SAN_with_dns_sqlalchemy_connection_with_asyncpg() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_CUSTOMER_CAS_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") pool, connector = await create_sqlalchemy_engine( - inst_conn_name, user, password, db, resolver=DnsResolver + inst_conn_name, user, password, db, ip_type, resolver=DnsResolver ) async with pool.connect() as conn: @@ -213,8 +226,11 @@ async def test_connection_with_asyncpg() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - pool, connector = await create_asyncpg_pool(inst_conn_name, user, password, db) + pool, connector = await create_asyncpg_pool( + inst_conn_name, user, password, db, ip_type + ) async with pool.acquire() as conn: res = await conn.fetch("SELECT 1") @@ -229,9 +245,10 @@ async def test_lazy_connection_with_asyncpg() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") pool, connector = await create_asyncpg_pool( - inst_conn_name, user, password, db, "lazy" + inst_conn_name, user, password, db, ip_type, "lazy" ) async with pool.acquire() as conn: diff --git a/tests/system/test_asyncpg_iam_auth.py b/tests/system/test_asyncpg_iam_auth.py index ddf6b5e63..0e0c01e83 100644 --- a/tests/system/test_asyncpg_iam_auth.py +++ b/tests/system/test_asyncpg_iam_auth.py @@ -27,6 +27,7 @@ async def create_sqlalchemy_engine( instance_connection_name: str, user: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", ) -> tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, Connector]: """Creates a connection pool for a Cloud SQL instance and returns the pool @@ -55,6 +56,9 @@ async def create_sqlalchemy_engine( e.g., my-email@test.com, service-account@project-id.iam db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -71,7 +75,7 @@ async def create_sqlalchemy_engine( "asyncpg", user=user, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" enable_iam_auth=True, ), execution_options={"isolation_level": "AUTOCOMMIT"}, @@ -84,8 +88,9 @@ async def test_iam_authn_connection_with_asyncpg() -> None: inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_IAM_USER"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - pool, connector = await create_sqlalchemy_engine(inst_conn_name, user, db) + pool, connector = await create_sqlalchemy_engine(inst_conn_name, user, db, ip_type) async with pool.connect() as conn: res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() @@ -99,8 +104,11 @@ async def test_lazy_iam_authn_connection_with_asyncpg() -> None: inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_IAM_USER"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - pool, connector = await create_sqlalchemy_engine(inst_conn_name, user, db, "lazy") + pool, connector = await create_sqlalchemy_engine( + inst_conn_name, user, db, ip_type, "lazy" + ) async with pool.connect() as conn: res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() diff --git a/tests/system/test_connector_object.py b/tests/system/test_connector_object.py index 258b80aaf..66fe9c25c 100644 --- a/tests/system/test_connector_object.py +++ b/tests/system/test_connector_object.py @@ -39,6 +39,7 @@ def getconn() -> pymysql.connections.Connection: user=os.environ["MYSQL_USER"], password=os.environ["MYSQL_PASS"], db=os.environ["MYSQL_DB"], + ip_type=os.environ.get("IP_TYPE", "public"), ) return conn diff --git a/tests/system/test_pg8000_connection.py b/tests/system/test_pg8000_connection.py index f5d161cc8..42a8d9894 100644 --- a/tests/system/test_pg8000_connection.py +++ b/tests/system/test_pg8000_connection.py @@ -32,6 +32,7 @@ def create_sqlalchemy_engine( user: str, password: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", resolver: Union[type[DefaultResolver], type[DnsResolver]] = DefaultResolver, ) -> tuple[sqlalchemy.engine.Engine, Connector]: @@ -64,6 +65,9 @@ def create_sqlalchemy_engine( The database user's password, e.g., secret-password db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -85,7 +89,7 @@ def create_sqlalchemy_engine( user=user, password=password, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" ), ) return engine, connector @@ -100,8 +104,11 @@ def test_pg8000_connection() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db) + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, password, db, ip_type + ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() @@ -116,9 +123,10 @@ def test_lazy_pg8000_connection() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") engine, connector = create_sqlalchemy_engine( - inst_conn_name, user, password, db, "lazy" + inst_conn_name, user, password, db, ip_type, "lazy" ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() @@ -134,8 +142,11 @@ def test_CAS_pg8000_connection() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_CAS_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db) + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, password, db, ip_type + ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() @@ -150,8 +161,11 @@ def test_customer_managed_CAS_pg8000_connection() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_CUSTOMER_CAS_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db) + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, password, db, ip_type + ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() @@ -166,9 +180,10 @@ def test_custom_SAN_with_dns_pg8000_connection() -> None: user = os.environ["POSTGRES_USER"] password = os.environ["POSTGRES_CUSTOMER_CAS_PASS"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") engine, connector = create_sqlalchemy_engine( - inst_conn_name, user, password, db, resolver=DnsResolver + inst_conn_name, user, password, db, ip_type, resolver=DnsResolver ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() diff --git a/tests/system/test_pg8000_iam_auth.py b/tests/system/test_pg8000_iam_auth.py index 902d9eb90..38ee76612 100644 --- a/tests/system/test_pg8000_iam_auth.py +++ b/tests/system/test_pg8000_iam_auth.py @@ -26,6 +26,7 @@ def create_sqlalchemy_engine( instance_connection_name: str, user: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", ) -> tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for a Cloud SQL instance and returns the pool @@ -55,6 +56,9 @@ def create_sqlalchemy_engine( e.g., my-email@test.com, service-account@project-id.iam db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -70,7 +74,7 @@ def create_sqlalchemy_engine( "pg8000", user=user, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" enable_iam_auth=True, ), ) @@ -82,8 +86,9 @@ def test_pg8000_iam_authn_connection() -> None: inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_IAM_USER"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db) + engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db, ip_type) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() @@ -97,8 +102,11 @@ def test_lazy_pg8000_iam_authn_connection() -> None: inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"] user = os.environ["POSTGRES_IAM_USER"] db = os.environ["POSTGRES_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db, "lazy") + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, db, ip_type, "lazy" + ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() diff --git a/tests/system/test_pymysql_connection.py b/tests/system/test_pymysql_connection.py index 7d7edadc8..2748a8d77 100644 --- a/tests/system/test_pymysql_connection.py +++ b/tests/system/test_pymysql_connection.py @@ -28,6 +28,7 @@ def create_sqlalchemy_engine( user: str, password: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", ) -> tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for a Cloud SQL instance and returns the pool @@ -59,6 +60,9 @@ def create_sqlalchemy_engine( The database user's password, e.g., secret-password db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -75,7 +79,7 @@ def create_sqlalchemy_engine( user=user, password=password, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" ), ) return engine, connector @@ -90,8 +94,11 @@ def test_pymysql_connection() -> None: user = os.environ["MYSQL_USER"] password = os.environ["MYSQL_PASS"] db = os.environ["MYSQL_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db) + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, password, db, ip_type + ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() @@ -106,9 +113,10 @@ def test_lazy_pymysql_connection() -> None: user = os.environ["MYSQL_USER"] password = os.environ["MYSQL_PASS"] db = os.environ["MYSQL_DB"] + ip_type = os.environ.get("IP_TYPE", "public") engine, connector = create_sqlalchemy_engine( - inst_conn_name, user, password, db, "lazy" + inst_conn_name, user, password, db, ip_type, "lazy" ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() diff --git a/tests/system/test_pymysql_iam_auth.py b/tests/system/test_pymysql_iam_auth.py index 56e26d2b5..39af5c717 100644 --- a/tests/system/test_pymysql_iam_auth.py +++ b/tests/system/test_pymysql_iam_auth.py @@ -26,6 +26,7 @@ def create_sqlalchemy_engine( instance_connection_name: str, user: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", ) -> tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for a Cloud SQL instance and returns the pool @@ -55,6 +56,9 @@ def create_sqlalchemy_engine( e.g., my-email@test.com -> my-email db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -70,7 +74,7 @@ def create_sqlalchemy_engine( "pymysql", user=user, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" enable_iam_auth=True, ), ) @@ -82,8 +86,9 @@ def test_pymysql_iam_authn_connection() -> None: inst_conn_name = os.environ["MYSQL_CONNECTION_NAME"] user = os.environ["MYSQL_IAM_USER"] db = os.environ["MYSQL_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db) + engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db, ip_type) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() @@ -97,8 +102,11 @@ def test_lazy_pymysql_iam_authn_connection() -> None: inst_conn_name = os.environ["MYSQL_CONNECTION_NAME"] user = os.environ["MYSQL_IAM_USER"] db = os.environ["MYSQL_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, db, "lazy") + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, db, ip_type, "lazy" + ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() conn.commit() diff --git a/tests/system/test_pytds_connection.py b/tests/system/test_pytds_connection.py index a75b3da4e..198c3307f 100644 --- a/tests/system/test_pytds_connection.py +++ b/tests/system/test_pytds_connection.py @@ -27,6 +27,7 @@ def create_sqlalchemy_engine( user: str, password: str, db: str, + ip_type: str = "public", refresh_strategy: str = "background", ) -> tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for a Cloud SQL instance and returns the pool @@ -57,6 +58,9 @@ def create_sqlalchemy_engine( The database user's password, e.g., secret-password db (str): The name of the database, e.g., mydb + ip_type (str): + The IP type of the Cloud SQL instance to connect to. Can be one + of "public", "private", or "psc". refresh_strategy (Optional[str]): Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" or "background". For serverless environments use "lazy" to avoid @@ -73,7 +77,7 @@ def create_sqlalchemy_engine( user=user, password=password, db=db, - ip_type="public", # can also be "private" or "psc" + ip_type=ip_type, # can be "public", "private" or "psc" ), ) return engine, connector @@ -88,8 +92,11 @@ def test_pytds_connection() -> None: user = os.environ["SQLSERVER_USER"] password = os.environ["SQLSERVER_PASS"] db = os.environ["SQLSERVER_DB"] + ip_type = os.environ.get("IP_TYPE", "public") - engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db) + engine, connector = create_sqlalchemy_engine( + inst_conn_name, user, password, db, ip_type + ) with engine.connect() as conn: res = conn.execute(sqlalchemy.text("SELECT 1")).fetchone() conn.commit() @@ -103,9 +110,10 @@ def test_lazy_pytds_connection() -> None: user = os.environ["SQLSERVER_USER"] password = os.environ["SQLSERVER_PASS"] db = os.environ["SQLSERVER_DB"] + ip_type = os.environ.get("IP_TYPE", "public") engine, connector = create_sqlalchemy_engine( - inst_conn_name, user, password, db, "lazy" + inst_conn_name, user, password, db, ip_type, "lazy" ) with engine.connect() as conn: res = conn.execute(sqlalchemy.text("SELECT 1")).fetchone()