Skip to content

Commit e22fdc1

Browse files
authored
Yaml IT - Phase 3a (#34782)
* add jdbc yaml test * add mysql yaml test * add postgres yaml test * add sqlserver yaml test * add windowinto yaml test * add additional db connectors for mssql and mysql * add support for sqlite * fix lint issues * change to pytds connector for sqlserver * fix lint issues * add sqlchemy package * fix yapf issues * update DBs connection to sqlalchemy * yapf changes * fix pylint issue
1 parent cbd9c87 commit e22fdc1

File tree

8 files changed

+618
-0
lines changed

8 files changed

+618
-0
lines changed

sdks/java/extensions/schemaio-expansion-service/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ dependencies {
6464
permitUnusedDeclared 'com.google.cloud:alloydb-jdbc-connector:1.2.0'
6565
testImplementation library.java.junit
6666
testImplementation library.java.mockito_core
67+
runtimeOnly ("org.xerial:sqlite-jdbc:3.49.1.0")
6768
}
6869

6970
task runExpansionService (type: JavaExec) {

sdks/python/apache_beam/yaml/integration_tests.py

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,19 @@
2323
import itertools
2424
import logging
2525
import os
26+
import sqlite3
2627
import unittest
2728
import uuid
2829

2930
import mock
31+
import mysql.connector
32+
import psycopg2
33+
import pytds
34+
import sqlalchemy
3035
import yaml
36+
from testcontainers.mssql import SqlServerContainer
37+
from testcontainers.mysql import MySqlContainer
38+
from testcontainers.postgres import PostgresContainer
3139

3240
import apache_beam as beam
3341
from apache_beam.io import filesystems
@@ -42,13 +50,45 @@
4250

4351
@contextlib.contextmanager
4452
def gcs_temp_dir(bucket):
53+
"""Context manager to create and clean up a temporary GCS directory.
54+
55+
Creates a unique temporary directory within the specified GCS bucket
56+
and yields the path. Upon exiting the context, the directory and its
57+
contents are deleted.
58+
59+
Args:
60+
bucket (str): The GCS bucket name (e.g., 'gs://my-bucket').
61+
62+
Yields:
63+
str: The full path to the created temporary GCS directory.
64+
Example: 'gs://my-bucket/yaml-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
65+
"""
4566
gcs_tempdir = bucket + '/yaml-' + str(uuid.uuid4())
4667
yield gcs_tempdir
4768
filesystems.FileSystems.delete([gcs_tempdir])
4869

4970

5071
@contextlib.contextmanager
5172
def temp_spanner_table(project, prefix='temp_spanner_db_'):
73+
"""Context manager to create and clean up a temporary Spanner database and
74+
table.
75+
76+
Creates a unique temporary Spanner database within the specified project
77+
and a predefined table named 'tmp_table' with columns ['UserId', 'Key'].
78+
It yields connection details for the created resources. Upon exiting the
79+
context, the temporary database (and its table) is deleted.
80+
81+
Args:
82+
project (str): The Google Cloud project ID.
83+
prefix (str): A prefix to use for the temporary database name.
84+
Defaults to 'temp_spanner_db_'.
85+
86+
Yields:
87+
list[str]: A list containing connection details:
88+
[project_id, instance_id, database_id, table_name, list_of_columns].
89+
Example: ['my-project', 'beam-test', 'temp_spanner_db_...', 'tmp_table',
90+
['UserId', 'Key']]
91+
"""
5292
spanner_client = SpannerWrapper(project)
5393
spanner_client._create_database()
5494
instance = "beam-test"
@@ -65,6 +105,26 @@ def temp_spanner_table(project, prefix='temp_spanner_db_'):
65105

66106
@contextlib.contextmanager
67107
def temp_bigquery_table(project, prefix='yaml_bq_it_'):
108+
"""Context manager to create and clean up a temporary BigQuery dataset.
109+
110+
Creates a unique temporary BigQuery dataset within the specified project.
111+
It yields a placeholder table name string within that dataset (e.g.,
112+
'project.dataset_id.tmp_table'). The actual table is expected to be
113+
created by the test using this context.
114+
115+
Upon exiting the context, the temporary dataset and all its contents
116+
(including any tables created within it) are deleted.
117+
118+
Args:
119+
project (str): The Google Cloud project ID.
120+
prefix (str): A prefix to use for the temporary dataset name.
121+
Defaults to 'yaml_bq_it_'.
122+
123+
Yields:
124+
str: The full path for a temporary BigQuery table within the created
125+
dataset.
126+
Example: 'my-project.yaml_bq_it_a1b2c3d4e5f6...tmp_table'
127+
"""
68128
bigquery_client = BigQueryWrapper()
69129
dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex)
70130
bigquery_client.get_or_create_dataset(project, dataset_id)
@@ -76,6 +136,221 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'):
76136
bigquery_client.client.datasets.Delete(request)
77137

78138

139+
@contextlib.contextmanager
140+
def temp_sqlite_database(prefix='yaml_jdbc_it_'):
141+
"""Context manager to provide a temporary SQLite database via JDBC for
142+
testing.
143+
144+
This function creates a temporary SQLite database file on the local
145+
filesystem. It establishes a connection using 'sqlite3', creates a predefined
146+
'tmp_table', and then yields a JDBC connection string suitable for use in
147+
tests that require a generic JDBC connection (specifically configured for
148+
SQLite in this case).
149+
150+
The SQLite database file is automatically cleaned up (closed and deleted)
151+
when the context manager exits.
152+
153+
Args:
154+
prefix (str): A prefix to use for the temporary database file name.
155+
156+
Yields:
157+
str: A JDBC connection string for the temporary SQLite database.
158+
Example format: "jdbc:sqlite:<path_to_db_file>"
159+
160+
Raises:
161+
sqlite3.Error: If there's an error connecting to or interacting with
162+
the SQLite database during setup.
163+
Exception: Any other exception encountered during the setup or cleanup
164+
process.
165+
"""
166+
conn = cursor = None
167+
try:
168+
# Establish connection to the temp file
169+
db_name = f'{prefix}{uuid.uuid4().hex}.db'
170+
conn = sqlite3.connect(db_name)
171+
cursor = conn.cursor()
172+
173+
# Create a temp table for tests
174+
cursor.execute(
175+
'''
176+
CREATE TABLE tmp_table (
177+
value INTEGER PRIMARY KEY,
178+
rank INTEGER
179+
)
180+
''')
181+
conn.commit()
182+
yield f'jdbc:sqlite:{db_name}'
183+
except (sqlite3.Error, Exception) as err:
184+
logging.error("Error interacting with temporary SQLite DB: %s", err)
185+
raise err
186+
finally:
187+
# Close connections
188+
if cursor:
189+
cursor.close()
190+
if conn:
191+
conn.close()
192+
try:
193+
if os.path.exists(db_name):
194+
os.remove(db_name)
195+
except Exception as err:
196+
logging.error("Error deleting temporary SQLite DB: %s", err)
197+
raise err
198+
199+
200+
@contextlib.contextmanager
201+
def temp_mysql_database():
202+
"""Context manager to provide a temporary MySQL database for testing.
203+
204+
This function utilizes the 'testcontainers' library to spin up a
205+
MySQL instance within a Docker container. It then connects
206+
to this temporary database using 'mysql.connector', creates a predefined
207+
'tmp_table', and yields a JDBC connection string suitable for use in tests.
208+
209+
The Docker container and the database instance are automatically managed
210+
and torn down when the context manager exits.
211+
212+
Yields:
213+
str: A JDBC connection string for the temporary MySQL database.
214+
Example format:
215+
"jdbc:mysql://<host>:<port>/<db_name>?
216+
user=<user>&password=<password>"
217+
218+
Raises:
219+
mysql.connector.Error: If there's an error connecting to or interacting
220+
with the MySQL database during setup.
221+
Exception: Any other exception encountered during the setup process.
222+
"""
223+
with MySqlContainer() as mysql_container:
224+
try:
225+
# Make connection to temp database and create tmp table
226+
engine = sqlalchemy.create_engine(mysql_container.get_connection_url())
227+
with engine.begin() as connection:
228+
connection.execute(
229+
sqlalchemy.text(
230+
"CREATE TABLE tmp_table (value INTEGER, `rank` INTEGER);"))
231+
232+
# Construct the JDBC url for connections later on by tests
233+
jdbc_url = (
234+
f"jdbc:mysql://{mysql_container.get_container_host_ip()}:"
235+
f"{mysql_container.get_exposed_port(mysql_container.port_to_expose)}/"
236+
f"{mysql_container.MYSQL_DATABASE}?"
237+
f"user={mysql_container.MYSQL_USER}&"
238+
f"password={mysql_container.MYSQL_PASSWORD}")
239+
240+
yield jdbc_url
241+
except mysql.connector.Error as err:
242+
logging.error("Error interacting with temporary MySQL DB: %s", err)
243+
raise err
244+
245+
246+
@contextlib.contextmanager
247+
def temp_postgres_database():
248+
"""Context manager to provide a temporary PostgreSQL database for testing.
249+
250+
This function utilizes the 'testcontainers' library to spin up a
251+
PostgreSQL instance within a Docker container. It then connects
252+
to this temporary database using 'psycopg2', creates a predefined 'tmp_table',
253+
and yields a JDBC connection string suitable for use in tests.
254+
255+
The Docker container and the database instance are automatically managed
256+
and torn down when the context manager exits.
257+
258+
Yields:
259+
str: A JDBC connection string for the temporary PostgreSQL database.
260+
Example format:
261+
"jdbc:postgresql://<host>:<port>/<db_name>?
262+
user=<user>&password=<password>"
263+
264+
Raises:
265+
psycopg2.Error: If there's an error connecting to or interacting with
266+
the PostgreSQL database during setup.
267+
Exception: Any other exception encountered during the setup process.
268+
"""
269+
default_port = 5432
270+
271+
# Start the postgress container using testcontainers
272+
with PostgresContainer(port=default_port) as postgres_container:
273+
try:
274+
# Make connection to temp database and create tmp table
275+
engine = sqlalchemy.create_engine(postgres_container.get_connection_url())
276+
with engine.begin() as connection:
277+
connection.execute(
278+
sqlalchemy.text(
279+
"CREATE TABLE tmp_table (value INTEGER, rank INTEGER);"))
280+
281+
# Construct the JDBC url for connections later on by tests
282+
jdbc_url = (
283+
f"jdbc:postgresql://{postgres_container.get_container_host_ip()}:"
284+
f"{postgres_container.get_exposed_port(default_port)}/"
285+
f"{postgres_container.POSTGRES_DB}?"
286+
f"user={postgres_container.POSTGRES_USER}&"
287+
f"password={postgres_container.POSTGRES_PASSWORD}")
288+
289+
yield jdbc_url
290+
except (psycopg2.Error, Exception) as err:
291+
logging.error("Error interacting with temporary Postgres DB: %s", err)
292+
raise err
293+
294+
295+
@contextlib.contextmanager
296+
def temp_sqlserver_database():
297+
"""Context manager to provide a temporary SQL Server database for testing.
298+
299+
This function utilizes the 'testcontainers' library to spin up a
300+
Microsoft SQL Server instance within a Docker container. It then connects
301+
to this temporary database using 'pytds', creates a predefined 'tmp_table',
302+
and yields a JDBC connection string suitable for use in tests.
303+
304+
The Docker container and the database instance are automatically managed
305+
and torn down when the context manager exits.
306+
307+
Yields:
308+
str: A JDBC connection string for the temporary SQL Server database.
309+
Example format:
310+
"jdbc:sqlserver://<host>:<port>;
311+
databaseName=<db_name>;
312+
user=<user>;
313+
password=<password>;
314+
encrypt=false;
315+
trustServerCertificate=true"
316+
317+
Raises:
318+
pytds.Error: If there's an error connecting to or interacting with
319+
the SQL Server database during setup.
320+
Exception: Any other exception encountered during the setup process.
321+
"""
322+
default_port = 1433
323+
324+
# Start the sql server using testcontainers
325+
with SqlServerContainer(port=default_port,
326+
dialect='mssql+pytds') as sqlserver_container:
327+
try:
328+
# Make connection to temp database and create tmp table
329+
engine = sqlalchemy.create_engine(
330+
sqlserver_container.get_connection_url())
331+
with engine.begin() as connection:
332+
connection.execute(
333+
sqlalchemy.text(
334+
"CREATE TABLE tmp_table (value INTEGER, rank INTEGER);"))
335+
336+
# Construct the JDBC url for connections later on by tests
337+
# NOTE: encrypt=false and trustServerCertificate=true is generally
338+
# needed for test container connections without proper certificates setup
339+
jdbc_url = (
340+
f"jdbc:sqlserver://{sqlserver_container.get_container_host_ip()}:"
341+
f"{int(sqlserver_container.get_exposed_port(default_port))};"
342+
f"databaseName={sqlserver_container.SQLSERVER_DBNAME};"
343+
f"user={sqlserver_container.SQLSERVER_USER};"
344+
f"password={sqlserver_container.SQLSERVER_PASSWORD};"
345+
f"encrypt=true;"
346+
f"trustServerCertificate=true")
347+
348+
yield jdbc_url
349+
except (pytds.Error, Exception) as err:
350+
logging.error("Error interacting with temporary SQL Server DB: %s", err)
351+
raise err
352+
353+
79354
def replace_recursive(spec, vars):
80355
if isinstance(spec, dict):
81356
return {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
19+
fixtures:
20+
- name: TEMP_DB
21+
type: "apache_beam.yaml.integration_tests.temp_sqlite_database"
22+
23+
pipelines:
24+
# Jdbc write pipeline
25+
- pipeline:
26+
type: chain
27+
transforms:
28+
- type: Create
29+
config:
30+
elements:
31+
- {value: 123, rank: 0}
32+
- {value: 456, rank: 1}
33+
- {value: 789, rank: 2}
34+
- type: WriteToJdbc
35+
config:
36+
url: "{TEMP_DB}"
37+
driver_class_name: "org.sqlite.JDBC"
38+
query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)"
39+
40+
# Jdbc read pipeline
41+
- pipeline:
42+
type: chain
43+
transforms:
44+
- type: ReadFromJdbc
45+
config:
46+
url: "{TEMP_DB}"
47+
driver_class_name: "org.sqlite.JDBC"
48+
query: "SELECT * FROM tmp_table"
49+
- type: AssertEqual
50+
config:
51+
elements:
52+
- {value: 123, rank: 0}
53+
- {value: 456, rank: 1}
54+
- {value: 789, rank: 2}
55+

0 commit comments

Comments
 (0)