diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle index b534af38cd42..14dd0f1570f5 100644 --- a/sdks/java/extensions/schemaio-expansion-service/build.gradle +++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle @@ -58,6 +58,7 @@ dependencies { permitUnusedDeclared 'com.google.cloud:alloydb-jdbc-connector:1.2.0' testImplementation library.java.junit testImplementation library.java.mockito_core + runtimeOnly ("org.xerial:sqlite-jdbc:3.49.1.0") } task runExpansionService (type: JavaExec) { diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index 818fc9b4c4ce..d005bfe0f797 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -23,11 +23,19 @@ import itertools import logging import os +import sqlite3 import unittest import uuid import mock +import mysql.connector +import psycopg2 +import pytds +import sqlalchemy import yaml +from testcontainers.mssql import SqlServerContainer +from testcontainers.mysql import MySqlContainer +from testcontainers.postgres import PostgresContainer import apache_beam as beam from apache_beam.io import filesystems @@ -42,6 +50,19 @@ @contextlib.contextmanager def gcs_temp_dir(bucket): + """Context manager to create and clean up a temporary GCS directory. + + Creates a unique temporary directory within the specified GCS bucket + and yields the path. Upon exiting the context, the directory and its + contents are deleted. + + Args: + bucket (str): The GCS bucket name (e.g., 'gs://my-bucket'). + + Yields: + str: The full path to the created temporary GCS directory. + Example: 'gs://my-bucket/yaml-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' + """ gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4()) yield gcs_tempdir filesystems.FileSystems.delete([gcs_tempdir]) @@ -49,6 +70,25 @@ def gcs_temp_dir(bucket): @contextlib.contextmanager def temp_spanner_table(project, prefix='temp_spanner_db_'): + """Context manager to create and clean up a temporary Spanner database and + table. + + Creates a unique temporary Spanner database within the specified project + and a predefined table named 'tmp_table' with columns ['UserId', 'Key']. + It yields connection details for the created resources. Upon exiting the + context, the temporary database (and its table) is deleted. + + Args: + project (str): The Google Cloud project ID. + prefix (str): A prefix to use for the temporary database name. + Defaults to 'temp_spanner_db_'. + + Yields: + list[str]: A list containing connection details: + [project_id, instance_id, database_id, table_name, list_of_columns]. + Example: ['my-project', 'beam-test', 'temp_spanner_db_...', 'tmp_table', + ['UserId', 'Key']] + """ spanner_client = SpannerWrapper(project) spanner_client._create_database() instance = "beam-test" @@ -65,6 +105,26 @@ def temp_spanner_table(project, prefix='temp_spanner_db_'): @contextlib.contextmanager def temp_bigquery_table(project, prefix='yaml_bq_it_'): + """Context manager to create and clean up a temporary BigQuery dataset. + + Creates a unique temporary BigQuery dataset within the specified project. + It yields a placeholder table name string within that dataset (e.g., + 'project.dataset_id.tmp_table'). The actual table is expected to be + created by the test using this context. + + Upon exiting the context, the temporary dataset and all its contents + (including any tables created within it) are deleted. + + Args: + project (str): The Google Cloud project ID. + prefix (str): A prefix to use for the temporary dataset name. + Defaults to 'yaml_bq_it_'. + + Yields: + str: The full path for a temporary BigQuery table within the created + dataset. + Example: 'my-project.yaml_bq_it_a1b2c3d4e5f6...tmp_table' + """ bigquery_client = BigQueryWrapper() dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex) bigquery_client.get_or_create_dataset(project, dataset_id) @@ -76,6 +136,221 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'): bigquery_client.client.datasets.Delete(request) +@contextlib.contextmanager +def temp_sqlite_database(prefix='yaml_jdbc_it_'): + """Context manager to provide a temporary SQLite database via JDBC for + testing. + + This function creates a temporary SQLite database file on the local + filesystem. It establishes a connection using 'sqlite3', creates a predefined + 'tmp_table', and then yields a JDBC connection string suitable for use in + tests that require a generic JDBC connection (specifically configured for + SQLite in this case). + + The SQLite database file is automatically cleaned up (closed and deleted) + when the context manager exits. + + Args: + prefix (str): A prefix to use for the temporary database file name. + + Yields: + str: A JDBC connection string for the temporary SQLite database. + Example format: "jdbc:sqlite:" + + Raises: + sqlite3.Error: If there's an error connecting to or interacting with + the SQLite database during setup. + Exception: Any other exception encountered during the setup or cleanup + process. + """ + conn = cursor = None + try: + # Establish connection to the temp file + db_name = f'{prefix}{uuid.uuid4().hex}.db' + conn = sqlite3.connect(db_name) + cursor = conn.cursor() + + # Create a temp table for tests + cursor.execute( + ''' + CREATE TABLE tmp_table ( + value INTEGER PRIMARY KEY, + rank INTEGER + ) + ''') + conn.commit() + yield f'jdbc:sqlite:{db_name}' + except (sqlite3.Error, Exception) as err: + logging.error("Error interacting with temporary SQLite DB: %s", err) + raise err + finally: + # Close connections + if cursor: + cursor.close() + if conn: + conn.close() + try: + if os.path.exists(db_name): + os.remove(db_name) + except Exception as err: + logging.error("Error deleting temporary SQLite DB: %s", err) + raise err + + +@contextlib.contextmanager +def temp_mysql_database(): + """Context manager to provide a temporary MySQL database for testing. + + This function utilizes the 'testcontainers' library to spin up a + MySQL instance within a Docker container. It then connects + to this temporary database using 'mysql.connector', creates a predefined + 'tmp_table', and yields a JDBC connection string suitable for use in tests. + + The Docker container and the database instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: A JDBC connection string for the temporary MySQL database. + Example format: + "jdbc:mysql://:/? + user=&password=" + + Raises: + mysql.connector.Error: If there's an error connecting to or interacting + with the MySQL database during setup. + Exception: Any other exception encountered during the setup process. + """ + with MySqlContainer() as mysql_container: + try: + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine(mysql_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);")) + + # Construct the JDBC url for connections later on by tests + jdbc_url = ( + f"jdbc:mysql://{mysql_container.get_container_host_ip()}:" + f"{mysql_container.get_exposed_port(mysql_container.port_to_expose)}/" + f"{mysql_container.MYSQL_DATABASE}?" + f"user={mysql_container.MYSQL_USER}&" + f"password={mysql_container.MYSQL_PASSWORD}") + + yield jdbc_url + except mysql.connector.Error as err: + logging.error("Error interacting with temporary MySQL DB: %s", err) + raise err + + +@contextlib.contextmanager +def temp_postgres_database(): + """Context manager to provide a temporary PostgreSQL database for testing. + + This function utilizes the 'testcontainers' library to spin up a + PostgreSQL instance within a Docker container. It then connects + to this temporary database using 'psycopg2', creates a predefined 'tmp_table', + and yields a JDBC connection string suitable for use in tests. + + The Docker container and the database instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: A JDBC connection string for the temporary PostgreSQL database. + Example format: + "jdbc:postgresql://:/? + user=&password=" + + Raises: + psycopg2.Error: If there's an error connecting to or interacting with + the PostgreSQL database during setup. + Exception: Any other exception encountered during the setup process. + """ + default_port = 5432 + + # Start the postgress container using testcontainers + with PostgresContainer(port=default_port) as postgres_container: + try: + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine(postgres_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, rank INTEGER);")) + + # Construct the JDBC url for connections later on by tests + jdbc_url = ( + f"jdbc:postgresql://{postgres_container.get_container_host_ip()}:" + f"{postgres_container.get_exposed_port(default_port)}/" + f"{postgres_container.POSTGRES_DB}?" + f"user={postgres_container.POSTGRES_USER}&" + f"password={postgres_container.POSTGRES_PASSWORD}") + + yield jdbc_url + except (psycopg2.Error, Exception) as err: + logging.error("Error interacting with temporary Postgres DB: %s", err) + raise err + + +@contextlib.contextmanager +def temp_sqlserver_database(): + """Context manager to provide a temporary SQL Server database for testing. + + This function utilizes the 'testcontainers' library to spin up a + Microsoft SQL Server instance within a Docker container. It then connects + to this temporary database using 'pytds', creates a predefined 'tmp_table', + and yields a JDBC connection string suitable for use in tests. + + The Docker container and the database instance are automatically managed + and torn down when the context manager exits. + + Yields: + str: A JDBC connection string for the temporary SQL Server database. + Example format: + "jdbc:sqlserver://:; + databaseName=; + user=; + password=; + encrypt=false; + trustServerCertificate=true" + + Raises: + pytds.Error: If there's an error connecting to or interacting with + the SQL Server database during setup. + Exception: Any other exception encountered during the setup process. + """ + default_port = 1433 + + # Start the sql server using testcontainers + with SqlServerContainer(port=default_port, + dialect='mssql+pytds') as sqlserver_container: + try: + # Make connection to temp database and create tmp table + engine = sqlalchemy.create_engine( + sqlserver_container.get_connection_url()) + with engine.begin() as connection: + connection.execute( + sqlalchemy.text( + "CREATE TABLE tmp_table (value INTEGER, rank INTEGER);")) + + # Construct the JDBC url for connections later on by tests + # NOTE: encrypt=false and trustServerCertificate=true is generally + # needed for test container connections without proper certificates setup + jdbc_url = ( + f"jdbc:sqlserver://{sqlserver_container.get_container_host_ip()}:" + f"{int(sqlserver_container.get_exposed_port(default_port))};" + f"databaseName={sqlserver_container.SQLSERVER_DBNAME};" + f"user={sqlserver_container.SQLSERVER_USER};" + f"password={sqlserver_container.SQLSERVER_PASSWORD};" + f"encrypt=true;" + f"trustServerCertificate=true") + + yield jdbc_url + except (pytds.Error, Exception) as err: + logging.error("Error interacting with temporary SQL Server DB: %s", err) + raise err + + def replace_recursive(spec, vars): if isinstance(spec, dict): return { diff --git a/sdks/python/apache_beam/yaml/tests/jdbc.yaml b/sdks/python/apache_beam/yaml/tests/jdbc.yaml new file mode 100644 index 000000000000..fffdf96f463d --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/jdbc.yaml @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_sqlite_database" + +pipelines: + # Jdbc write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToJdbc + config: + url: "{TEMP_DB}" + driver_class_name: "org.sqlite.JDBC" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # Jdbc read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromJdbc + config: + url: "{TEMP_DB}" + driver_class_name: "org.sqlite.JDBC" + query: "SELECT * FROM tmp_table" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + diff --git a/sdks/python/apache_beam/yaml/tests/mysql.yaml b/sdks/python/apache_beam/yaml/tests/mysql.yaml new file mode 100644 index 000000000000..19c6774b2252 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/mysql.yaml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_mysql_database" + +pipelines: + # MySql write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToMySql + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, `rank`) VALUES(?,?)" + + # MySql read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromMySql + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "com.mysql.cj.jdbc.Driver" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/tests/postgres.yaml b/sdks/python/apache_beam/yaml/tests/postgres.yaml new file mode 100644 index 000000000000..9ecc2f481677 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/postgres.yaml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_postgres_database" + +pipelines: + # Postgres write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToPostgres + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # Postgres read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromPostgres + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "org.postgresql.Driver" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/tests/sqlserver.yaml b/sdks/python/apache_beam/yaml/tests/sqlserver.yaml new file mode 100644 index 000000000000..9e5ba2ab2ec2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/sqlserver.yaml @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: TEMP_DB + type: "apache_beam.yaml.integration_tests.temp_sqlserver_database" + +pipelines: + # SqlServer write pipeline + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + - type: WriteToSqlServer + config: + url: "{TEMP_DB}" + query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)" + + # SqlServer read pipeline + - pipeline: + type: chain + transforms: + - type: ReadFromSqlServer + config: + url: "{TEMP_DB}" + query: "SELECT * FROM tmp_table" + driver_class_name: "com.microsoft.sqlserver.jdbc.SQLServerDriver" + - type: AssertEqual + config: + elements: + - {value: 123, rank: 0} + - {value: 456, rank: 1} + - {value: 789, rank: 2} + \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/tests/windowinto.yaml b/sdks/python/apache_beam/yaml/tests/windowinto.yaml new file mode 100644 index 000000000000..d35d291d4f45 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/windowinto.yaml @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pipelines: + - pipeline: + type: composite + transforms: + # setup + - type: Create + config: + elements: + - {k: "x", t: 1} + - {k: "x", t: 8} + - {k: "x", t: 11} + - {k: "y", t: 101} + - type: AssignTimestamps + input: Create + config: + timestamp: t + + # global windowing + - type: chain + name: Global + input: AssignTimestamps + transforms: + - type: WindowInto + config: + windowing: + type: global + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 20} + - {k: "y", t: 101} + + # fixed windowing + - type: chain + name: Fixed + input: AssignTimestamps + transforms: + + - type: WindowInto + config: + windowing: + type: fixed + size: 10s + offset: 5s + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 1} + - {k: "x", t: 19} + - {k: "y", t: 101} + + # sliding windowing + - type: chain + name: Sliding + input: AssignTimestamps + transforms: + - type: WindowInto + config: + windowing: + type: sliding + size: 20s + period: 10s + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 9} # [-10, 10) + - {k: "x", t: 20} # [ 0, 20) + - {k: "x", t: 11} # [ 10, 30) + - {k: "y", t: 101} # [90, 110) + - {k: "y", t: 101} # [100, 120) + + # session windowing + - type: chain + name: Sessions + input: AssignTimestamps + transforms: + - type: WindowInto + config: + windowing: + type: sessions + gap: 5s + - type: Combine + config: + group_by: 'k' + combine: + t: sum + - type: AssertEqual + config: + elements: + - {k: "x", t: 1} + - {k: "x", t: 19} + - {k: "y", t: 101} + diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 2b21d0463c98..74df467f2b17 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -422,6 +422,9 @@ def get_portability_package_data(): 'cryptography>=41.0.2', 'hypothesis>5.0.0,<7.0.0', 'virtualenv-clone>=0.5,<1.0', + 'mysql-connector-python>=9.3.0', + 'python-tds>=1.16.1', + 'sqlalchemy-pytds>=1.0.2' ], 'gcp': [ 'cachetools>=3.1.0,<6',