Skip to content

Commit d8af354

Browse files
authored
Yaml IT - Phase 3b (#35022)
* add kafka yaml test * add oracle yaml test * add pubsub yaml test * add runinference yaml test * fix rebase conflict * remove env vars parameter that shouldn't be exposed * add kafka dependency * fix rebase conflict * fix lint issue * update todo from derrickaw to issue #
1 parent ebdaa4a commit d8af354

File tree

7 files changed

+398
-7
lines changed

7 files changed

+398
-7
lines changed

sdks/python/apache_beam/yaml/integration_tests.py

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import itertools
2424
import logging
2525
import os
26+
import random
2627
import sqlite3
28+
import string
2729
import unittest
2830
import uuid
2931

@@ -33,6 +35,11 @@
3335
import pytds
3436
import sqlalchemy
3537
import yaml
38+
from google.cloud import pubsub_v1
39+
from testcontainers.core.container import DockerContainer
40+
from testcontainers.core.waiting_utils import wait_for_logs
41+
from testcontainers.google import PubSubContainer
42+
from testcontainers.kafka import KafkaContainer
3643
from testcontainers.mssql import SqlServerContainer
3744
from testcontainers.mysql import MySqlContainer
3845
from testcontainers.postgres import PostgresContainer
@@ -351,6 +358,129 @@ def temp_sqlserver_database():
351358
raise err
352359

353360

361+
class OracleTestContainer(DockerContainer):
  """Oracle XE test container that is safe to query once started.

  Unlike the stock OracleDBContainer, this container sets the Oracle
  password up front, blocks in start() until the database log reports
  readiness (so get_exposed_port is never called too early), and builds a
  connection URL for the modern python-oracledb driver.
  """
  # Standard Oracle listener port inside the container.
  _ORACLE_PORT = 1521

  def __init__(self):
    super().__init__("gvenzl/oracle-xe:21-slim")
    self.with_env("ORACLE_PASSWORD", "oracle")
    self.with_exposed_ports(self._ORACLE_PORT)

  def start(self):
    """Start the container and block until the database is usable."""
    super().start()
    # The exposed-port mapping is only reliable after the database has
    # finished initializing, so wait for its readiness banner first.
    wait_for_logs(self, "DATABASE IS READY TO USE!", timeout=300)
    return self

  def get_connection_url(self):
    """Return a SQLAlchemy URL (oracledb driver) for the running container."""
    mapped_port = self.get_exposed_port(self._ORACLE_PORT)
    return (
        f"oracle+oracledb://system:oracle@localhost:{mapped_port}"
        "/?service_name=XEPDB1")
382+
383+
384+
@contextlib.contextmanager
def temp_oracle_database():
  """Context manager to provide a temporary Oracle database for testing.

  This function utilizes the 'testcontainers' library to spin up an
  Oracle Database instance within a Docker container. It then connects
  to this temporary database via SQLAlchemy (python-oracledb driver) and
  creates a predefined 'tmp_table' table for the tests to write to and
  read from.

  NOTE: A custom OracleTestContainer class is used because the stock
  OracleDBContainer calls get_exposed_port too soon, causing the
  service to hang until timeout.

  Yields:
    str: A JDBC connection string for the temporary Oracle database.
      Example format:
      "jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1"

  Raises:
    sqlalchemy.exc.SQLAlchemyError: If there's an error connecting to or
      interacting with the Oracle database during setup.
    Exception: Any other exception encountered during the setup process.
  """
  with OracleTestContainer() as oracle:
    engine = sqlalchemy.create_engine(oracle.get_connection_url())
    with engine.connect() as connection:
      # Pre-create the table the YAML IT pipelines expect to exist.
      connection.execute(
          sqlalchemy.text(
              """
              CREATE TABLE tmp_table (
                value NUMBER PRIMARY KEY,
                rank NUMBER
              )
              """))
      connection.commit()
    port = oracle.get_exposed_port(1521)
    yield f"jdbc:oracle:thin:system/oracle@localhost:{port}/XEPDB1"
420+
421+
422+
@contextlib.contextmanager
def temp_kafka_server():
  """Context manager to provide a temporary Kafka server for testing.

  This function utilizes the 'testcontainers' library to spin up a Kafka
  instance within a Docker container. It then yields the bootstrap server
  string, which can be used by Kafka clients to connect to this temporary
  server.

  The Docker container and the Kafka instance are automatically managed
  and torn down when the context manager exits.

  Yields:
    str: The bootstrap server string for the temporary Kafka instance.
      Example format: "localhost:XXXXX" or "PLAINTEXT://localhost:XXXXX"

  Raises:
    Exception: If there's an error starting the Kafka container or
      interacting with the temporary Kafka server.
  """
  try:
    with KafkaContainer() as kafka_container:
      yield kafka_container.get_bootstrap_server()
  except Exception as err:
    # Fixed "Kakfa" typo in the logged message.
    logging.error("Error interacting with temporary Kafka Server: %s", err)
    # Bare raise preserves the original traceback (raise err would reset it).
    raise
448+
449+
450+
@contextlib.contextmanager
def temp_pubsub_emulator(project_id="apache-beam-testing"):
  """
  Context manager to provide a temporary Pub/Sub emulator for testing.

  This function uses 'testcontainers' to spin up a Google Cloud SDK
  container running the Pub/Sub emulator, then creates a randomly named
  topic in it and yields that topic's full resource name for the test
  pipelines to publish to.

  The Docker container is automatically managed and torn down when the
  context manager exits.

  Args:
    project_id (str): The GCP project ID to be used by the emulator.
      This doesn't need to be a real project for the emulator.

  Yields:
    str: The full name of the created topic, e.g.
      "projects/{project_id}/topics/{topic_id}".

  Raises:
    Exception: If the container fails to start or the emulator isn't ready.
  """
  with PubSubContainer(project=project_id) as pubsub_container:
    # NOTE(review): PublisherClient() only talks to the emulator if it is
    # discoverable (e.g. via PUBSUB_EMULATOR_HOST) — confirm the container
    # exposes this; otherwise prefer the container's own client helpers.
    publisher = pubsub_v1.PublisherClient()
    # Pub/Sub topic IDs must start with a letter, so prefix the random hex.
    first_character = random.choice(string.ascii_lowercase)
    topic_id = f"{first_character}{uuid.uuid4().hex[:8]}"
    topic_name_to_create = \
        f"projects/{pubsub_container.project}/topics/{topic_id}"
    created_topic_object = publisher.create_topic(name=topic_name_to_create)
    yield created_topic_object.name
482+
483+
354484
def replace_recursive(spec, vars):
355485
if isinstance(spec, dict):
356486
return {
@@ -359,7 +489,10 @@ def replace_recursive(spec, vars):
359489
}
360490
elif isinstance(spec, list):
361491
return [replace_recursive(value, vars) for value in spec]
362-
elif isinstance(spec, str) and '{' in spec and '{\n' not in spec:
492+
# TODO(https://github.com/apache/beam/issues/35067): Consider checking for
493+
# callable in the if branch above instead of checking lambda here.
494+
elif isinstance(
495+
spec, str) and '{' in spec and '{\n' not in spec and 'lambda' not in spec:
363496
try:
364497
return spec.format(**vars)
365498
except Exception as exn:
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
  # Renamed from the misspelled TEMP_BOOTSTAP_SERVER (name is local to this
  # file; all references below updated consistently).
  - name: TEMP_BOOTSTRAP_SERVER
    type: "apache_beam.yaml.integration_tests.temp_kafka_server"

pipelines:
  # Kafka write pipeline
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {value: 123}
              - {value: 456}
              - {value: 789}
        - type: MapToFields
          config:
            language: python
            fields:
              value:
                callable: |
                  lambda row: str(row.value).encode('utf-8')
                output_type: bytes
        - type: WriteToKafka
          config:
            format: "RAW"
            topic: "silly_topic"
            bootstrap_servers: "{TEMP_BOOTSTRAP_SERVER}"
            producer_config_updates:
              linger.ms: "0"

  # Kafka read pipeline
  # Need a separate read pipeline to make sure the write pipeline is flushed
  - pipeline:
      type: chain
      transforms:
        - type: ReadFromKafka
          config:
            format: "RAW"
            topic: "silly_topic"
            bootstrap_servers: "{TEMP_BOOTSTRAP_SERVER}"
            consumer_config:
              auto.offset.reset: "earliest"
              group.id: "yaml-kafka-test-group"
            max_read_time_seconds: 10 # will read forever if not set
        - type: MapToFields
          config:
            language: python
            fields:
              value:
                callable: |
                  # Kafka RAW format reads messages as bytes in the 'payload' field of a Row
                  lambda row: row.payload.decode('utf-8')
                output_type: string
        - type: AssertEqual
          config:
            elements:
              - {value: "123"}
              - {value: "456"}
              - {value: "789"}

# TODO: Error handling hard to trigger upon initial investigations. Need to
# investigate more.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
  # Yields a JDBC URL for a temporary Oracle XE container (see
  # temp_oracle_database in integration_tests.py).
  - name: TEMP_DB
    type: "apache_beam.yaml.integration_tests.temp_oracle_database"

pipelines:
  # Oracle write pipeline
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {value: 123, rank: 0}
              - {value: 456, rank: 1}
              - {value: 789, rank: 2}
        - type: WriteToOracle
          config:
            url: "{TEMP_DB}"
            query: "INSERT INTO tmp_table (value, rank) VALUES(?,?)"

  # Oracle read pipeline
  # Need a separate read pipeline to make sure the write pipeline is flushed
  - pipeline:
      type: chain
      transforms:
        - type: ReadFromOracle
          config:
            url: "{TEMP_DB}"
            query: "SELECT * FROM tmp_table"
            driver_class_name: "oracle.jdbc.OracleDriver"
        - type: MapToFields
          config:
            language: python
            fields:
              # Columns come back as x.VALUE / x.RANK — presumably because
              # Oracle upper-cases unquoted identifiers; confirm if changed.
              value:
                callable: "lambda x: int(x.VALUE)"
                output_type: integer
              rank:
                callable: "lambda x: int(x.RANK)"
                output_type: integer
        - type: AssertEqual
          config:
            elements:
              - {value: 123, rank: 0}
              - {value: 456, rank: 1}
              - {value: 789, rank: 2}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
fixtures:
  # Yields the full name of a topic created in a temporary Pub/Sub emulator
  # (see temp_pubsub_emulator in integration_tests.py).
  - name: PS_TOPIC
    type: "apache_beam.yaml.integration_tests.temp_pubsub_emulator"
    config:
      project_id: "apache-beam-testing"

pipelines:
  # Pubsub write pipeline
  - pipeline:
      type: chain
      transforms:
        - type: Create
          config:
            elements:
              - {value: "11a"}
              - {value: "37a"}
              - {value: "389a"}
        - type: WriteToPubSub
          config:
            topic: "{PS_TOPIC}"
            format: "RAW"

    options:
      streaming: true


# TODO: Current PubSubIO doesn't have a max_read_time_seconds parameter like
# Kafka does. Without it, the ReadFromPubSub will run forever. This is not a
# trivial change. For now, we will live with the mocked tests located
# [here](https://github.com/apache/beam/blob/bea04446b41c86856c24d0a9761622092ed9936f/sdks/python/apache_beam/yaml/yaml_io_test.py#L83).

# - pipeline:
#     type: chain
#     transforms:
#       - type: ReadFromPubSub
#         config:
#           topic: "{PS_TOPIC}"
#           format: "RAW"
#           # ...

#   options:
#     streaming: true
61+

0 commit comments

Comments
 (0)