From ace035dea0834f5ca015b0885c230cb262df96d3 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:45:53 +0000 Subject: [PATCH 01/15] add jdbc yaml test --- sdks/python/apache_beam/yaml/tests/jdbc.yaml | 55 ++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/jdbc.yaml diff --git a/sdks/python/apache_beam/yaml/tests/jdbc.yaml b/sdks/python/apache_beam/yaml/tests/jdbc.yaml new file mode 100644 index 000000000000..fffdf96f463d --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/jdbc.yaml @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_sqlite_database" + +pipelines: + # Jdbc write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToJdbc + config: + url: "{TEMP_DB}" + driver_class_name: "org.sqlite.JDBC" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # Jdbc read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromJdbc + config: + url: "{TEMP_DB}" + driver_class_name: "org.sqlite.JDBC" + query: "SELECT * FROM tmp_table" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + From 804c38d9b769fb08d1445490f62d528743c311f8 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:46:11 +0000 Subject: [PATCH 02/15] add mysql yaml test --- sdks/python/apache_beam/yaml/tests/mysql.yaml | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/mysql.yaml diff --git a/sdks/python/apache_beam/yaml/tests/mysql.yaml b/sdks/python/apache_beam/yaml/tests/mysql.yaml new file mode 100644 index 000000000000..19c6774b2252 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/mysql.yaml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_mysql_database" + +pipelines: + # MySql write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToMySql + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, `rank`) VALUES(?,?)" + + # MySql read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromMySql + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "com.mysql.cj.jdbc.Driver" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + \ No newline at end of file From 25f548b008e9b80774ea4dbc9fa7bcd08e5d5c12 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:46:24 +0000 Subject: [PATCH 03/15] add postgres yaml test --- .../apache_beam/yaml/tests/postgres.yaml | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/postgres.yaml diff --git a/sdks/python/apache_beam/yaml/tests/postgres.yaml b/sdks/python/apache_beam/yaml/tests/postgres.yaml new file mode 100644 index 000000000000..9ecc2f481677 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/postgres.yaml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_postgres_database" + +pipelines: + # Postgres write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToPostgres + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # Postgres read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromPostgres + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "org.postgresql.Driver" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + \ No newline at end of file From 019259f1afa02aa5a40ca35edaa7f5e0a44434b4 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:46:38 +0000 Subject: [PATCH 04/15] add sqlserver yaml test --- .../apache_beam/yaml/tests/sqlserver.yaml | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/sqlserver.yaml diff --git a/sdks/python/apache_beam/yaml/tests/sqlserver.yaml b/sdks/python/apache_beam/yaml/tests/sqlserver.yaml new file mode 100644 index 000000000000..9e5ba2ab2ec2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/sqlserver.yaml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_sqlserver_database" + +pipelines: + # SqlServer write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToSqlServer + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # SqlServer read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromSqlServer + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "com.microsoft.sqlserver.jdbc.SQLServerDriver" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + \ No newline at end of file From dcc2d802a4b79aa607f9e9d40f9b3a20e57b19e7 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:46:58 +0000 Subject: [PATCH 05/15] add windowinto yaml test --- .../apache_beam/yaml/tests/windowinto.yaml | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/tests/windowinto.yaml diff --git a/sdks/python/apache_beam/yaml/tests/windowinto.yaml b/sdks/python/apache_beam/yaml/tests/windowinto.yaml new file mode 100644 index 000000000000..d35d291d4f45 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/windowinto.yaml @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + - pipeline: + type: composite + transforms: + # setup + - type: Create + config: + elements: + - {k: "x", t: 1} + - {k: "x", t: 8} + - {k: "x", t: 11} + - {k: "y", t: 101} + - type: AssignTimestamps + input: Create + config: + timestamp: t + + # global windowing + - type: chain + name: Global + input: AssignTimestamps + transforms: + - type: WindowInto + config: + windowing: + type: global + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 20} + - {k: "y", t: 101} + + # fixed windowing + - type: chain + name: Fixed + input: AssignTimestamps + transforms: + + - type: WindowInto + config: + windowing: + type: fixed + size: 10s + offset: 5s + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 1} + - {k: "x", t: 19} + - {k: "y", t: 101} + + # sliding windowing + - type: chain + name: Sliding + input: AssignTimestamps + transforms: + - type: WindowInto + config: + windowing: + type: sliding + size: 20s + period: 10s + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 9} # [-10, 10) + - {k: "x", t: 20} # [ 0, 20) + - {k: "x", t: 11} # [ 10, 30) + - {k: "y", t: 101} # [90, 110) + - {k: "y", t: 101} # [100, 120) + + # session windowing + - type: chain + name: Sessions + input: AssignTimestamps + transforms: + - type: WindowInto + config: + windowing: + type: sessions + gap: 5s + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 1} + - {k: "x", t: 19} + - {k: "y", t: 101} + From aa6f00c56695299d6ad14ae149073faebd365c99 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:47:36 +0000 Subject: [PATCH 06/15] add additional db connectors for mssql and mysql --- sdks/python/setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 2b21d0463c98..1215d25cd1ef 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -422,6 +422,8 @@ def get_portability_package_data(): 'cryptography>=41.0.2', 'hypothesis>5.0.0,<7.0.0', 'virtualenv-clone>=0.5,<1.0', + 'mysql-connector-python>=9.3.0', + 'pymssql>=2.3.4' ], 'gcp': [ 'cachetools>=3.1.0,<6', From 64f7f3120a7dd3b13f3a468fe6352e0c3ed20db8 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 19:48:07 +0000 Subject: [PATCH 07/15] add support for sqlite --- sdks/java/extensions/schemaio-expansion-service/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle index b534af38cd42..14dd0f1570f5 100644 --- a/sdks/java/extensions/schemaio-expansion-service/build.gradle +++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle @@ -58,6 +58,7 @@ dependencies { permitUnusedDeclared 'com.google.cloud:alloydb-jdbc-connector:1.2.0' testImplementation library.java.junit testImplementation library.java.mockito_core + runtimeOnly ("org.xerial:sqlite-jdbc:3.49.1.0") } task runExpansionService (type: JavaExec) { From 09f240f02a86e837a1454d88fbb8b04f15041f51 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 29 Apr 2025 20:00:58 +0000 Subject: [PATCH 08/15] fix lint issues --- .../apache_beam/yaml/integration_tests.py | 316 ++++++++++++++++++ 1 file changed, 316 insertions(+) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 818fc9b4c4ce..c94fbd805123 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -27,6 +27,10 @@ import uuid import mock +import mysql.connector +import psycopg2 +import pymssql +import sqlite3 import yaml import apache_beam as beam @@ -38,10 +42,26 @@ from apache_beam.utils import python_callable from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform +from testcontainers.mssql import SqlServerContainer +from testcontainers.mysql import MySqlContainer +from testcontainers.postgres import PostgresContainer @contextlib.contextmanager def gcs_temp_dir(bucket): + """Context manager to create and clean up a temporary GCS directory. + + Creates a unique temporary directory within the specified GCS bucket + and yields the path. Upon exiting the context, the directory and its + contents are deleted. + + Args: + bucket (str): The GCS bucket name (e.g., 'gs://my-bucket'). + + Yields: + str: The full path to the created temporary GCS directory. + Example: 'gs://my-bucket/yaml-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' + """ gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4()) yield gcs_tempdir filesystems.FileSystems.delete([gcs_tempdir]) @@ -49,6 +69,25 @@ def gcs_temp_dir(bucket): @contextlib.contextmanager def temp_spanner_table(project, prefix='temp_spanner_db_'): + """Context manager to create and clean up a temporary Spanner database and + table. + + Creates a unique temporary Spanner database within the specified project + and a predefined table named 'tmp_table' with columns ['UserId', 'Key']. + It yields connection details for the created resources. Upon exiting the + context, the temporary database (and its table) is deleted. + + Args: + project (str): The Google Cloud project ID. + prefix (str): A prefix to use for the temporary database name. + Defaults to 'temp_spanner_db_'. + + Yields: + list[str]: A list containing connection details: + [project_id, instance_id, database_id, table_name, list_of_columns]. + Example: ['my-project', 'beam-test', 'temp_spanner_db_...', 'tmp_table', + ['UserId', 'Key']] + """ spanner_client = SpannerWrapper(project) spanner_client._create_database() instance = "beam-test" @@ -65,6 +104,26 @@ def temp_spanner_table(project, prefix='temp_spanner_db_'): @contextlib.contextmanager def temp_bigquery_table(project, prefix='yaml_bq_it_'): + """Context manager to create and clean up a temporary BigQuery dataset. + + Creates a unique temporary BigQuery dataset within the specified project. + It yields a placeholder table name string within that dataset (e.g., + 'project.dataset_id.tmp_table'). The actual table is expected to be + created by the test using this context. + + Upon exiting the context, the temporary dataset and all its contents + (including any tables created within it) are deleted. + + Args: + project (str): The Google Cloud project ID. + prefix (str): A prefix to use for the temporary dataset name. + Defaults to 'yaml_bq_it_'. + + Yields: + str: The full path for a temporary BigQuery table within the created + dataset. + Example: 'my-project.yaml_bq_it_a1b2c3d4e5f6...tmp_table' + """ bigquery_client = BigQueryWrapper() dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex) bigquery_client.get_or_create_dataset(project, dataset_id) @@ -76,6 +135,263 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'): bigquery_client.client.datasets.Delete(request) +@contextlib.contextmanager +def temp_sqlite_database(prefix='yaml_jdbc_it_'): + """Context manager to provide a temporary SQLite database via JDBC for + testing. + + This function creates a temporary SQLite database file on the local + filesystem. It establishes a connection using 'sqlite3', creates a predefined + 'tmp_table', and then yields a JDBC connection string suitable for use in + tests that require a generic JDBC connection (specifically configured for + SQLite in this case). + + The SQLite database file is automatically cleaned up (closed and deleted) + when the context manager exits. + + Args: + prefix (str): A prefix to use for the temporary database file name. + + Yields: + str: A JDBC connection string for the temporary SQLite database. + Example format: "jdbc:sqlite:" + + Raises: + sqlite3.Error: If there's an error connecting to or interacting with + the SQLite database during setup. + Exception: Any other exception encountered during the setup or cleanup + process. + """ + conn = cursor = None + try: + # Establish connection to the temp file + db_name = f'{prefix}{uuid.uuid4().hex}.db' + conn = sqlite3.connect(db_name) + cursor = conn.cursor() + + # Create a temp table for tests + cursor.execute( + ''' + CREATE TABLE tmp_table ( + value INTEGER PRIMARY KEY, + rank INTEGER + ) + ''') + conn.commit() + yield f'jdbc:sqlite:{db_name}' + except (sqlite3.Error, Exception) as err: + logging.error("Error interacting with temporary SQLite DB: %s", err) + raise err + finally: + # Close connections + if cursor: + cursor.close() + if conn: + conn.close() + try: + if os.path.exists(db_name): + os.remove(db_name) + except Exception as err: + logging.error("Error deleting temporary SQLite DB: %s", err) + raise err + + +@contextlib.contextmanager +def temp_mysql_database(): + """Context manager to provide a temporary MySQL database for testing. + + This function utilizes the 'testcontainers' library to spin up a + MySQL instance within a Docker container. It then connects + to this temporary database using 'mysql.connector', creates a predefined + 'tmp_table', and yields a JDBC connection string suitable for use in tests. + + The Docker container and the database instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: A JDBC connection string for the temporary MySQL database. + Example format: + "jdbc:mysql://:/? + user=&password=" + + Raises: + mysql.connector.Error: If there's an error connecting to or interacting + with the MySQL database during setup. + Exception: Any other exception encountered during the setup process. + """ + with MySqlContainer() as mysql_container: + conn = cursor = None + try: + # Retrieve connection details from the running container + host = mysql_container.get_container_host_ip() + port = mysql_container.get_exposed_port(mysql_container.port_to_expose) + user = mysql_container.MYSQL_USER + password = mysql_container.MYSQL_PASSWORD + db_name = mysql_container.MYSQL_DATABASE + + # Make connection to temp database + conn = mysql.connector.connect( + host=host, port=port, user=user, password=password, database=db_name) + cursor = conn.cursor() + + # Create temp table for tests + cursor.execute("CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);") + conn.commit() + + # Construct the JDBC url for connections later on by tests + jdbc_url = f"jdbc:mysql://{host}:{port}/{db_name}? + \ + user={user}&password={password}" + + yield jdbc_url + except mysql.connector.Error as err: + logging.error("Error interacting with temporary MySQL DB: %s", err) + raise err + finally: + # Close connections + if cursor: + cursor.close() + if conn: + conn.close() + + +@contextlib.contextmanager +def temp_postgres_database(): + """Context manager to provide a temporary PostgreSQL database for testing. + + This function utilizes the 'testcontainers' library to spin up a + PostgreSQL instance within a Docker container. It then connects + to this temporary database using 'psycopg2', creates a predefined 'tmp_table', + and yields a JDBC connection string suitable for use in tests. + + The Docker container and the database instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: A JDBC connection string for the temporary PostgreSQL database. + Example format: + "jdbc:postgresql://:/? + user=&password=" + + Raises: + psycopg2.Error: If there's an error connecting to or interacting with + the PostgreSQL database during setup. + Exception: Any other exception encountered during the setup process. + """ + # default port + port = 5432 + + # Start the postgress container using testcontainers + with PostgresContainer(port=port) as postgres_container: + conn = cursor = None + try: + # Retrieve connection details from the running container + host = postgres_container.get_container_host_ip() + port = postgres_container.get_exposed_port(port) + user = postgres_container.POSTGRES_USER + password = postgres_container.POSTGRES_PASSWORD + db_name = postgres_container.POSTGRES_DB + + # Make connection to temp database + conn = psycopg2.connect( + host=host, port=port, user=user, password=password, database=db_name) + cursor = conn.cursor() + + # Create a temp table for tests + cursor.execute("CREATE TABLE tmp_table (value INTEGER, rank INTEGER);") + conn.commit() + + # Construct the JDBC url for connections later on by tests + jdbc_url = f"jdbc:postgresql://{host}:{port}/{db_name}? + \ + user={user}&password={password}" + + yield jdbc_url + except (psycopg2.Error, Exception) as err: + logging.error("Error interacting with temporary Postgres DB: %s", err) + raise err + finally: + # Close connections + if cursor: + cursor.close() + if conn: + conn.close() + + +@contextlib.contextmanager +def temp_sqlserver_database(): + """Context manager to provide a temporary SQL Server database for testing. + + This function utilizes the 'testcontainers' library to spin up a + Microsoft SQL Server instance within a Docker container. It then connects + to this temporary database using 'pymssql', creates a predefined 'tmp_table', + and yields a JDBC connection string suitable for use in tests. + + The Docker container and the database instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: A JDBC connection string for the temporary SQL Server database. + Example format: + "jdbc:sqlserver://:; + databaseName=; + user=; + password=; + encrypt=false; + trustServerCertificate=true" + + Raises: + pymssql.Error: If there's an error connecting to or interacting with + the SQL Server database during setup. + Exception: Any other exception encountered during the setup process. + """ + # Default port + port = 1433 + + # Start the sql server using testcontainers + with SqlServerContainer(port=port) as sqlserver_container: + conn = cursor = None + try: + # Retrieve connection details from the running container + host = sqlserver_container.get_container_host_ip() + port = sqlserver_container.get_exposed_port(port) + user = sqlserver_container.SQLSERVER_USER + password = sqlserver_container.SQLSERVER_PASSWORD + db_name = sqlserver_container.SQLSERVER_DBNAME + + # Make connection to temp database + conn = pymssql.connect( + server=host, + port=port, + user=user, + password=password, + database=db_name) + cursor = conn.cursor() + + # Create a temp table for tests + cursor.execute("CREATE TABLE tmp_table (value INTEGER, rank INTEGER);") + conn.commit() + + # Construct the JDBC url for connections later on by tests + # NOTE: encrypt=false and trustServerCertificate=true is generally + # needed for test container connections without proper certificates setup + jdbc_url = f"jdbc:sqlserver://{host}:{port}; + \ + databaseName={db_name}; + \ + user={user}; + \ + password={password}; + \ + encrypt=false; + \ + trustServerCertificate=true" + + yield jdbc_url + except (pymssql.Error, Exception) as err: + logging.error("Error interacting with temporary SQL Server DB: %s", err) + raise err + finally: + # Close connections + if cursor: + cursor.close() + if conn: + conn.close() + + def replace_recursive(spec, vars): if isinstance(spec, dict): return { From 0fdb2aed161721ac6c21b6df2312b0e6b12439e1 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 30 Apr 2025 14:05:16 +0000 Subject: [PATCH 09/15] change to pytds connector for sqlserver --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 1215d25cd1ef..db8864624301 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -423,7 +423,7 @@ def get_portability_package_data(): 'hypothesis>5.0.0,<7.0.0', 'virtualenv-clone>=0.5,<1.0', 'mysql-connector-python>=9.3.0', - 'pymssql>=2.3.4' + 'python-tds>=1.16.1' ], 'gcp': [ 'cachetools>=3.1.0,<6', From bfa843d1658bb734a010cd4ac1540f7dd383cd0f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 30 Apr 2025 14:43:46 +0000 Subject: [PATCH 10/15] fix lint issues --- .../apache_beam/yaml/integration_tests.py | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index c94fbd805123..71dbbfe280b4 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -23,15 +23,18 @@ import itertools import logging import os +import sqlite3 import unittest import uuid import mock import mysql.connector import psycopg2 -import pymssql -import sqlite3 +import pytds import yaml +from testcontainers.mssql import SqlServerContainer +from testcontainers.mysql import MySqlContainer +from testcontainers.postgres import PostgresContainer import apache_beam as beam from apache_beam.io import filesystems @@ -42,9 +45,6 @@ from apache_beam.utils import python_callable from apache_beam.yaml import yaml_provider from apache_beam.yaml import yaml_transform -from testcontainers.mssql import SqlServerContainer -from testcontainers.mysql import MySqlContainer -from testcontainers.postgres import PostgresContainer @contextlib.contextmanager @@ -239,8 +239,9 @@ def temp_mysql_database(): conn.commit() # Construct the JDBC url for connections later on by tests - jdbc_url = f"jdbc:mysql://{host}:{port}/{db_name}? + \ - user={user}&password={password}" + jdbc_url = ( + f"jdbc:mysql://{host}:{port}/{db_name}?" + f"user={user}&password={password}") yield jdbc_url except mysql.connector.Error as err: @@ -277,16 +278,15 @@ def temp_postgres_database(): the PostgreSQL database during setup. Exception: Any other exception encountered during the setup process. """ - # default port - port = 5432 + default_port = 5432 # Start the postgress container using testcontainers - with PostgresContainer(port=port) as postgres_container: + with PostgresContainer(port=default_port) as postgres_container: conn = cursor = None try: # Retrieve connection details from the running container host = postgres_container.get_container_host_ip() - port = postgres_container.get_exposed_port(port) + port = postgres_container.get_exposed_port(default_port) user = postgres_container.POSTGRES_USER password = postgres_container.POSTGRES_PASSWORD db_name = postgres_container.POSTGRES_DB @@ -301,8 +301,9 @@ def temp_postgres_database(): conn.commit() # Construct the JDBC url for connections later on by tests - jdbc_url = f"jdbc:postgresql://{host}:{port}/{db_name}? + \ - user={user}&password={password}" + jdbc_url = ( + f"jdbc:postgresql://{host}:{port}/{db_name}?" + f"user={user}&password={password}") yield jdbc_url except (psycopg2.Error, Exception) as err: @@ -343,22 +344,21 @@ def temp_sqlserver_database(): the SQL Server database during setup. Exception: Any other exception encountered during the setup process. """ - # Default port - port = 1433 + default_port = 1433 # Start the sql server using testcontainers - with SqlServerContainer(port=port) as sqlserver_container: + with SqlServerContainer(port=default_port) as sqlserver_container: conn = cursor = None try: # Retrieve connection details from the running container host = sqlserver_container.get_container_host_ip() - port = sqlserver_container.get_exposed_port(port) + port = int(sqlserver_container.get_exposed_port(default_port)) user = sqlserver_container.SQLSERVER_USER password = sqlserver_container.SQLSERVER_PASSWORD db_name = sqlserver_container.SQLSERVER_DBNAME # Make connection to temp database - conn = pymssql.connect( + conn = pytds.connect( server=host, port=port, user=user, @@ -373,15 +373,16 @@ def temp_sqlserver_database(): # Construct the JDBC url for connections later on by tests # NOTE: encrypt=false and trustServerCertificate=true is generally # needed for test container connections without proper certificates setup - jdbc_url = f"jdbc:sqlserver://{host}:{port}; + \ - databaseName={db_name}; + \ - user={user}; + \ - password={password}; + \ - encrypt=false; + \ - trustServerCertificate=true" + jdbc_url = ( + f"jdbc:sqlserver://{host}:{port};" + f"databaseName={db_name};" + f"user={user};" + f"password={password};" + f"encrypt=true;" + f"trustServerCertificate=true") yield jdbc_url - except (pymssql.Error, Exception) as err: + except (pytds.Error, Exception) as err: logging.error("Error interacting with temporary SQL Server DB: %s", err) raise err finally: From 8c98d7c9ea72f9f99eb6cb668ae0ecd292e2800f Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 30 Apr 2025 17:21:35 +0000 Subject: [PATCH 11/15] add sqlchemy package --- sdks/python/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index db8864624301..74df467f2b17 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -423,7 +423,8 @@ def get_portability_package_data(): 'hypothesis>5.0.0,<7.0.0', 'virtualenv-clone>=0.5,<1.0', 'mysql-connector-python>=9.3.0', - 'python-tds>=1.16.1' + 'python-tds>=1.16.1', + 'sqlalchemy-pytds>=1.0.2' ], 'gcp': [ 'cachetools>=3.1.0,<6', From 633349c2809b4710ffad2836574236ec7ce2bbec Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 30 Apr 2025 17:22:47 +0000 Subject: [PATCH 12/15] fix yapf issues --- sdks/python/apache_beam/yaml/integration_tests.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 71dbbfe280b4..52cfc5e501a1 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -323,7 +323,7 @@ def temp_sqlserver_database(): This function utilizes the 'testcontainers' library to spin up a Microsoft SQL Server instance within a Docker container. It then connects - to this temporary database using 'pymssql', creates a predefined 'tmp_table', + to this temporary database using 'pytds', creates a predefined 'tmp_table', and yields a JDBC connection string suitable for use in tests. The Docker container and the database instance are automatically managed @@ -340,14 +340,15 @@ def temp_sqlserver_database(): trustServerCertificate=true" Raises: - pymssql.Error: If there's an error connecting to or interacting with + pytds.Error: If there's an error connecting to or interacting with the SQL Server database during setup. Exception: Any other exception encountered during the setup process. """ default_port = 1433 # Start the sql server using testcontainers - with SqlServerContainer(port=default_port) as sqlserver_container: + with SqlServerContainer(port=default_port, + dialect='mssql+pytds') as sqlserver_container: conn = cursor = None try: # Retrieve connection details from the running container From f62b9c98079c947802ba74f60f9dc4475f89f609 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 14 May 2025 16:44:01 +0000 Subject: [PATCH 13/15] update DBs connection to sqlalchemy --- .../apache_beam/yaml/integration_tests.py | 48 ++++++++----------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 52cfc5e501a1..b6afc19df9f0 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -23,6 +23,7 @@ import itertools import logging import os +import sqlalchemy import sqlite3 import unittest import uuid @@ -229,14 +230,12 @@ def temp_mysql_database(): password = mysql_container.MYSQL_PASSWORD db_name = mysql_container.MYSQL_DATABASE - # Make connection to temp database - conn = mysql.connector.connect( - host=host, port=port, user=user, password=password, database=db_name) - cursor = conn.cursor() - - # Create temp table for tests - cursor.execute("CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);") - conn.commit() + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);")) # Construct the JDBC url for connections later on by tests jdbc_url = ( @@ -291,14 +290,12 @@ def temp_postgres_database(): password = postgres_container.POSTGRES_PASSWORD db_name = postgres_container.POSTGRES_DB - # Make connection to temp database - conn = psycopg2.connect( - host=host, port=port, user=user, password=password, database=db_name) - cursor = conn.cursor() - - # Create a temp table for tests - cursor.execute("CREATE TABLE tmp_table (value INTEGER, rank INTEGER);") - conn.commit() + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine(postgres_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, rank INTEGER);")) # Construct the JDBC url for connections later on by tests jdbc_url = ( @@ -358,18 +355,13 @@ def temp_sqlserver_database(): password = sqlserver_container.SQLSERVER_PASSWORD db_name = sqlserver_container.SQLSERVER_DBNAME - # Make connection to temp database - conn = pytds.connect( - server=host, - port=port, - user=user, - password=password, - database=db_name) - cursor = conn.cursor() - - # Create a temp table for tests - cursor.execute("CREATE TABLE tmp_table (value INTEGER, rank INTEGER);") - conn.commit() + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine( + sqlserver_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, rank INTEGER);")) # Construct the JDBC url for connections later on by tests # NOTE: encrypt=false and trustServerCertificate=true is generally From ee8f0c284d7f591ccdfee905bf32156606b28805 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 14 May 2025 17:03:42 +0000 Subject: [PATCH 14/15] yapf changes --- .../apache_beam/yaml/integration_tests.py | 65 +++++-------------- 1 file changed, 15 insertions(+), 50 deletions(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index b6afc19df9f0..dc5920ba676c 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -221,15 +221,7 @@ def temp_mysql_database(): Exception: Any other exception encountered during the setup process. """ with MySqlContainer() as mysql_container: - conn = cursor = None try: - # Retrieve connection details from the running container - host = mysql_container.get_container_host_ip() - port = mysql_container.get_exposed_port(mysql_container.port_to_expose) - user = mysql_container.MYSQL_USER - password = mysql_container.MYSQL_PASSWORD - db_name = mysql_container.MYSQL_DATABASE - # Make connection to temp database and create tmp table engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) with engine.begin() as connection: @@ -239,19 +231,16 @@ def temp_mysql_database(): # Construct the JDBC url for connections later on by tests jdbc_url = ( - f"jdbc:mysql://{host}:{port}/{db_name}?" - f"user={user}&password={password}") + f"jdbc:mysql://{mysql_container.get_container_host_ip()}:" + f"{mysql_container.get_exposed_port(mysql_container.port_to_expose)}/" + f"{mysql_container.MYSQL_DATABASE}?" + f"user={mysql_container.MYSQL_USER}&" + f"password={mysql_container.MYSQL_PASSWORD}") yield jdbc_url except mysql.connector.Error as err: logging.error("Error interacting with temporary MySQL DB: %s", err) raise err - finally: - # Close connections - if cursor: - cursor.close() - if conn: - conn.close() @contextlib.contextmanager @@ -281,15 +270,7 @@ def temp_postgres_database(): # Start the postgress container using testcontainers with PostgresContainer(port=default_port) as postgres_container: - conn = cursor = None try: - # Retrieve connection details from the running container - host = postgres_container.get_container_host_ip() - port = postgres_container.get_exposed_port(default_port) - user = postgres_container.POSTGRES_USER - password = postgres_container.POSTGRES_PASSWORD - db_name = postgres_container.POSTGRES_DB - # Make connection to temp database and create tmp table engine = sqlalchemy.create_engine(postgres_container.get_connection_url()) with engine.begin() as connection: @@ -299,19 +280,16 @@ def temp_postgres_database(): # Construct the JDBC url for connections later on by tests jdbc_url = ( - f"jdbc:postgresql://{host}:{port}/{db_name}?" - f"user={user}&password={password}") + f"jdbc:postgresql://{postgres_container.get_container_host_ip()}:" + f"{postgres_container.get_exposed_port(default_port)}/" + f"{postgres_container.POSTGRES_DB}?" + f"user={postgres_container.POSTGRES_USER}&" + f"password={postgres_container.POSTGRES_PASSWORD}") yield jdbc_url except (psycopg2.Error, Exception) as err: logging.error("Error interacting with temporary Postgres DB: %s", err) raise err - finally: - # Close connections - if cursor: - cursor.close() - if conn: - conn.close() @contextlib.contextmanager @@ -346,15 +324,7 @@ def temp_sqlserver_database(): # Start the sql server using testcontainers with SqlServerContainer(port=default_port, dialect='mssql+pytds') as sqlserver_container: - conn = cursor = None try: - # Retrieve connection details from the running container - host = sqlserver_container.get_container_host_ip() - port = int(sqlserver_container.get_exposed_port(default_port)) - user = sqlserver_container.SQLSERVER_USER - password = sqlserver_container.SQLSERVER_PASSWORD - db_name = sqlserver_container.SQLSERVER_DBNAME - # Make connection to temp database and create tmp table engine = sqlalchemy.create_engine( sqlserver_container.get_connection_url()) @@ -367,10 +337,11 @@ def temp_sqlserver_database(): # NOTE: encrypt=false and trustServerCertificate=true is generally # needed for test container connections without proper certificates setup jdbc_url = ( - f"jdbc:sqlserver://{host}:{port};" - f"databaseName={db_name};" - f"user={user};" - f"password={password};" + f"jdbc:sqlserver://{sqlserver_container.get_container_host_ip()}:" + f"{int(sqlserver_container.get_exposed_port(default_port))};" + f"databaseName={sqlserver_container.SQLSERVER_DBNAME};" + f"user={sqlserver_container.SQLSERVER_USER};" + f"password={sqlserver_container.SQLSERVER_PASSWORD};" f"encrypt=true;" f"trustServerCertificate=true") @@ -378,12 +349,6 @@ def temp_sqlserver_database(): except (pytds.Error, Exception) as err: logging.error("Error interacting with temporary SQL Server DB: %s", err) raise err - finally: - # Close connections - if cursor: - cursor.close() - if conn: - conn.close() def replace_recursive(spec, vars): From 5a78cdbcf01f8a1d6c712517f149470b6741c23a Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Wed, 14 May 2025 17:51:39 +0000 Subject: [PATCH 15/15] fix pylint issue --- sdks/python/apache_beam/yaml/integration_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index dc5920ba676c..d005bfe0f797 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -23,7 +23,6 @@ import itertools import logging import os -import sqlalchemy import sqlite3 import unittest import uuid @@ -32,6 +31,7 @@ import mysql.connector import psycopg2 import pytds +import sqlalchemy import yaml from testcontainers.mssql import SqlServerContainer from testcontainers.mysql import MySqlContainer