Merged
Changes from 17 commits
22 changes: 21 additions & 1 deletion sdks/python/apache_beam/yaml/examples/README.md
@@ -37,7 +37,7 @@
Build this jar for running with the run command in the next stage:

```
cd <path_to_beam_repo>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
cd <PATH_TO_BEAM_REPO>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
```

## Example Run
@@ -70,6 +70,10 @@ pytest -v testing/

or

pytest -v testing/examples_test.py::JinjaTest

or

python -m unittest -v testing/examples_test.py
```

@@ -229,6 +233,22 @@ gcloud dataflow yaml run $JOB_NAME \
--region $REGION
```

### Jinja

Jinja [templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization)
can be used to build pipelines from different contexts and/or with different
configurations.

Several examples build on the word count example used earlier, leveraging the
Jinja templating engine for dynamic pipeline generation from user-supplied
inputs through the `% include` and `% import` directives and template
inheritance.

Jinja `% include` directive:
- [wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml)
- [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md) on how to run the pipeline.
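The include mechanism can be sketched in plain Python with `jinja2`. The template names and contents below are illustrative stand-ins, not the actual files under `transforms/jinja/include/`:

```python
# Minimal sketch of rendering a YAML pipeline that uses the Jinja
# `{% include %}` directive. Template names and contents are hypothetical.
from jinja2 import DictLoader, Environment, StrictUndefined

templates = {
    'main.yaml': (
        'pipeline:\n'
        '  transforms:\n'
        "{% include 'readFromTextTransform.yaml' %}"),
    'readFromTextTransform.yaml': (
        '    - type: ReadFromText\n'
        '      config:\n'
        '        path: {{ readFromTextTransform.path }}'),
}

# StrictUndefined makes rendering fail fast if a variable is missing.
env = Environment(loader=DictLoader(templates), undefined=StrictUndefined)
rendered = env.get_template('main.yaml').render(
    readFromTextTransform={'path': 'gs://example-bucket/kinglear.txt'})
print(rendered)
```

The included template is rendered with the same variable context as the main template, so the `readFromTextTransform.path` value flows through the include.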


### ML

Examples that include the built-in `Enrichment` transform for performing
134 changes: 129 additions & 5 deletions sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -21,6 +21,7 @@
import logging
import os
import random
import re
import sys
import unittest
from typing import Any
@@ -33,6 +34,9 @@

import pytest
import yaml
from jinja2 import DictLoader
from jinja2 import Environment
from jinja2 import StrictUndefined

import apache_beam as beam
from apache_beam import PCollection
@@ -339,8 +343,21 @@ def test_yaml_example(self):
for i, line in enumerate(expected):
expected[i] = line.replace('# ', '').replace('\n', '')
expected = [line for line in expected if line]

raw_spec_string = ''.join(lines)
# Filter for any jinja preprocessor - this has to be done before other
# preprocessors.
jinja_preprocessor = [
preprocessor for preprocessor in custom_preprocessors
if 'jinja_preprocessor' in preprocessor.__name__
]
if jinja_preprocessor:
jinja_preprocessor = jinja_preprocessor[0]
raw_spec_string = jinja_preprocessor(raw_spec_string)
custom_preprocessors.remove(jinja_preprocessor)

pipeline_spec = yaml.load(
''.join(lines), Loader=yaml_transform.SafeLineLoader)
raw_spec_string, Loader=yaml_transform.SafeLineLoader)

with TestEnvironment() as env:
for fn in custom_preprocessors:
@@ -513,8 +530,9 @@ def apply(preprocessor):
return apply


@YamlExamplesTestSuite.register_test_preprocessor('test_wordcount_minimal_yaml')
def _wordcount_test_preprocessor(
@YamlExamplesTestSuite.register_test_preprocessor(
['test_wordcount_minimal_yaml'])
def _wordcount_minimal_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
"""
Preprocessor for the wordcount_minimal.yaml test.
@@ -523,6 +541,8 @@ def _wordcount_test_preprocessor(
of the wordcount example. This allows the test to verify the pipeline's
correctness without relying on a fixed input file.

Based on this expected output: # Row(word='king', count=311)

Args:
test_spec: The dictionary representation of the YAML pipeline specification.
expected: A list of strings representing the expected output of the
@@ -538,8 +558,64 @@ def _wordcount_test_preprocessor(
word = element.split('=')[1].split(',')[0].replace("'", '')
count = int(element.split('=')[2].replace(')', ''))
all_words += [word] * count
random.shuffle(all_words)

return _wordcount_random_shuffler(test_spec, all_words, env)


@YamlExamplesTestSuite.register_test_preprocessor(
['test_wordCountInclude_yaml'])
def _wordcount_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
"""
Preprocessor for the wordcount Jinja tests.

This preprocessor generates a random input file based on the expected output
of the wordcount example. This allows the test to verify the pipeline's
correctness without relying on a fixed input file.

Based on this expected output: # Row(output='king - 311')

Args:
test_spec: The dictionary representation of the YAML pipeline specification.
expected: A list of strings representing the expected output of the
pipeline.
env: The TestEnvironment object providing utilities for creating temporary
files.

Returns:
The modified test_spec dictionary with the input file path replaced.
"""
all_words = []
for element in expected:
match = re.search(r"output='(.*) - (\d+)'", element)
if match:
word, count_str = match.groups()
all_words += [word] * int(count_str)
return _wordcount_random_shuffler(test_spec, all_words, env)


def _wordcount_random_shuffler(
test_spec: dict, all_words: List[str], env: TestEnvironment):
"""
Helper function to create a randomized input file for wordcount-style tests.

This function takes a list of words, shuffles them, and arranges them into
randomly sized lines. It then creates a temporary input file with this
content and updates the provided test specification to use this file as
the input for a 'ReadFromText' transform.

Args:
test_spec: The dictionary representation of the YAML pipeline specification.
all_words: A list of strings, where each string is a word to be included
in the generated input file.
env: The TestEnvironment object providing utilities for creating temporary
files.

Returns:
The modified test_spec dictionary with the input file path for
'ReadFromText' replaced with the path to the newly generated file.
"""
random.shuffle(all_words)
lines = []
while all_words:
line_length = random.randint(1, min(10, len(all_words)))
@@ -599,7 +675,8 @@ def _kafka_test_preprocessor(
'test_streaming_sentiment_analysis_yaml',
'test_iceberg_migration_yaml',
'test_ml_preprocessing_yaml',
'test_anomaly_scoring_yaml'
'test_anomaly_scoring_yaml',
'test_wordCountInclude_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -1175,6 +1252,50 @@ def _batch_log_analysis_test_preprocessor(
return test_spec


@YamlExamplesTestSuite.register_test_preprocessor(
['test_wordCountInclude_yaml'])
def _jinja_preprocessor(raw_spec_string: str):
"""
Preprocessor for Jinja-based YAML tests.

This function takes a raw YAML string, which is treated as a Jinja2
template, and renders it to produce the final pipeline specification.
It specifically handles templates that use the `{% include ... %}`
directive by manually loading the content of the included files from the
filesystem.

The Jinja variables required for rendering are loaded from a predefined
data source.

Args:
raw_spec_string: A string containing the raw YAML content, which is a
Jinja2 template.

Returns:
A string containing the fully rendered YAML pipeline specification.
"""

jinja_variables = json.loads(input_data.word_count_jinja_parameter_data())
test_file_dir = os.path.dirname(__file__)
sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..'))

include_files = input_data.word_count_jinja_template_data()
mock_templates = {'main_template': raw_spec_string}
for file_path in include_files:
full_path = os.path.join(sdk_root, file_path)
with open(full_path, 'r', encoding='utf-8') as f:
mock_templates[file_path] = f.read()

# Can't use the standard expand_jinja method because it doesn't support
# the `% include` Jinja templatization directive.
# TODO(#35936): Maybe update expand_jinja to handle this case.
jinja_env = Environment(
loader=DictLoader(mock_templates), undefined=StrictUndefined)
template = jinja_env.get_template('main_template')
rendered_yaml_string = template.render(jinja_variables)
return rendered_yaml_string


INPUT_FILES = {
'products.csv': input_data.products_csv(),
'kinglear.txt': input_data.text_data(),
@@ -1216,6 +1337,9 @@ def _batch_log_analysis_test_preprocessor(
os.path.join(YAML_DOCS_DIR, '../transforms/elementwise/*.yaml')).run()
ExamplesTest = YamlExamplesTestSuite(
'ExamplesTest', os.path.join(YAML_DOCS_DIR, '../*.yaml')).run()
JinjaTest = YamlExamplesTestSuite(
'JinjaExamplesTest',
os.path.join(YAML_DOCS_DIR, '../transforms/jinja/**/*.yaml')).run()
IOTest = YamlExamplesTestSuite(
'IOExamplesTest', os.path.join(YAML_DOCS_DIR,
'../transforms/io/*.yaml')).run()
30 changes: 30 additions & 0 deletions sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -33,6 +33,36 @@ def text_data():
])


def word_count_jinja_parameter_data():
return \
'{"readFromTextTransform": {"path": ' \
'"gs://dataflow-samples/shakespeare/kinglear.txt"}, ' \
'"mapToFieldsSplitConfig": {"language": "python", ' \
'"fields":{"value":"1"}}, ' \
'"explodeTransform":{"fields":"word"}, ' \
'"combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, ' \
'"mapToFieldsCountConfig":{"language": "python", "fields":' \
'{"output": "word + \\" - \\" + str(value)"}}, ' \
'"writeToTextTransform":' \
'{"path":"gs://apache-beam-testing-derrickaw/wordCounts/"}}'


def word_count_jinja_template_data():
return \
[('apache_beam/yaml/examples/transforms/jinja/'
'include/submodules/readFromTextTransform.yaml'),
('apache_beam/yaml/examples/transforms/jinja/'
'include/submodules/mapToFieldsSplitConfig.yaml'),
('apache_beam/yaml/examples/transforms/jinja/'
'include/submodules/explodeTransform.yaml'),
('apache_beam/yaml/examples/transforms/jinja/'
'include/submodules/combineTransform.yaml'),
('apache_beam/yaml/examples/transforms/jinja/'
'include/submodules/mapToFieldsCountConfig.yaml'),
('apache_beam/yaml/examples/transforms/jinja/'
'include/submodules/writeToTextTransform.yaml')]


def iceberg_dynamic_destinations_users_data():
return [{
'id': 3, 'name': 'Smith', 'email': 'smith@example.com', 'zip': 'NY'
@@ -0,0 +1,63 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Jinja % include Pipeline

This example leverages the `% include` Jinja directive, using one main
pipeline file and a submodule for each transform used.

General setup:
```sh
export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml
export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt"
export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/"

cd <PATH_TO_BEAM_REPO>/beam/sdks/python
```

Multiline Run Example:
```sh
python -m apache_beam.yaml.main \
--yaml_pipeline_file="${PIPELINE_FILE}" \
--jinja_variables='{
"readFromTextTransform": {"path": "'"${KINGLEAR}"'"},
"mapToFieldsSplitConfig": {
"language": "python",
"fields": {
"value": "1"
}
},
"explodeTransform": {"fields": "word"},
"combineTransform": {
"group_by": "word",
"combine": {"value": "sum"}
},
"mapToFieldsCountConfig": {
"language": "python",
"fields": {"output": "word + \" - \" + str(value)"}
},
"writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"}
}'
```

Single Line Run Example:
```sh
python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform": {"path": "'"${KINGLEAR}"'"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"'"${TEMP_LOCATION}"'"}}'
```
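A malformed `--jinja_variables` value only fails at pipeline construction time, so it can help to sanity-check the JSON first. A small sketch, assuming the top-level keys mirror the variables referenced above (the key set shown is abbreviated and illustrative):

```python
# Sketch: validate a --jinja_variables JSON string before passing it to
# apache_beam.yaml.main. The expected keys below are a hypothetical subset
# of the variables used by wordCountInclude.yaml.
import json

jinja_variables = (
    '{"readFromTextTransform": {"path": "gs://dataflow-samples/'
    'shakespeare/kinglear.txt"}, "explodeTransform": {"fields": "word"}}')

expected_keys = {'readFromTextTransform', 'explodeTransform'}
parsed = json.loads(jinja_variables)  # raises ValueError on malformed JSON
missing = expected_keys - parsed.keys()
assert not missing, f'missing Jinja variables: {missing}'
print(parsed['readFromTextTransform']['path'])
```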

@@ -0,0 +1,27 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# The Combine transform groups by each word and counts the occurrences.

- name: Count words
type: Combine
config:
group_by:
- {{combineTransform.group_by}}
combine:
value: {{combineTransform.combine.value}}
@@ -0,0 +1,26 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# The Explode transform takes arrays of words and emits each word as a
# separate element.

- name: Explode word arrays
type: Explode
config:
fields:
- {{explodeTransform.fields}}