Skip to content

Commit ac00807

Browse files
derrickawtvalentyn
andauthored
[YAML]: add import jinja pipeline example (#35945)
* add import jinja pipeline example * revert name change * update overall examples readme * fix lint issue * fix gemini small issue * Update sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md --------- Co-authored-by: tvalentyn <[email protected]>
1 parent a7ec1ae commit ac00807

File tree

6 files changed

+230
-21
lines changed

6 files changed

+230
-21
lines changed

sdks/python/apache_beam/yaml/examples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,10 @@ by leveraging Jinja templating engine for dynamic pipeline generation based on
245245
inputs from the user through `% include`, `% import`, and inheritance
246246
directives.
247247

248+
Jinja `% import` directive:
249+
- [wordCountImport.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml)
250+
- [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/import/README.md) on how to run the pipeline.
251+
248252
Jinja `% include` directive:
249253
- [wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml)
250254
- [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md) on how to run the pipeline.

sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,8 @@ def test_yaml_example(self):
353353
]
354354
if jinja_preprocessor:
355355
jinja_preprocessor = jinja_preprocessor[0]
356-
raw_spec_string = jinja_preprocessor(raw_spec_string)
356+
raw_spec_string = jinja_preprocessor(
357+
raw_spec_string, self._testMethodName)
357358
custom_preprocessors.remove(jinja_preprocessor)
358359

