
Commit 90f8c50

[YAML] - Jinja % include example (#35914)
* update readme for jinja * fix conflicts * add jinja data * add jinja submodules * fix lint, whitespace, and save file issues * fix rebase conflict and exception etc * update readme * move around files * add new readme * move more files around * fix whitespace * fix additional errors after shifting folders * address comments * fix jinja processor * fix readme per new names for submodules * add todo ids * add comments * minor gemini fixes
1 parent c155a39 commit 90f8c50

File tree

11 files changed: +490 -6 lines

sdks/python/apache_beam/yaml/examples/README.md

Lines changed: 21 additions & 1 deletion
@@ -37,7 +37,7 @@
 Build this jar for running with the run command in the next stage:
 
 ```
-cd <path_to_beam_repo>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
+cd <PATH_TO_BEAM_REPO>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
 ```
 
 ## Example Run
@@ -70,6 +70,10 @@ pytest -v testing/
 
 or
 
+pytest -v testing/examples_test.py::JinjaTest
+
+or
+
 python -m unittest -v testing/examples_test.py
 ```
 
@@ -229,6 +233,22 @@ gcloud dataflow yaml run $JOB_NAME \
   --region $REGION
 ```
 
+### Jinja
+
+Jinja [templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization)
+can be used to build off of different contexts and/or with different
+configurations.
+
+Several examples build on the word count example already used, leveraging
+the Jinja templating engine for dynamic pipeline generation based on
+user inputs through `% include`, `% import`, and inheritance
+directives.
+
+Jinja `% include` directive:
+- [wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml)
+- [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md) on how to run the pipeline.
+
 ### ML
 
 Examples that include the built-in `Enrichment` transform for performing
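The `% include` mechanism the README section describes can be sketched in isolation with jinja2's in-memory `DictLoader`. This is a minimal illustration, not Beam's actual loader: the template names, YAML fragments, and bucket path below are all made up for the example.

```python
from jinja2 import DictLoader, Environment, StrictUndefined

# Hypothetical templates: a main pipeline that pulls in a submodule
# via {% include %}, mirroring the wordCountInclude.yaml layout.
templates = {
    "main.yaml": (
        "pipeline:\n"
        "  transforms:\n"
        "{% include 'read.yaml' %}"
    ),
    "read.yaml": (
        "    - type: ReadFromText\n"
        "      config:\n"
        "        path: {{ readFromTextTransform.path }}"
    ),
}

# StrictUndefined makes a missing jinja variable fail loudly at render time.
env = Environment(loader=DictLoader(templates), undefined=StrictUndefined)
rendered = env.get_template("main.yaml").render(
    readFromTextTransform={"path": "gs://example-bucket/kinglear.txt"})
print(rendered)
```

The included template is rendered with the same variable context as the including one, which is why the submodules in this commit can reference `{{...}}` variables without re-declaring them.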

sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 129 additions & 5 deletions
@@ -21,6 +21,7 @@
 import logging
 import os
 import random
+import re
 import sys
 import unittest
 from typing import Any
@@ -33,6 +34,9 @@
 
 import pytest
 import yaml
+from jinja2 import DictLoader
+from jinja2 import Environment
+from jinja2 import StrictUndefined
 
 import apache_beam as beam
 from apache_beam import PCollection
@@ -339,8 +343,21 @@ def test_yaml_example(self):
       for i, line in enumerate(expected):
         expected[i] = line.replace('# ', '').replace('\n', '')
       expected = [line for line in expected if line]
+
+      raw_spec_string = ''.join(lines)
+      # Filter for any jinja preprocessor - this has to be done before other
+      # preprocessors.
+      jinja_preprocessor = [
+          preprocessor for preprocessor in custom_preprocessors
+          if 'jinja_preprocessor' in preprocessor.__name__
+      ]
+      if jinja_preprocessor:
+        jinja_preprocessor = jinja_preprocessor[0]
+        raw_spec_string = jinja_preprocessor(raw_spec_string)
+        custom_preprocessors.remove(jinja_preprocessor)
+
       pipeline_spec = yaml.load(
-          ''.join(lines), Loader=yaml_transform.SafeLineLoader)
+          raw_spec_string, Loader=yaml_transform.SafeLineLoader)
 
       with TestEnvironment() as env:
         for fn in custom_preprocessors:
@@ -513,8 +530,9 @@ def apply(preprocessor):
   return apply
 
 
-@YamlExamplesTestSuite.register_test_preprocessor('test_wordcount_minimal_yaml')
-def _wordcount_test_preprocessor(
+@YamlExamplesTestSuite.register_test_preprocessor(
+    ['test_wordcount_minimal_yaml'])
+def _wordcount_minimal_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
   """
   Preprocessor for the wordcount_minimal.yaml test.
@@ -523,6 +541,8 @@ def _wordcount_test_preprocessor(
   of the wordcount example. This allows the test to verify the pipeline's
   correctness without relying on a fixed input file.
 
+  Based on this expected output: # Row(word='king', count=311)
+
   Args:
     test_spec: The dictionary representation of the YAML pipeline specification.
     expected: A list of strings representing the expected output of the
@@ -538,8 +558,64 @@ def _wordcount_minimal_test_preprocessor(
     word = element.split('=')[1].split(',')[0].replace("'", '')
     count = int(element.split('=')[2].replace(')', ''))
     all_words += [word] * count
-  random.shuffle(all_words)
 
+  return _wordcount_random_shuffler(test_spec, all_words, env)
+
+
+@YamlExamplesTestSuite.register_test_preprocessor(
+    ['test_wordCountInclude_yaml'])
+def _wordcount_jinja_test_preprocessor(
+    test_spec: dict, expected: List[str], env: TestEnvironment):
+  """
+  Preprocessor for the wordcount Jinja tests.
+
+  This preprocessor generates a random input file based on the expected output
+  of the wordcount example. This allows the test to verify the pipeline's
+  correctness without relying on a fixed input file.
+
+  Based on this expected output: # Row(output='king - 311')
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline specification.
+    expected: A list of strings representing the expected output of the
+      pipeline.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with the input file path replaced.
+  """
+  all_words = []
+  for element in expected:
+    match = re.search(r"output='(.*) - (\d+)'", element)
+    if match:
+      word, count_str = match.groups()
+      all_words += [word] * int(count_str)
+  return _wordcount_random_shuffler(test_spec, all_words, env)
+
+
+def _wordcount_random_shuffler(
+    test_spec: dict, all_words: List[str], env: TestEnvironment):
+  """
+  Helper function to create a randomized input file for wordcount-style tests.
+
+  This function takes a list of words, shuffles them, and arranges them into
+  randomly sized lines. It then creates a temporary input file with this
+  content and updates the provided test specification to use this file as
+  the input for a 'ReadFromText' transform.
+
+  Args:
+    test_spec: The dictionary representation of the YAML pipeline specification.
+    all_words: A list of strings, where each string is a word to be included
+      in the generated input file.
+    env: The TestEnvironment object providing utilities for creating temporary
+      files.
+
+  Returns:
+    The modified test_spec dictionary with the input file path for
+    'ReadFromText' replaced with the path to the newly generated file.
+  """
+  random.shuffle(all_words)
   lines = []
   while all_words:
     line_length = random.randint(1, min(10, len(all_words)))
@@ -599,7 +675,8 @@ def _kafka_test_preprocessor(
         'test_streaming_sentiment_analysis_yaml',
         'test_iceberg_migration_yaml',
         'test_ml_preprocessing_yaml',
-        'test_anomaly_scoring_yaml'
+        'test_anomaly_scoring_yaml',
+        'test_wordCountInclude_yaml'
     ])
 def _io_write_test_preprocessor(
     test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1175,6 +1252,50 @@ def _batch_log_analysis_test_preprocessor(
   return test_spec
 
 
+@YamlExamplesTestSuite.register_test_preprocessor(
+    ['test_wordCountInclude_yaml'])
+def _jinja_preprocessor(raw_spec_string: str):
+  """
+  Preprocessor for Jinja-based YAML tests.
+
+  This function takes a raw YAML string, which is treated as a Jinja2
+  template, and renders it to produce the final pipeline specification.
+  It specifically handles templates that use the `{% include ... %}`
+  directive by manually loading the content of the included files from the
+  filesystem.
+
+  The Jinja variables required for rendering are loaded from a predefined
+  data source.
+
+  Args:
+    raw_spec_string: A string containing the raw YAML content, which is a
+      Jinja2 template.
+
+  Returns:
+    A string containing the fully rendered YAML pipeline specification.
+  """
+
+  jinja_variables = json.loads(input_data.word_count_jinja_parameter_data())
+  test_file_dir = os.path.dirname(__file__)
+  sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..'))
+
+  include_files = input_data.word_count_jinja_template_data()
+  mock_templates = {'main_template': raw_spec_string}
+  for file_path in include_files:
+    full_path = os.path.join(sdk_root, file_path)
+    with open(full_path, 'r', encoding='utf-8') as f:
+      mock_templates[file_path] = f.read()
+
+  # Can't use the standard expand_jinja method due to it not supporting
+  # `% include` jinja templization.
+  # TODO(#35936): Maybe update expand_jinja to handle this case.
+  jinja_env = Environment(
+      loader=DictLoader(mock_templates), undefined=StrictUndefined)
+  template = jinja_env.get_template('main_template')
+  rendered_yaml_string = template.render(jinja_variables)
+  return rendered_yaml_string
+
+
 INPUT_FILES = {
     'products.csv': input_data.products_csv(),
     'kinglear.txt': input_data.text_data(),
@@ -1216,6 +1337,9 @@ def _batch_log_analysis_test_preprocessor(
     os.path.join(YAML_DOCS_DIR, '../transforms/elementwise/*.yaml')).run()
 ExamplesTest = YamlExamplesTestSuite(
     'ExamplesTest', os.path.join(YAML_DOCS_DIR, '../*.yaml')).run()
+JinjaTest = YamlExamplesTestSuite(
+    'JinjaExamplesTest',
+    os.path.join(YAML_DOCS_DIR, '../transforms/jinja/**/*.yaml')).run()
 IOTest = YamlExamplesTestSuite(
     'IOExamplesTest', os.path.join(YAML_DOCS_DIR,
                                    '../transforms/io/*.yaml')).run()
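The new `_wordcount_jinja_test_preprocessor` rebuilds its input corpus from expected rows of the form `Row(output='word - count')`. That parsing step can be exercised standalone; the two sample rows below are made up for illustration and are not the test's actual expected output.

```python
import re

# Hypothetical expected rows in the Row(output='word - count') shape
# that the jinja wordcount test consumes.
expected = ["Row(output='king - 311')", "Row(output='lear - 250')"]

all_words = []
for element in expected:
    # Same pattern the preprocessor uses to split word and count.
    match = re.search(r"output='(.*) - (\d+)'", element)
    if match:
        word, count_str = match.groups()
        all_words += [word] * int(count_str)

print(len(all_words))  # 561 occurrences: 311 'king' + 250 'lear'
```

The expanded word list is then shuffled into random-length lines by `_wordcount_random_shuffler`, so the pipeline's counts can be verified without a fixed input file.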

sdks/python/apache_beam/yaml/examples/testing/input_data.py

Lines changed: 48 additions & 0 deletions
@@ -16,6 +16,7 @@
 # limitations under the License.
 #
 
+import json
 import typing
 
 from apache_beam.io.gcp.pubsub import PubsubMessage
@@ -33,6 +34,53 @@ def text_data():
     ])
 
 
+def word_count_jinja_parameter_data():
+  params = {
+      "readFromTextTransform": {
+          "path": "gs://dataflow-samples/shakespeare/kinglear.txt"
+      },
+      "mapToFieldsSplitConfig": {
+          "language": "python", "fields": {
+              "value": "1"
+          }
+      },
+      "explodeTransform": {
+          "fields": "word"
+      },
+      "combineTransform": {
+          "group_by": "word", "combine": {
+              "value": "sum"
+          }
+      },
+      "mapToFieldsCountConfig": {
+          "language": "python",
+          "fields": {
+              "output": "word + \" - \" + str(value)"
+          }
+      },
+      "writeToTextTransform": {
+          "path": "gs://apache-beam-testing-derrickaw/wordCounts/"
+      }
+  }
+  return json.dumps(params)
+
+
+def word_count_jinja_template_data():
+  return \
+      [('apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/readFromTextTransform.yaml'),
+       ('apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/mapToFieldsSplitConfig.yaml'),
+       ('apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/explodeTransform.yaml'),
+       ('apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/combineTransform.yaml'),
+       ('apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/mapToFieldsCountConfig.yaml'),
+       ('apache_beam/yaml/examples/transforms/jinja/'
+        'include/submodules/writeToTextTransform.yaml')]
+
+
 def iceberg_dynamic_destinations_users_data():
   return [{
       'id': 3, 'name': 'Smith', 'email': 'smith@example.com', 'zip': 'NY'
sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Jinja % include Pipeline
+
+This example leverages the `% include` Jinja directive by having one main
+pipeline and then submodules for each transform used.
+
+General setup:
+```sh
+export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml
+export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt"
+export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/"
+
+cd <PATH_TO_BEAM_REPO>/beam/sdks/python
+```
+
+Multiline Run Example:
+```sh
+python -m apache_beam.yaml.main \
+  --yaml_pipeline_file="${PIPELINE_FILE}" \
+  --jinja_variables='{
+    "readFromTextTransform": {"path": "'"${KINGLEAR}"'"},
+    "mapToFieldsSplitConfig": {
+      "language": "python",
+      "fields": {
+        "value": "1"
+      }
+    },
+    "explodeTransform": {"fields": "word"},
+    "combineTransform": {
+      "group_by": "word",
+      "combine": {"value": "sum"}
+    },
+    "mapToFieldsCountConfig": {
+      "language": "python",
+      "fields": {"output": "word + \" - \" + str(value)"}
+    },
+    "writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"}
+  }'
+```
+
+Single Line Run Example:
+```sh
+python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform": {"path": "gs://dataflow-samples/shakespeare/kinglear.txt"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"'"${TEMP_LOCATION}"'"}}'
+```
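Rather than hand-escaping the JSON for `--jinja_variables` (where a `${VAR}` left inside single quotes, as in the original single-line example, would not expand), the flag can be generated from a plain dict. This is an optional convenience sketch; the bucket paths are placeholders.

```python
import json
import shlex

# Placeholder values; substitute your own paths and bucket.
jinja_variables = {
    "readFromTextTransform": {
        "path": "gs://dataflow-samples/shakespeare/kinglear.txt"
    },
    "mapToFieldsSplitConfig": {"language": "python", "fields": {"value": "1"}},
    "explodeTransform": {"fields": "word"},
    "combineTransform": {"group_by": "word", "combine": {"value": "sum"}},
    "mapToFieldsCountConfig": {
        "language": "python",
        "fields": {"output": 'word + " - " + str(value)'},
    },
    "writeToTextTransform": {"path": "gs://MY-BUCKET/wordCounts/"},
}

# shlex.quote yields one shell-safe argument, so no manual '"..."' splicing.
flag = "--jinja_variables=" + shlex.quote(json.dumps(jinja_variables))
print(flag)
```

Paste the printed flag after `python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}"` to get the same invocation as above.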
63+
sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# The Combine transform groups by each word and counts the occurrences.
+
+- name: Count words
+  type: Combine
+  config:
+    group_by:
+      - {{combineTransform.group_by}}
+    combine:
+      value: {{combineTransform.combine.value}}
