
Commit a1381ad

charlespnh and razvanculea
authored and committed
Add more YAML examples involving Kafka and Iceberg (apache#35151)
* Add more YAML examples involving Kafka and Iceberg
* Fix some missed details from rebasing
* Adding unit tests for YAML examples
* Clean up and address PR comments
* Formatting
* Formatting
1 parent d0c3af6 commit a1381ad

File tree

6 files changed: +476 −17 lines changed


sdks/python/apache_beam/yaml/examples/README.md

Lines changed: 154 additions & 11 deletions
@@ -20,43 +20,51 @@
 # Examples Catalog
 
 <!-- TOC -->
+
 * [Examples Catalog](#examples-catalog)
-* [Wordcount](#wordcount)
-* [Transforms](#transforms)
-* [Aggregation](#aggregation)
-* [Blueprints](#blueprints)
-* [Element-wise](#element-wise)
-* [IO](#io)
-* [ML](#ml)
+* [Wordcount](#wordcount)
+* [Transforms](#transforms)
+* [Aggregation](#aggregation)
+* [Blueprints](#blueprints)
+* [Element-wise](#element-wise)
+* [IO](#io)
+* [ML](#ml)
 
 <!-- TOC -->
 
 ## Prerequisites
+
 Build this jar for running with the run command in the next stage:
+
 ```
 cd <path_to_beam_repo>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
 ```
 
 ## Example Run
+
 This module contains a series of Beam YAML code samples that can be run using
 the command:
+
 ```
-python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/example.yaml
+python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/example.yaml
 ```
 
 Depending on the yaml pipeline, the output may be emitted to standard output or
 a file located in the execution folder used.
 
 ## Wordcount
+
 A good starting place is the [Wordcount](wordcount_minimal.yaml) example under
 the root example directory.
 This example reads in a text file, splits the text on each word, groups by each
 word, and counts the occurrence of each word. This is a classic example used in
 the other SDKs and shows off many of the functionalities of Beam YAML.
 
 ## Testing
+
 A test file is located in the testing folder that will execute all the example
 yamls and confirm the expected results.
+
 ```
 pytest -v testing/
 
@@ -71,25 +79,160 @@ Examples in this directory show off the various built-in transforms of the Beam
 YAML framework.
 
 ### Aggregation
+
 These examples leverage the built-in `Combine` transform for performing simple
 aggregations including sum, mean, count, etc.
 
 ### Blueprints
+
 These examples leverage Dataflow or other existing templates and convert them
 to yaml blueprints.
 
 ### Element-wise
+
 These examples leverage the built-in mapping transforms including `MapToFields`,
 `Filter` and `Explode`. More information can be found about mapping transforms
 [here](https://beam.apache.org/documentation/sdks/yaml-udf/).
 
 ### IO
-These examples leverage the built-in `Spanner_Read` and `Spanner_Write`
-transform for performing simple reads and writes from a spanner DB.
+
+#### Spanner
+
+The [Spanner Read](transforms/io/spanner_read.yaml) and [Spanner Write](
+transforms/io/spanner_write.yaml) examples leverage the built-in `Spanner_Read`
+and `Spanner_Write` transforms for performing simple reads and writes from a
+Google Spanner database.
+
+#### Kafka
+
+Examples involving Kafka, such as [Kafka Read Write](transforms/io/kafka.yaml),
+require users to set up a Kafka cluster that the Dataflow runner executing the
+Beam pipeline has access to.
+Note that the `ReadFromKafka` transform has
+a [known issue](https://github.com/apache/beam/issues/22809) when
+used with non-Dataflow portable runners, where reading may get stuck in
+streaming pipelines. Hence, the Dataflow runner is recommended for examples
+that involve reading from Kafka in a streaming pipeline.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. One option is to use [Click to Deploy](
+https://console.cloud.google.com/marketplace/details/click-to-deploy-images/kafka?)
+to quickly launch a Kafka cluster on [GCE](
+https://cloud.google.com/products/compute?hl=en). The [SASL/PLAIN](
+https://kafka.apache.org/documentation/#security_sasl_plain) authentication
+mechanism is configured for the brokers as part of the deployment. See
+also [here](
+https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/dataflow/flex-templates/kafka_to_bigquery)
+for an alternative step-by-step guide on setting up Kafka on GCE without the
+authentication mechanism.
+
+Assume one of the bootstrap servers runs on VM instance `kafka-vm-0` with the
+internal IP address `123.45.67.89`, listening on port `9092`. The SASL/PLAIN
+`USERNAME` and `PASSWORD` can be viewed in the VM instance's metadata on the
+GCE console, or with the gcloud CLI:
+
+```sh
+gcloud compute instances describe kafka-vm-0 \
+  --format='value[](metadata.items.kafka-user)'
+gcloud compute instances describe kafka-vm-0 \
+  --format='value[](metadata.items.kafka-password)'
+```
+
+The [Kafka Read Write](transforms/io/kafka.yaml) pipeline first writes data to
+the Kafka topic using the `WriteToKafka` transform and then reads that data back
+using the `ReadFromKafka` transform. Run the pipeline:
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/tmp"
+export REGION="us-central1"
+export JOB_NAME="demo-kafka-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="1"
+
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/kafka.yaml \
+  --runner DataflowRunner \
+  --temp_location $TEMP_LOCATION \
+  --project $PROJECT \
+  --region $REGION \
+  --num_workers $NUM_WORKERS \
+  --job_name $JOB_NAME \
+  --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
+    "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'
+```
+
+**_Optional_**: If the Kafka cluster is set up without SASL/PLAIN
+authentication configured for the brokers, no SASL/PLAIN `USERNAME` and
+`PASSWORD` are needed. In the pipelines, omit the configurations
+`producer_config_updates` and `consumer_config` from the `WriteToKafka` and
+`ReadFromKafka` transforms.
+Run the commands above without specifying the username and password in the
+`--jinja_variables` flag.
+
+#### Iceberg
+
+The [Iceberg Write](transforms/io/iceberg_write.yaml) and
+[Iceberg Read](transforms/io/iceberg_read.yaml) pipelines are examples of how
+to interact with Iceberg tables stored on GCS with a Hadoop catalog configured.
+
+To create a GCS bucket as the warehouse storage,
+see [here](https://cloud.google.com/storage/docs/creating-buckets#command-line).
+To run the pipelines locally, one option is to create a service account key in
+order to access GCS (see
+[here](https://cloud.google.com/iam/docs/keys-create-delete#creating)).
+Within the pipelines, specify the GCS bucket name and the path to the saved
+service account key `.json` file.
+
+**_Note_**: With the Hadoop catalog, Iceberg uses the Hadoop connector for GCS.
+See [here](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md)
+for the full list of configuration options for the Hadoop catalog when used
+with GCS.
+
+To create and write to Iceberg tables on GCS, run:
+
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/iceberg_write.yaml
+```
+
+The pipeline uses [dynamic destinations](
+https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations)
+to dynamically create and select a table destination based on field
+values in the incoming records.
+
+To read from a created Iceberg table on GCS, run:
+
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file transforms/io/iceberg_read.yaml
+```
+
+**_Optional_**: To run the pipelines on Dataflow, a service account key is
+[not needed](
+https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/INSTALL.md).
+Omit the authentication settings in the Hadoop catalog configuration
+`config_properties`, and run:
+
+```sh
+export REGION="us-central1"
+export JOB_NAME="demo-iceberg_write-`date +%Y%m%d-%H%M%S`"
+
+gcloud dataflow yaml run $JOB_NAME \
+  --yaml-pipeline-file transforms/io/iceberg_write.yaml \
+  --region $REGION
+```
+
+```sh
+export REGION="us-central1"
+export JOB_NAME="demo-iceberg_read-`date +%Y%m%d-%H%M%S`"
+
+gcloud dataflow yaml run $JOB_NAME \
+  --yaml-pipeline-file transforms/io/iceberg_read.yaml \
+  --region $REGION
+```
 
 ### ML
+
 These examples leverage the built-in `Enrichment` transform for performing
 ML enrichments.
 
 More information can be found about aggregation transforms
-[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
+[here](https://beam.apache.org/documentation/sdks/yaml-combine/).
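The `--jinja_variables` flag in the commands above takes a JSON object whose keys fill the Jinja placeholders (e.g. `{{ TOPIC }}`) inside the example YAML files. A minimal sketch of that substitution, using a plain string replace as a simplified stand-in for the real Jinja2 rendering that `apache_beam.yaml.main` performs (the template fragment below is illustrative, not the actual `kafka.yaml`):

```python
import json

# The value handed to --jinja_variables is plain JSON.
flag_value = (
    '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092", '
    '"TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }')
variables = json.loads(flag_value)

# Illustrative pipeline fragment with Jinja-style placeholders.
template = (
    "type: ReadFromKafka\n"
    "config:\n"
    "  topic: '{{ TOPIC }}'\n"
    "  bootstrap_servers: '{{ BOOTSTRAP_SERVERS }}'\n")

# Naive substitution in place of the real Jinja2 engine.
rendered = template
for name, value in variables.items():
    rendered = rendered.replace("{{ %s }}" % name, value)
```

Any key supplied in the JSON but absent from the template is simply unused, which is why the optional no-authentication setup can drop `USERNAME` and `PASSWORD` from the flag.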

sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 89 additions & 6 deletions
@@ -58,7 +58,7 @@ def test_enrichment(
 
   This PTransform simulates the behavior of the Enrichment transform by
   looking up data from predefined in-memory tables based on the provided
-  `enrichment_handler` and `handler_config`.
+  `enrichment_handler` and `handler_config`.
 
   Note: The Github action that invokes these tests does not have gcp
   dependencies installed which is a prerequisite to
@@ -111,10 +111,25 @@ def _fn(row):
   return pcoll | beam.Map(_fn)
 
 
+@beam.ptransform.ptransform_fn
+def test_kafka_read(
+    pcoll,
+    format,
+    topic,
+    bootstrap_servers,
+    auto_offset_reset_config,
+    consumer_config):
+  return (
+      pcoll | beam.Create(input_data.text_data().split('\n'))
+      | beam.Map(lambda element: beam.Row(payload=element.encode('utf-8'))))
+
+
 TEST_PROVIDERS = {
-    'TestEnrichment': test_enrichment,
+    'TestEnrichment': test_enrichment, 'TestReadFromKafka': test_kafka_read
 }
 
+INPUT_TRANSFORM_TEST_PROVIDERS = ['TestReadFromKafka']
+
 
 def check_output(expected: List[str]):
   """
@@ -184,7 +199,11 @@ def test_yaml_example(self):
     actual = [
         yaml_transform.expand_pipeline(
             p,
-            pipeline_spec, [yaml_provider.InlineProvider(TEST_PROVIDERS)])
+            pipeline_spec,
+            [
+                yaml_provider.InlineProvider(
+                    TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS)
+            ])
     ]
     if not actual[0]:
       actual = list(p.transforms_stack[0].parts[-1].outputs.values())
@@ -373,9 +392,30 @@ def _wordcount_test_preprocessor(
       env.input_file('kinglear.txt', '\n'.join(lines)))
 
 
+@YamlExamplesTestSuite.register_test_preprocessor('test_kafka_yaml')
+def _kafka_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+
+  test_spec = replace_recursive(
+      test_spec,
+      'ReadFromText',
+      'path',
+      env.input_file('kinglear.txt', input_data.text_data()))
+
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromKafka':
+        transform['type'] = 'TestReadFromKafka'
+
+  return test_spec
+
+
 @YamlExamplesTestSuite.register_test_preprocessor([
     'test_simple_filter_yaml',
     'test_simple_filter_and_combine_yaml',
+    'test_iceberg_read_yaml',
+    'test_iceberg_write_yaml',
+    'test_kafka_yaml',
     'test_spanner_read_yaml',
     'test_spanner_write_yaml',
     'test_enrich_spanner_with_bigquery_yaml'
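The swap `_kafka_test_preprocessor` performs can be seen on a toy spec: any `ReadFromKafka` transform is retyped to `TestReadFromKafka` so the in-memory test provider registered earlier handles it instead of a real cluster. A self-contained sketch (the spec dict below is illustrative, not an actual example file):

```python
# Toy YAML pipeline spec, already parsed into a dict.
test_spec = {
    "pipeline": {
        "transforms": [
            {"type": "ReadFromKafka", "config": {"topic": "MY-TOPIC"}},
            {"type": "LogForTesting"},
        ]
    }
}

# Retype ReadFromKafka transforms; everything else is left untouched.
if pipeline := test_spec.get("pipeline", None):
    for transform in pipeline.get("transforms", []):
        if transform.get("type", "") == "ReadFromKafka":
            transform["type"] = "TestReadFromKafka"
```

Because only the `type` field changes, the transform's `config` is passed through unchanged to the test provider, which simply ignores the Kafka connection settings.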
@@ -417,9 +457,10 @@ def _io_write_test_preprocessor(
 def _file_io_read_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
   """
-  This preprocessor replaces any ReadFrom transform with a Create transform
-  that reads from a predefined in-memory dictionary. This allows the test
-  to verify the pipeline's correctness without relying on external files.
+  This preprocessor replaces any file IO ReadFrom transform with a Create
+  transform that reads from a predefined in-memory dictionary. This allows
+  the test to verify the pipeline's correctness without relying on external
+  files.
 
   Args:
     test_spec: The dictionary representation of the YAML pipeline specification.
@@ -445,6 +486,47 @@ def _file_io_read_test_preprocessor(
   return test_spec
 
 
+@YamlExamplesTestSuite.register_test_preprocessor(['test_iceberg_read_yaml'])
+def _iceberg_io_read_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for tests that involve reading from Iceberg.
+
+  This preprocessor replaces any ReadFromIceberg transform with a Create
+  transform that reads from a predefined in-memory dictionary. This allows
+  the test to verify the pipeline's correctness without relying on Iceberg
+  tables stored externally.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with ReadFromIceberg transforms replaced.
+  """
+  if pipeline := test_spec.get('pipeline', None):
+    for transform in pipeline.get('transforms', []):
+      if transform.get('type', '') == 'ReadFromIceberg':
+        config = transform['config']
+        (db_name, table_name,
+         field_value_dynamic_destinations) = config['table'].split('.')
+
+        transform['type'] = 'Create'
+        transform['config'] = {
+            k: v
+            for k, v in config.items() if k.startswith('__')
+        }
+        transform['config']['elements'] = INPUT_TABLES[(
+            str(db_name),
+            str(table_name),
+            str(field_value_dynamic_destinations))]
+
+  return test_spec
+
+
 @YamlExamplesTestSuite.register_test_preprocessor(
     ['test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml'])
 def _spanner_io_read_test_preprocessor(
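The rewrite `_iceberg_io_read_test_preprocessor` applies to each matching transform can be sketched standalone: the three-part `table` string is split on dots to form the `INPUT_TABLES` lookup key, and only internal bookkeeping keys (those starting with `__`) survive into the replacement `Create` config. The `__line__` key and the table data below are hypothetical stand-ins for whatever the real spec and `input_data` module supply:

```python
# Hypothetical stand-in for the module-level INPUT_TABLES mapping.
INPUT_TABLES = {("db", "users", "NY"): [{"id": 1, "name": "alice"}]}

# One ReadFromIceberg transform dict as it might appear in a parsed spec;
# '__line__' is an illustrative internal key that must be preserved.
transform = {
    "type": "ReadFromIceberg",
    "config": {"table": "db.users.NY", "__line__": 12},
}

config = transform["config"]
db_name, table_name, destination = config["table"].split(".")

# Replace with a Create transform: keep only '__'-prefixed keys, then
# attach the in-memory rows as its elements.
transform["type"] = "Create"
transform["config"] = {k: v for k, v in config.items() if k.startswith("__")}
transform["config"]["elements"] = INPUT_TABLES[(db_name, table_name, destination)]
```

Dropping the non-`__` keys matters because `Create` would reject Iceberg-specific options such as `table` or `catalog_properties`.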
@@ -531,6 +613,7 @@ def _enrichment_test_preprocessor(
         spanner_shipments_data(),
     ('orders-test', 'order-database', 'orders'): input_data.
         spanner_orders_data(),
+    ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
     ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
         bigtable_data(),
     ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data()