359360
pipeline_spec = yaml.load(
@@ -563,7 +564,7 @@ def _wordcount_minimal_test_preprocessor(
563564

564565

565566
@YamlExamplesTestSuite.register_test_preprocessor(
566-
['test_wordCountInclude_yaml'])
567+
['test_wordCountInclude_yaml', 'test_wordCountImport_yaml'])
567568
def _wordcount_jinja_test_preprocessor(
568569
test_spec: dict, expected: List[str], env: TestEnvironment):
569570
"""
@@ -676,7 +677,8 @@ def _kafka_test_preprocessor(
676677
'test_iceberg_migration_yaml',
677678
'test_ml_preprocessing_yaml',
678679
'test_anomaly_scoring_yaml',
679-
'test_wordCountInclude_yaml'
680+
'test_wordCountInclude_yaml',
681+
'test_wordCountImport_yaml'
680682
])
681683
def _io_write_test_preprocessor(
682684
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1253,8 +1255,8 @@ def _batch_log_analysis_test_preprocessor(
12531255

12541256

12551257
@YamlExamplesTestSuite.register_test_preprocessor(
1256-
['test_wordCountInclude_yaml'])
1257-
def _jinja_preprocessor(raw_spec_string: str):
1258+
['test_wordCountInclude_yaml', 'test_wordCountImport_yaml'])
1259+
def _jinja_preprocessor(raw_spec_string: str, test_name: str):
12581260
"""
12591261
Preprocessor for Jinja-based YAML tests.
12601262
@@ -1274,12 +1276,11 @@ def _jinja_preprocessor(raw_spec_string: str):
12741276
Returns:
12751277
A string containing the fully rendered YAML pipeline specification.
12761278
"""
1277-
12781279
jinja_variables = json.loads(input_data.word_count_jinja_parameter_data())
12791280
test_file_dir = os.path.dirname(__file__)
12801281
sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..'))
12811282

1282-
include_files = input_data.word_count_jinja_template_data()
1283+
include_files = input_data.word_count_jinja_template_data(test_name)
12831284
mock_templates = {'main_template': raw_spec_string}
12841285
for file_path in include_files:
12851286
full_path = os.path.join(sdk_root, file_path)

sdks/python/apache_beam/yaml/examples/testing/input_data.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,28 @@ def word_count_jinja_parameter_data():
6565
return json.dumps(params)
6666

6767

68-
def word_count_jinja_template_data():
69-
return \
70-
[('apache_beam/yaml/examples/transforms/jinja/'
71-
'include/submodules/readFromTextTransform.yaml'),
72-
('apache_beam/yaml/examples/transforms/jinja/'
73-
'include/submodules/mapToFieldsSplitConfig.yaml'),
74-
('apache_beam/yaml/examples/transforms/jinja/'
75-
'include/submodules/explodeTransform.yaml'),
76-
('apache_beam/yaml/examples/transforms/jinja/'
77-
'include/submodules/combineTransform.yaml'),
78-
('apache_beam/yaml/examples/transforms/jinja/'
79-
'include/submodules/mapToFieldsCountConfig.yaml'),
80-
('apache_beam/yaml/examples/transforms/jinja/'
81-
'include/submodules/writeToTextTransform.yaml')]
68+
def word_count_jinja_template_data(test_name: str) -> list[str]:
69+
if test_name == 'test_wordCountInclude_yaml':
70+
return [
71+
'apache_beam/yaml/examples/transforms/jinja/'
72+
'include/submodules/readFromTextTransform.yaml',
73+
'apache_beam/yaml/examples/transforms/jinja/'
74+
'include/submodules/mapToFieldsSplitConfig.yaml',
75+
'apache_beam/yaml/examples/transforms/jinja/'
76+
'include/submodules/explodeTransform.yaml',
77+
'apache_beam/yaml/examples/transforms/jinja/'
78+
'include/submodules/combineTransform.yaml',
79+
'apache_beam/yaml/examples/transforms/jinja/'
80+
'include/submodules/mapToFieldsCountConfig.yaml',
81+
'apache_beam/yaml/examples/transforms/jinja/'
82+
'include/submodules/writeToTextTransform.yaml'
83+
]
84+
elif test_name == 'test_wordCountImport_yaml':
85+
return [
86+
'apache_beam/yaml/examples/transforms/jinja/'
87+
'import/macros/wordCountMacros.yaml'
88+
]
89+
return []
8290

8391

8492
def iceberg_dynamic_destinations_users_data():
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
## Jinja % import Pipeline
21+
22+
This example leverages the `% import` Jinja directive by having one main
23+
pipeline and then one macros file containing all the transforms and configs
24+
used.
25+
26+
General setup:
27+
```sh
28+
export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/import/wordCountImport.yaml
29+
export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt"
30+
export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/"
31+
32+
cd <PATH_TO_BEAM_REPO>/beam/sdks/python
33+
```
34+
35+
Multiline Run Example:
36+
```sh
37+
python -m apache_beam.yaml.main \
38+
--yaml_pipeline_file="${PIPELINE_FILE}" \
39+
--jinja_variables='{
40+
"readFromTextTransform": {"path": "'"${KINGLEAR}"'"},
41+
"mapToFieldsSplitConfig": {
42+
"language": "python",
43+
"fields": {
44+
"value": "1"
45+
}
46+
},
47+
"explodeTransform": {"fields": "word"},
48+
"combineTransform": {
49+
"group_by": "word",
50+
"combine": {"value": "sum"}
51+
},
52+
"mapToFieldsCountConfig": {
53+
"language": "python",
54+
"fields": {"output": "word + \" - \" + str(value)"}
55+
},
56+
"writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"}
57+
}'
58+
```
59+
60+
Single Line Run Example:
61+
```sh
62+
python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform": {"path": "gs://dataflow-samples/shakespeare/kinglear.txt"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"'"${TEMP_LOCATION}"'"}}'
63+
```
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
{%- macro readFromTextTransform(params) -%}
19+
20+
- name: Read from GCS
21+
type: ReadFromText
22+
config:
23+
path: "{{ params.path }}"
24+
{%- endmacro -%}
25+
26+
{%- macro mapToFieldsSplitConfig(params) -%}
27+
language: "{{ params.language }}"
28+
fields:
29+
value: "{{ params.fields.value }}"
30+
word:
31+
callable: |-
32+
import re
33+
def my_mapping(row):
34+
return re.findall(r'[A-Za-z\']+', row.line.lower())
35+
{%- endmacro -%}
36+
37+
{%- macro explodeTransform(params) -%}
38+
- name: Explode word arrays
39+
type: Explode
40+
config:
41+
fields: "{{ params.fields }}"
42+
{%- endmacro -%}
43+
44+
{%- macro combineTransform(params) -%}
45+
- name: Count words
46+
type: Combine
47+
config:
48+
group_by: "{{ params.group_by }}"
49+
combine:
50+
value: "{{ params.combine.value }}"
51+
{%- endmacro -%}
52+
53+
{%- macro mapToFieldsCountConfig(params) -%}
54+
language: "{{ params.language }}"
55+
fields:
56+
output: '{{ params.fields.output }}'
57+
{%- endmacro -%}
58+
59+
{%- macro writeToTextTransform(params) -%}
60+
- name: Write to GCS
61+
type: WriteToText
62+
config:
63+
path: "{{ params.path }}"
64+
{%- endmacro -%}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# coding=utf-8
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
# This example reads from a public file stored on Google Cloud. This
20+
# requires authenticating with Google Cloud, or setting the file in
21+
# `ReadFromText` to a local file.
22+
#
23+
# To set up Application Default Credentials,
24+
# see https://cloud.google.com/docs/authentication/external/set-up-adc.
25+
#
26+
# This pipeline reads in a text file, counts distinct words found in the text,
27+
# then writes a row containing each word and its count to a text file.
28+
29+
{% import 'apache_beam/yaml/examples/transforms/jinja/import/macros/wordCountMacros.yaml' as macros %}
30+
31+
pipeline:
32+
type: chain
33+
transforms:
34+
35+
# Read in text file
36+
{{ macros.readFromTextTransform(readFromTextTransform) | indent(4, true) }}
37+
38+
# Split words and count occurrences
39+
- name: Split words
40+
type: MapToFields
41+
config:
42+
{{ macros.mapToFieldsSplitConfig(mapToFieldsSplitConfig) | indent(8, true) }}
43+
44+
# Explode into individual words
45+
{{ macros.explodeTransform(explodeTransform) | indent(4, true) }}
46+
47+
# Group by word
48+
{{ macros.combineTransform(combineTransform) | indent(4, true) }}
49+
50+
# Format output to a single string consisting of `word - count`
51+
- name: Format output
52+
type: MapToFields
53+
config:
54+
{{ macros.mapToFieldsCountConfig(mapToFieldsCountConfig) | indent(8, true) }}
55+
56+
# Write to text file on GCS, locally, etc
57+
{{ macros.writeToTextTransform(writeToTextTransform) | indent(4, true) }}
58+
59+
# Expected:
60+
# Row(output='king - 311')
61+
# Row(output='lear - 253')
62+
# Row(output='dramatis - 1')
63+
# Row(output='personae - 1')
64+
# Row(output='of - 483')
65+
# Row(output='britain - 2')
66+
# Row(output='france - 32')
67+
# Row(output='duke - 26')
68+
# Row(output='burgundy - 20')
69+
# Row(output='cornwall - 75')

0 commit comments

Comments
 (0)