Skip to content

Commit 6b06823

Browse files
authored
[yaml]: Phases 3-4 Moderate-High Usage Yaml examples (#35423)
* rebase
* update infra and input data to current
* add phases 3-4 examples
* remove other PR code
* remove other pubsub previous pr code
* add streaming option
* simplify code from previous PR into these code changes
* address comments
1 parent 3f3f214 commit 6b06823

File tree

10 files changed

+548
-71
lines changed

10 files changed

+548
-71
lines changed

sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 103 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,8 @@ def _wordcount_test_preprocessor(
418418
env.input_file('kinglear.txt', '\n'.join(lines)))
419419

420420

421-
@YamlExamplesTestSuite.register_test_preprocessor('test_kafka_yaml')
421+
@YamlExamplesTestSuite.register_test_preprocessor(
422+
['test_kafka_yaml', 'test_kafka_to_iceberg_yaml'])
422423
def _kafka_test_preprocessor(
423424
test_spec: dict, expected: List[str], env: TestEnvironment):
424425

@@ -448,7 +449,15 @@ def _kafka_test_preprocessor(
448449
'test_pubsub_topic_to_bigquery_yaml',
449450
'test_pubsub_subscription_to_bigquery_yaml',
450451
'test_jdbc_to_bigquery_yaml',
451-
'test_spanner_to_avro_yaml'
452+
'test_spanner_to_avro_yaml',
453+
'test_gcs_text_to_bigquery_yaml',
454+
'test_sqlserver_to_bigquery_yaml',
455+
'test_postgres_to_bigquery_yaml',
456+
'test_kafka_to_iceberg_yaml',
457+
'test_pubsub_to_iceberg_yaml',
458+
'test_oracle_to_bigquery_yaml',
459+
'test_mysql_to_bigquery_yaml',
460+
'test_spanner_to_bigquery_yaml'
452461
])
453462
def _io_write_test_preprocessor(
454463
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -482,8 +491,11 @@ def _io_write_test_preprocessor(
482491
return test_spec
483492

484493

485-
@YamlExamplesTestSuite.register_test_preprocessor(
486-
['test_simple_filter_yaml', 'test_simple_filter_and_combine_yaml'])
494+
@YamlExamplesTestSuite.register_test_preprocessor([
495+
'test_simple_filter_yaml',
496+
'test_simple_filter_and_combine_yaml',
497+
'test_gcs_text_to_bigquery_yaml'
498+
])
487499
def _file_io_read_test_preprocessor(
488500
test_spec: dict, expected: List[str], env: TestEnvironment):
489501
"""
@@ -560,7 +572,8 @@ def _iceberg_io_read_test_preprocessor(
560572
@YamlExamplesTestSuite.register_test_preprocessor([
561573
'test_spanner_read_yaml',
562574
'test_enrich_spanner_with_bigquery_yaml',
563-
"test_spanner_to_avro_yaml"
575+
'test_spanner_to_avro_yaml',
576+
'test_spanner_to_bigquery_yaml'
564577
])
565578
def _spanner_io_read_test_preprocessor(
566579
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -642,13 +655,13 @@ def _enrichment_test_preprocessor(
642655

643656
@YamlExamplesTestSuite.register_test_preprocessor([
644657
'test_pubsub_topic_to_bigquery_yaml',
645-
'test_pubsub_subscription_to_bigquery_yaml'
658+
'test_pubsub_subscription_to_bigquery_yaml',
659+
'test_pubsub_to_iceberg_yaml'
646660
])
647661
def _pubsub_io_read_test_preprocessor(
648662
test_spec: dict, expected: List[str], env: TestEnvironment):
649663
"""
650664
Preprocessor for tests that involve reading from Pub/Sub.
651-
652665
This preprocessor replaces any ReadFromPubSub transform with a Create
653666
transform that reads from a predefined in-memory list of messages.
654667
This allows the test to verify the pipeline's correctness without relying
@@ -668,27 +681,91 @@ def _pubsub_io_read_test_preprocessor(
668681
def _jdbc_io_read_test_preprocessor(
669682
test_spec: dict, expected: List[str], env: TestEnvironment):
670683
"""
671-
Preprocessor for tests that involve reading from JDBC.
684+
Preprocessor for tests that involve reading from generic Jdbc.
685+
url syntax: 'jdbc:<database-type>://<host>:<port>/<database>'
686+
"""
687+
return _db_io_read_test_processor(
688+
test_spec, lambda url: url.split('/')[-1], 'Jdbc')
672689

673-
This preprocessor replaces any ReadFromJdbc transform with a Create
674-
transform that reads from a predefined in-memory list of records.
675-
This allows the test to verify the pipeline's correctness without
676-
relying on an active JDBC connection.
690+
691+
@YamlExamplesTestSuite.register_test_preprocessor([
692+
'test_sqlserver_to_bigquery_yaml',
693+
])
694+
def __sqlserver_io_read_test_preprocessor(
695+
test_spec: dict, expected: List[str], env: TestEnvironment):
696+
"""
697+
Preprocessor for tests that involve reading from SqlServer.
698+
url syntax: 'jdbc:sqlserver://<host>:<port>;databaseName=<database>;
699+
user=<user>;password=<password>;encrypt=false;trustServerCertificate=true'
700+
"""
701+
return _db_io_read_test_processor(
702+
test_spec, lambda url: url.split(';')[1].split('=')[-1], 'SqlServer')
703+
704+
705+
@YamlExamplesTestSuite.register_test_preprocessor([
706+
'test_postgres_to_bigquery_yaml',
707+
])
708+
def __postgres_io_read_test_preprocessor(
709+
test_spec: dict, expected: List[str], env: TestEnvironment):
710+
"""
711+
Preprocessor for tests that involve reading from Postgres.
712+
url syntax: 'jdbc:postgresql://<host>:<port>/<database>?user=<user>&
713+
password=<password>'
714+
"""
715+
return _db_io_read_test_processor(
716+
test_spec, lambda url: url.split('/')[3].split('?')[0], 'Postgres')
717+
718+
719+
@YamlExamplesTestSuite.register_test_preprocessor([
720+
'test_oracle_to_bigquery_yaml',
721+
])
722+
def __oracle_io_read_test_preprocessor(
723+
test_spec: dict, expected: List[str], env: TestEnvironment):
724+
"""
725+
Preprocessor for tests that involve reading from Oracle.
726+
url syntax: 'jdbc:oracle:thin:system/oracle@<host>:<port>/<database>'
727+
"""
728+
return _db_io_read_test_processor(
729+
test_spec, lambda url: url.split('/')[2], 'Oracle')
730+
731+
732+
@YamlExamplesTestSuite.register_test_preprocessor([
733+
'test_mysql_to_bigquery_yaml',
734+
])
735+
def __mysql_io_read_test_preprocessor(
736+
test_spec: dict, expected: List[str], env: TestEnvironment):
737+
"""
738+
Preprocessor for tests that involve reading from MySql.
739+
url syntax: 'jdbc:mysql://<host>:<port>/<database>?user=<user>&
740+
password=<password>'
741+
"""
742+
return _db_io_read_test_processor(
743+
test_spec, lambda url: url.split('/')[3].split('?')[0], 'MySql')
744+
745+
746+
def _db_io_read_test_processor(
747+
test_spec: dict, database_url_fn: Callable, database_type: str):
748+
"""
749+
This preprocessor replaces any ReadFrom<database> transform with a Create
750+
transform that reads from a predefined in-memory list of records. This allows
751+
the test to verify the pipeline's correctness without relying on an active
752+
database.
677753
"""
678754
if pipeline := test_spec.get('pipeline', None):
679755
for transform in pipeline.get('transforms', []):
680-
if transform.get('type', '').startswith('ReadFromJdbc'):
756+
transform_name = f"ReadFrom{database_type}"
757+
if transform.get('type', '').startswith(transform_name):
681758
config = transform['config']
682759
url = config['url']
683-
database = url.split('/')[-1]
760+
database = database_url_fn(url)
684761
if (table := config.get('table', None)) is None:
685762
table = config.get('query', '').split('FROM')[-1].strip()
686763
transform['type'] = 'Create'
687764
transform['config'] = {
688765
k: v
689766
for k, v in config.items() if k.startswith('__')
690767
}
691-
elements = INPUT_TABLES[("Jdbc", database, table)]
768+
elements = INPUT_TABLES[(database_type, database, table)]
692769
if config.get('query', None):
693770
config['query'].replace('select ',
694771
'SELECT ').replace(' from ', ' FROM ')
@@ -705,17 +782,24 @@ def _jdbc_io_read_test_preprocessor(
705782
return test_spec
706783

707784

708-
INPUT_FILES = {'products.csv': input_data.products_csv()}
785+
INPUT_FILES = {
786+
'products.csv': input_data.products_csv(),
787+
'kinglear.txt': input_data.text_data()
788+
}
789+
709790
INPUT_TABLES = {
710-
('shipment-test', 'shipment', 'shipments'): input_data.
711-
spanner_shipments_data(),
791+
('shipment-test', 'shipment', 'shipments'): input_data.shipments_data(),
712792
('orders-test', 'order-database', 'orders'): input_data.
713793
spanner_orders_data(),
714794
('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
715795
('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
716796
bigtable_data(),
717797
('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(),
718-
('Jdbc', 'shipment', 'shipments'): input_data.jdbc_shipments_data()
798+
('Jdbc', 'shipment', 'shipments'): input_data.shipments_data(),
799+
('SqlServer', 'shipment', 'shipments'): input_data.shipments_data(),
800+
('Postgres', 'shipment', 'shipments'): input_data.shipments_data(),
801+
('Oracle', 'shipment', 'shipments'): input_data.shipments_data(),
802+
('MySql', 'shipment', 'shipments'): input_data.shipments_data()
719803
}
720804
YAML_DOCS_DIR = os.path.join(os.path.dirname(__file__))
721805

sdks/python/apache_beam/yaml/examples/testing/input_data.py

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -78,58 +78,7 @@ def spanner_orders_data():
7878
}]
7979

8080

81-
def spanner_shipments_data():
82-
return [{
83-
'shipment_id': 'S1',
84-
'customer_id': 'C1',
85-
'shipment_date': '2023-05-01',
86-
'shipment_cost': 150.0,
87-
'customer_name': 'Alice',
88-
'customer_email': 'alice@example.com'
89-
},
90-
{
91-
'shipment_id': 'S2',
92-
'customer_id': 'C2',
93-
'shipment_date': '2023-06-12',
94-
'shipment_cost': 300.0,
95-
'customer_name': 'Bob',
96-
'customer_email': 'bob@example.com'
97-
},
98-
{
99-
'shipment_id': 'S3',
100-
'customer_id': 'C1',
101-
'shipment_date': '2023-05-10',
102-
'shipment_cost': 20.0,
103-
'customer_name': 'Alice',
104-
'customer_email': 'alice@example.com'
105-
},
106-
{
107-
'shipment_id': 'S4',
108-
'customer_id': 'C4',
109-
'shipment_date': '2024-07-01',
110-
'shipment_cost': 150.0,
111-
'customer_name': 'Derek',
112-
'customer_email': 'derek@example.com'
113-
},
114-
{
115-
'shipment_id': 'S5',
116-
'customer_id': 'C5',
117-
'shipment_date': '2023-05-09',
118-
'shipment_cost': 300.0,
119-
'customer_name': 'Erin',
120-
'customer_email': 'erin@example.com'
121-
},
122-
{
123-
'shipment_id': 'S6',
124-
'customer_id': 'C4',
125-
'shipment_date': '2024-07-02',
126-
'shipment_cost': 150.0,
127-
'customer_name': 'Derek',
128-
'customer_email': 'derek@example.com'
129-
}]
130-
131-
132-
def jdbc_shipments_data():
81+
def shipments_data():
13382
return [{
13483
'shipment_id': 'S1',
13584
'customer_id': 'C1',
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# coding=utf-8
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
# This is an example of a Beam YAML pipeline that reads text files from GCS
19+
# and writes the lines to a BigQuery table. This matches the Dataflow Template located
20+
# here - https://cloud.google.com/dataflow/docs/guides/templates/provided/gcs-to-bigquery
21+
22+
pipeline:
23+
type: chain
24+
transforms:
25+
# Step 1: Reading data from GCS
26+
- type: ReadFromText
27+
name: ReadFromGCS
28+
config:
29+
path: gs://dataflow-samples/shakespeare/kinglear.txt
30+
# Step 2: Write records out to BigQuery
31+
- type: WriteToBigQuery
32+
name: WriteWords
33+
config:
34+
table: "apache-beam-testing.yaml_test.words"
35+
create_disposition: "CREATE_NEVER"
36+
write_disposition: "WRITE_APPEND"
37+
num_streams: 1
38+
39+
40+
# Expected:
41+
# Row(line='Fool\tThou shouldst not have been old till thou hadst')
42+
# Row(line='\tbeen wise.')
43+
# Row(line='KING LEAR\tNothing will come of nothing: speak again.')
44+
# Row(line='\tNever, never, never, never, never!')
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# coding=utf-8
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
# A pipeline that reads messages from a Kafka topic and writes them to an Iceberg table.
20+
21+
pipeline:
22+
type: chain
23+
transforms:
24+
# Step 1: Reading data from Kafka
25+
- type: ReadFromKafka
26+
name: ReadFromMyTopic
27+
config:
28+
format: "RAW"
29+
topic: "{{ TOPIC }}"
30+
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
31+
auto_offset_reset_config: earliest
32+
consumer_config:
33+
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
34+
username={{ USERNAME }} \
35+
password={{ PASSWORD }};"
36+
security.protocol: "SASL_PLAINTEXT"
37+
sasl.mechanism: "PLAIN"
38+
# Step 2: Convert Kafka records
39+
- type: MapToFields
40+
name: ParseKafkaRecords
41+
config:
42+
language: python
43+
fields:
44+
text:
45+
callable: |
46+
def func(row):
47+
# Kafka RAW format reads messages as bytes
48+
# in the 'payload' field of a Row
49+
return row.payload.decode('utf-8')
50+
# Step 3: Write records out to Iceberg
51+
- type: WriteToIceberg
52+
name: WriteToAnIcebergTable
53+
config:
54+
# Dynamic destinations
55+
table: "db.users.{zip}"
56+
catalog_name: "hadoop_catalog"
57+
catalog_properties:
58+
type: "hadoop"
59+
warehouse: "gs://MY-WAREHOUSE"
60+
# Hadoop catalog config required to run pipeline locally
61+
# Omit if running on Dataflow
62+
config_properties:
63+
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
64+
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
65+
66+
options:
67+
streaming: true
68+
69+
# Expected:
70+
# Row(text='Fool\tThou shouldst not have been old till thou hadst')
71+
# Row(text='\tbeen wise.')
72+
# Row(text='KING LEAR\tNothing will come of nothing: speak again.')
73+
# Row(text='\tNever, never, never, never, never!')

0 commit comments

Comments
 (0)