diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md index c788b6d3e605..4cba973dbead 100644 --- a/sdks/python/apache_beam/yaml/examples/README.md +++ b/sdks/python/apache_beam/yaml/examples/README.md @@ -37,7 +37,7 @@ Build this jar for running with the run command in the next stage: ``` -cd /beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar +cd /beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar ``` ## Example Run @@ -70,6 +70,10 @@ pytest -v testing/ or +pytest -v testing/examples_test.py::JinjaTest + +or + python -m unittest -v testing/examples_test.py ``` @@ -229,6 +233,22 @@ gcloud dataflow yaml run $JOB_NAME \ --region $REGION ``` +### Jinja + +Jinja [templatization](https://beam.apache.org/documentation/sdks/yaml/#jinja-templatization) +can be used to build off of different contexts and/or with different +configurations. + +Several examples will be created based on the already used word count example +by leveraging Jinja templating engine for dynamic pipeline generation based on +inputs from the user through `% include`, `% import`, and inheritance +directives. + +Jinja `% include` directive: +- [wordCountInclude.yaml](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml) +- [Instructions](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md) on how to run the pipeline. 
+ + ### ML Examples that include the built-in `Enrichment` transform for performing diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index d75cdd99431c..0bfcb3f61612 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -21,6 +21,7 @@ import logging import os import random +import re import sys import unittest from typing import Any @@ -33,6 +34,9 @@ import pytest import yaml +from jinja2 import DictLoader +from jinja2 import Environment +from jinja2 import StrictUndefined import apache_beam as beam from apache_beam import PCollection @@ -339,8 +343,21 @@ def test_yaml_example(self): for i, line in enumerate(expected): expected[i] = line.replace('# ', '').replace('\n', '') expected = [line for line in expected if line] + + raw_spec_string = ''.join(lines) + # Filter for any jinja preprocessor - this has to be done before other + # preprocessors. + jinja_preprocessor = [ + preprocessor for preprocessor in custom_preprocessors + if 'jinja_preprocessor' in preprocessor.__name__ + ] + if jinja_preprocessor: + jinja_preprocessor = jinja_preprocessor[0] + raw_spec_string = jinja_preprocessor(raw_spec_string) + custom_preprocessors.remove(jinja_preprocessor) + pipeline_spec = yaml.load( - ''.join(lines), Loader=yaml_transform.SafeLineLoader) + raw_spec_string, Loader=yaml_transform.SafeLineLoader) with TestEnvironment() as env: for fn in custom_preprocessors: @@ -513,8 +530,9 @@ def apply(preprocessor): return apply -@YamlExamplesTestSuite.register_test_preprocessor('test_wordcount_minimal_yaml') -def _wordcount_test_preprocessor( +@YamlExamplesTestSuite.register_test_preprocessor( + ['test_wordcount_minimal_yaml']) +def _wordcount_minimal_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): """ Preprocessor for the wordcount_minimal.yaml test. 
@@ -523,6 +541,8 @@ def _wordcount_test_preprocessor( of the wordcount example. This allows the test to verify the pipeline's correctness without relying on a fixed input file. + Based on this expected output: # Row(word='king', count=311) + Args: test_spec: The dictionary representation of the YAML pipeline specification. expected: A list of strings representing the expected output of the @@ -538,8 +558,64 @@ def _wordcount_test_preprocessor( word = element.split('=')[1].split(',')[0].replace("'", '') count = int(element.split('=')[2].replace(')', '')) all_words += [word] * count - random.shuffle(all_words) + return _wordcount_random_shuffler(test_spec, all_words, env) + + +@YamlExamplesTestSuite.register_test_preprocessor( + ['test_wordCountInclude_yaml']) +def _wordcount_jinja_test_preprocessor( + test_spec: dict, expected: List[str], env: TestEnvironment): + """ + Preprocessor for the wordcount Jinja tests. + + This preprocessor generates a random input file based on the expected output + of the wordcount example. This allows the test to verify the pipeline's + correctness without relying on a fixed input file. + + Based on this expected output: # Row(output='king - 311') + + Args: + test_spec: The dictionary representation of the YAML pipeline specification. + expected: A list of strings representing the expected output of the + pipeline. + env: The TestEnvironment object providing utilities for creating temporary + files. + + Returns: + The modified test_spec dictionary with the input file path replaced. + """ + all_words = [] + for element in expected: + match = re.search(r"output='(.*) - (\d+)'", element) + if match: + word, count_str = match.groups() + all_words += [word] * int(count_str) + return _wordcount_random_shuffler(test_spec, all_words, env) + + +def _wordcount_random_shuffler( + test_spec: dict, all_words: List[str], env: TestEnvironment): + """ + Helper function to create a randomized input file for wordcount-style tests. 
+ + This function takes a list of words, shuffles them, and arranges them into + randomly sized lines. It then creates a temporary input file with this + content and updates the provided test specification to use this file as + the input for a 'ReadFromText' transform. + + Args: + test_spec: The dictionary representation of the YAML pipeline specification. + all_words: A list of strings, where each string is a word to be included + in the generated input file. + env: The TestEnvironment object providing utilities for creating temporary + files. + + Returns: + The modified test_spec dictionary with the input file path for + 'ReadFromText' replaced with the path to the newly generated file. + """ + random.shuffle(all_words) lines = [] while all_words: line_length = random.randint(1, min(10, len(all_words))) @@ -599,7 +675,8 @@ def _kafka_test_preprocessor( 'test_streaming_sentiment_analysis_yaml', 'test_iceberg_migration_yaml', 'test_ml_preprocessing_yaml', - 'test_anomaly_scoring_yaml' + 'test_anomaly_scoring_yaml', + 'test_wordCountInclude_yaml' ]) def _io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): @@ -1175,6 +1252,50 @@ def _batch_log_analysis_test_preprocessor( return test_spec +@YamlExamplesTestSuite.register_test_preprocessor( + ['test_wordCountInclude_yaml']) +def _jinja_preprocessor(raw_spec_string: str): + """ + Preprocessor for Jinja-based YAML tests. + + This function takes a raw YAML string, which is treated as a Jinja2 + template, and renders it to produce the final pipeline specification. + It specifically handles templates that use the `{% include ... %}` + directive by manually loading the content of the included files from the + filesystem. + + The Jinja variables required for rendering are loaded from a predefined + data source. + + Args: + raw_spec_string: A string containing the raw YAML content, which is a + Jinja2 template. 
+ + Returns: + A string containing the fully rendered YAML pipeline specification. + """ + + jinja_variables = json.loads(input_data.word_count_jinja_parameter_data()) + test_file_dir = os.path.dirname(__file__) + sdk_root = os.path.abspath(os.path.join(test_file_dir, '../../../..')) + + include_files = input_data.word_count_jinja_template_data() + mock_templates = {'main_template': raw_spec_string} + for file_path in include_files: + full_path = os.path.join(sdk_root, file_path) + with open(full_path, 'r', encoding='utf-8') as f: + mock_templates[file_path] = f.read() + + # Can't use the standard expand_jinja method due to it not supporting + # `% include` jinja templatization. + # TODO(#35936): Maybe update expand_jinja to handle this case. + jinja_env = Environment( + loader=DictLoader(mock_templates), undefined=StrictUndefined) + template = jinja_env.get_template('main_template') + rendered_yaml_string = template.render(jinja_variables) + return rendered_yaml_string + + INPUT_FILES = { 'products.csv': input_data.products_csv(), 'kinglear.txt': input_data.text_data(), @@ -1216,6 +1337,9 @@ def _batch_log_analysis_test_preprocessor( os.path.join(YAML_DOCS_DIR, '../transforms/elementwise/*.yaml')).run() ExamplesTest = YamlExamplesTestSuite( 'ExamplesTest', os.path.join(YAML_DOCS_DIR, '../*.yaml')).run() +JinjaTest = YamlExamplesTestSuite( + 'JinjaExamplesTest', + os.path.join(YAML_DOCS_DIR, '../transforms/jinja/**/*.yaml')).run() IOTest = YamlExamplesTestSuite( 'IOExamplesTest', os.path.join(YAML_DOCS_DIR, '../transforms/io/*.yaml')).run() diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py index 27f210b48563..50d40224f828 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py +++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py @@ -16,6 +16,7 @@ # limitations under the License. 
# +import json import typing from apache_beam.io.gcp.pubsub import PubsubMessage @@ -33,6 +34,53 @@ def text_data(): ]) +def word_count_jinja_parameter_data(): + params = { + "readFromTextTransform": { + "path": "gs://dataflow-samples/shakespeare/kinglear.txt" + }, + "mapToFieldsSplitConfig": { + "language": "python", "fields": { + "value": "1" + } + }, + "explodeTransform": { + "fields": "word" + }, + "combineTransform": { + "group_by": "word", "combine": { + "value": "sum" + } + }, + "mapToFieldsCountConfig": { + "language": "python", + "fields": { + "output": "word + \" - \" + str(value)" + } + }, + "writeToTextTransform": { + "path": "gs://apache-beam-testing-derrickaw/wordCounts/" + } + } + return json.dumps(params) + + +def word_count_jinja_template_data(): + return \ +[('apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/readFromTextTransform.yaml'), + ('apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/mapToFieldsSplitConfig.yaml'), + ('apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/explodeTransform.yaml'), + ('apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/combineTransform.yaml'), + ('apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/mapToFieldsCountConfig.yaml'), + ('apache_beam/yaml/examples/transforms/jinja/' + 'include/submodules/writeToTextTransform.yaml')] + + def iceberg_dynamic_destinations_users_data(): return [{ 'id': 3, 'name': 'Smith', 'email': 'smith@example.com', 'zip': 'NY' diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md new file mode 100644 index 000000000000..9b056e9906d2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/README.md @@ -0,0 +1,63 @@ + + +## Jinja % include Pipeline + +This example leverages the `% include` Jinja directive by having one main +pipeline and then submodules for each transform 
used. + +General setup: +```sh +export PIPELINE_FILE=apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml +export KINGLEAR="gs://dataflow-samples/shakespeare/kinglear.txt" +export TEMP_LOCATION="gs://MY-BUCKET/wordCounts/" + +cd /beam/sdks/python +``` + +Multiline Run Example: +```sh +python -m apache_beam.yaml.main \ + --yaml_pipeline_file="${PIPELINE_FILE}" \ + --jinja_variables='{ + "readFromTextTransform": {"path": "'"${KINGLEAR}"'"}, + "mapToFieldsSplitConfig": { + "language": "python", + "fields": { + "value": "1" + } + }, + "explodeTransform": {"fields": "word"}, + "combineTransform": { + "group_by": "word", + "combine": {"value": "sum"} + }, + "mapToFieldsCountConfig": { + "language": "python", + "fields": {"output": "word + \" - \" + str(value)"} + }, + "writeToTextTransform": {"path": "'"${TEMP_LOCATION}"'"} + }' +``` + +Single Line Run Example: +```sh +python -m apache_beam.yaml.main --yaml_pipeline_file="${PIPELINE_FILE}" --jinja_variables='{"readFromTextTransform": {"path": "gs://dataflow-samples/shakespeare/kinglear.txt"}, "mapToFieldsSplitConfig": {"language": "python", "fields":{"value":"1"}}, "explodeTransform":{"fields":"word"}, "combineTransform":{"group_by":"word", "combine":{"value":"sum"}}, "mapToFieldsCountConfig":{"language": "python", "fields":{"output":"word + \" - \" + str(value)"}}, "writeToTextTransform":{"path":"'"${TEMP_LOCATION}"'"}}' +``` + diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml new file mode 100644 index 000000000000..bbf813558180 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml @@ -0,0 +1,27 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# The Combine transform groups by each word and counts the occurrences. + + - name: Count words + type: Combine + config: + group_by: + - {{combineTransform.group_by}} + combine: + value: {{combineTransform.combine.value}} diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml new file mode 100644 index 000000000000..d56649c5b9d3 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml @@ -0,0 +1,26 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# The Explode transform to take arrays of words and emit each word as a +# separate element. + + - name: Explode word arrays + type: Explode + config: + fields: + - {{explodeTransform.fields}} \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml new file mode 100644 index 000000000000..f1423895c41e --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml @@ -0,0 +1,24 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# A generic MapToFields transform to format the word and count into a single +# output string. 
+ + language: {{mapToFieldsCountConfig.language}} + fields: + output: {{mapToFieldsCountConfig.fields.output}} diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml new file mode 100644 index 000000000000..16c41110b926 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml @@ -0,0 +1,33 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# A MapToFields transform to map words to a word and a count of 1. + + language: {{mapToFieldsSplitConfig.language}} + fields: + word: + callable: |- + # TODO(#35936): Including another file here works fine, but if + # the file has a license header or other irrelevant comments, it + # will break the pipeline. Need to investigate more on Jinja + # filtering in the expand_jinja method or some other way. 
+ import re + def my_mapping(row): + return re.findall(r'[A-Za-z\']+', row.line.lower()) + value: {{mapToFieldsSplitConfig.fields.value}} \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml new file mode 100644 index 000000000000..96029c380683 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml @@ -0,0 +1,26 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This example reads from a public file stored on Google Cloud. This +# requires authenticating with Google Cloud, or setting the file in +# `ReadFromText` to a local file. 
+ + - name: Read from GCS + type: ReadFromText + config: + path: {{readFromTextTransform.path}} \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml new file mode 100644 index 000000000000..6cce90e7a486 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml @@ -0,0 +1,27 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# This example writes to a public file stored on Google Cloud. This +# requires authenticating with Google Cloud, or setting the file in +# `WriteToText` to a local file. 
+ + - name: Write to GCS + type: WriteToText + config: + path: {{writeToTextTransform.path}} \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml new file mode 100644 index 000000000000..a28ba688b2f0 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/jinja/include/wordCountInclude.yaml @@ -0,0 +1,66 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This example reads from a public file stored on Google Cloud. This +# requires authenticating with Google Cloud, or setting the file in +# `ReadFromText` to a local file. +# +# To set up Application Default Credentials, +# see https://cloud.google.com/docs/authentication/external/set-up-adc. +# +# This pipeline reads in a text file, counts distinct words found in the text, +# then logs a row containing each word and its count. 
+ +pipeline: + type: chain + transforms: +# Read in text file +{% include 'apache_beam/yaml/examples/transforms/jinja/include/submodules/readFromTextTransform.yaml' %} + +# Split words and count occurrences + - name: Split words + type: MapToFields + config: +{% include 'apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsSplitConfig.yaml' %} + +# Explode into individual words +{% include 'apache_beam/yaml/examples/transforms/jinja/include/submodules/explodeTransform.yaml' %} + +# Group by word +{% include 'apache_beam/yaml/examples/transforms/jinja/include/submodules/combineTransform.yaml' %} + +# Format output to a single string consisting of `word - count` + - name: Format output + type: MapToFields + config: +{% include 'apache_beam/yaml/examples/transforms/jinja/include/submodules/mapToFieldsCountConfig.yaml' %} + +# Write to text file on GCS, locally, etc +{% include 'apache_beam/yaml/examples/transforms/jinja/include/submodules/writeToTextTransform.yaml' %} + +# Expected: +# Row(output='king - 311') +# Row(output='lear - 253') +# Row(output='dramatis - 1') +# Row(output='personae - 1') +# Row(output='of - 483') +# Row(output='britain - 2') +# Row(output='france - 32') +# Row(output='duke - 26') +# Row(output='burgundy - 20') +# Row(output='cornwall - 75')