From e51aad0ac0846eab4d26f585c39e1b837d85ff25 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 10 Oct 2025 21:08:32 +0000 Subject: [PATCH 01/11] add jinjava library --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 1b6a8dfc19..e696839ca4 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ 2.24.3 2.15.4 1.5.4 + 2.8.1 20250517 2.6.0 4.13.2 From 878bc23e026389d14954cffbba2f8052c8395915 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 10 Oct 2025 21:08:58 +0000 Subject: [PATCH 02/11] add dependency for ut --- yaml/pom.xml | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/yaml/pom.xml b/yaml/pom.xml index be98f47006..551ddd7a93 100644 --- a/yaml/pom.xml +++ b/yaml/pom.xml @@ -65,6 +65,38 @@ beam-it-kafka test + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${jackson.version} + test + + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + test + + + + + junit + junit + ${junit.version} + test + + + + + com.hubspot.jinjava + jinjava + ${jinjava.version} + test + From 6db725242cd8cdcde9524f92860fdde0a4b816be Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 10 Oct 2025 21:09:55 +0000 Subject: [PATCH 03/11] clean up formatting --- yaml/src/main/yaml/KafkaToBigQuery.yaml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/yaml/src/main/yaml/KafkaToBigQuery.yaml b/yaml/src/main/yaml/KafkaToBigQuery.yaml index 22a64b8c6b..750942e928 100644 --- a/yaml/src/main/yaml/KafkaToBigQuery.yaml +++ b/yaml/src/main/yaml/KafkaToBigQuery.yaml @@ -34,18 +34,23 @@ template: type: text - name: "schema" - help: "Kafka schema. A schema is required if data format is JSON, AVRO or PROTO." + help: > + Kafka schema. A schema is required if data format is JSON, AVRO or + PROTO. required: true type: text - name: "numStorageWriteApiStreams" - help: "Number of streams defines the parallelism of the BigQueryIO’s Write." + help: > + Number of streams defines the parallelism of the BigQueryIO’s Write. required: false default: 1 type: integer - name: "storageWriteApiTriggeringFrequencySec" - help: "Triggering frequency will determine how soon the data will be visible for querying in BigQuery." + help: > + Triggering frequency will determine how soon the data will be visible + for querying in BigQuery. required: false default: 5 type: integer @@ -87,4 +92,4 @@ pipeline: num_streams: {{ numStorageWriteApiStreams | default(1) }} options: - streaming: true \ No newline at end of file + streaming: true From 54f054f397010eb4c2b47a2d2a9fa683e9aaf69c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 10 Oct 2025 21:11:14 +0000 Subject: [PATCH 04/11] add yaml syntaxt ut --- .../templates/yaml/YamlSyntaxTest.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java diff --git a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java new file mode 100644 index 0000000000..2cdf149e42 --- /dev/null +++ b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.templates.yaml; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.hubspot.jinjava.Jinjava; +import com.hubspot.jinjava.interpret.RenderResult; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test validates that all YAML files in the project are syntactically correct after Jinja + * templating. + */ +@RunWith(JUnit4.class) +public class YamlSyntaxTest { + private static final Logger LOG = LoggerFactory.getLogger(YamlSyntaxTest.class); + + @Test + public void testAllYamlFilesAreWellFormed() throws IOException { + // The ObjectMapper configured for YAML will be our parser + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); + Jinjava jinjava = new Jinjava(); + + // Define the directory where your YAML templates are stored + Path yamlDirectory = Paths.get("src", "main", "yaml"); + + // Use Java's file streaming to find all YAML files + try (Stream paths = Files.walk(yamlDirectory)) { + paths + .filter(Files::isRegularFile) + .filter(path -> path.toString().endsWith(".yaml")) + .forEach( + path -> { + try { + // Read the raw YAML content + String rawYamlContent = Files.readString(path); + + // Dynamically create a context with 'placeholder' values for all found variables. + Map context = new HashMap<>(); + RenderResult result = jinjava.renderForResult(rawYamlContent, new HashMap<>()); + Set variables = result.getContext().getResolvedExpressions(); + LOG.info("Found variables: " + variables); + variables.forEach(variable -> context.put(variable, "placeholder")); + + // Render the template with the 'placeholder' context + String renderedYamlContent = jinjava.render(rawYamlContent, context); + LOG.info("Rendered YAML content: " + renderedYamlContent); + + // Now validate the rendered YAML content + Object ignored = yamlMapper.readValue(renderedYamlContent, Object.class); + LOG.info("Successfully validated: " + path); + } catch (IOException e) { + throw new RuntimeException("Failed to parse YAML file: " + path, e); + } + }); + } + } +} From e734de5dc19455827e80de62dcdc84d05159aa13 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 23 Oct 2025 18:58:53 +0000 Subject: [PATCH 05/11] fix pom conflict --- pom.xml | 3 ++- yaml/pom.xml | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e696839ca4..88d2687b5d 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ 3.2.1 3.11.0 3.8.1 + 3.6.2 3.5.0 1.10.0 3.3.0 @@ -443,7 +444,7 @@ org.codehaus.mojo exec-maven-plugin - 3.6.2 + ${exec-maven-plugin.version} clean-surefire-xml diff --git a/yaml/pom.xml b/yaml/pom.xml index 551ddd7a93..3273146dd3 100644 --- a/yaml/pom.xml +++ b/yaml/pom.xml @@ -113,6 +113,44 @@ + + org.codehaus.mojo + exec-maven-plugin + ${exec-maven-plugin.version} + + + pip-install + test-compile + + exec + + + pip + + install + -r + src/test/python/requirements-test.txt + + + + + python-test + test + + exec + + + python + ${project.basedir} + + -m + unittest + src/test/python/yaml_syntax_test.py + + + + + From 508377236a9bc93fe7f0c0d8f21d0a500d86f455 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 13 Oct 2025 19:27:47 +0000 Subject: [PATCH 06/11] add python unit test for yaml to replace java based --- yaml/src/test/python/requirements-test.txt | 2 + yaml/src/test/python/yaml_syntax_test.py | 70 ++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 yaml/src/test/python/requirements-test.txt create mode 100644 yaml/src/test/python/yaml_syntax_test.py diff --git a/yaml/src/test/python/requirements-test.txt b/yaml/src/test/python/requirements-test.txt new file mode 100644 index 0000000000..e8c646d10a --- /dev/null +++ b/yaml/src/test/python/requirements-test.txt @@ -0,0 +1,2 @@ +apache-beam[gcp,test] +jinja2 diff --git a/yaml/src/test/python/yaml_syntax_test.py b/yaml/src/test/python/yaml_syntax_test.py new file mode 100644 index 0000000000..ad8c094651 --- /dev/null +++ b/yaml/src/test/python/yaml_syntax_test.py @@ -0,0 +1,70 @@ +# +# Copyright (C) 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +# + +import logging +import os +import unittest + +import apache_beam as beam +import yaml +from apache_beam.yaml import yaml_transform +from jinja2 import Environment, FileSystemLoader, meta + +# Configure logging at the module level to ensure it's set up early. +# This will ensure logs are printed to console. +# Using basicConfig with a format to make logs more informative. +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logging.getLogger('apache_beam').setLevel(logging.DEBUG) + + +class YamlSyntaxTest(unittest.TestCase): + _logger = logging.getLogger(__name__) + def test_all_yaml_files_are_valid_beam_pipelines(self): + """ + Tests that all .yaml files in the src/main/yaml directory are valid + Apache Beam pipelines after being rendered with Jinja. + """ + yaml_dir = os.path.join(os.path.dirname(__file__), '../../main/yaml') + self.assertTrue(os.path.isdir(yaml_dir), f"Directory not found: {yaml_dir}") + + env = Environment(loader=FileSystemLoader(yaml_dir)) + + for template_name in env.list_templates(filter_func=lambda x: x.endswith('.yaml')): + with self.subTest(template_name=template_name): + logging.info(f"Validating {template_name}") + self._logger.debug(f"Attempting to validate YAML: {template_name}") + template = env.get_template(template_name) + logging.info(f"Template: {template}") + + # Find all undeclared variables in the template + undeclared_vars = meta.find_undeclared_variables(env.parse(template_name)) + + # Use placeholder values for Jinja variables for validation purposes + context = {var: 'placeholder' for var in undeclared_vars} + rendered_yaml = template.render(context) + self._logger.debug(f"Rendered YAML for {template_name}:\n{rendered_yaml}...") # Log first 500 chars + + self._logger.debug(f"Loading YAML into Beam pipeline_spec: {template_name}") + pipeline_spec = yaml.load(rendered_yaml, Loader=yaml_transform.SafeLineLoader) + + # Validate the pipeline spec against the generic schema without trying to + # expand the transforms, which avoids the need for expansion services. + yaml_transform.validate_against_schema(pipeline_spec, 'generic') + self._logger.info(f"Successfully validated YAML syntax for: {template_name}") + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From 2831b69140e3cf21124258dfdad105391bedb6a5 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 13 Oct 2025 19:28:22 +0000 Subject: [PATCH 07/11] remove java yaml ut --- .../templates/yaml/YamlSyntaxTest.java | 84 ------------------- 1 file changed, 84 deletions(-) delete mode 100644 yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java diff --git a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java b/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java deleted file mode 100644 index 2cdf149e42..0000000000 --- a/yaml/src/test/java/com/google/cloud/teleport/templates/yaml/YamlSyntaxTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (C) 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.templates.yaml; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.hubspot.jinjava.Jinjava; -import com.hubspot.jinjava.interpret.RenderResult; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.stream.Stream; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This test validates that all YAML files in the project are syntactically correct after Jinja - * templating. - */ -@RunWith(JUnit4.class) -public class YamlSyntaxTest { - private static final Logger LOG = LoggerFactory.getLogger(YamlSyntaxTest.class); - - @Test - public void testAllYamlFilesAreWellFormed() throws IOException { - // The ObjectMapper configured for YAML will be our parser - ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); - Jinjava jinjava = new Jinjava(); - - // Define the directory where your YAML templates are stored - Path yamlDirectory = Paths.get("src", "main", "yaml"); - - // Use Java's file streaming to find all YAML files - try (Stream paths = Files.walk(yamlDirectory)) { - paths - .filter(Files::isRegularFile) - .filter(path -> path.toString().endsWith(".yaml")) - .forEach( - path -> { - try { - // Read the raw YAML content - String rawYamlContent = Files.readString(path); - - // Dynamically create a context with 'placeholder' values for all found variables. - Map context = new HashMap<>(); - RenderResult result = jinjava.renderForResult(rawYamlContent, new HashMap<>()); - Set variables = result.getContext().getResolvedExpressions(); - LOG.info("Found variables: " + variables); - variables.forEach(variable -> context.put(variable, "placeholder")); - - // Render the template with the 'placeholder' context - String renderedYamlContent = jinjava.render(rawYamlContent, context); - LOG.info("Rendered YAML content: " + renderedYamlContent); - - // Now validate the rendered YAML content - Object ignored = yamlMapper.readValue(renderedYamlContent, Object.class); - LOG.info("Successfully validated: " + path); - } catch (IOException e) { - throw new RuntimeException("Failed to parse YAML file: " + path, e); - } - }); - } - } -} From ab20151465d11fa08c06efed86b5d7a3f3b4bd75 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 14 Oct 2025 15:24:56 +0000 Subject: [PATCH 08/11] remove old dependencies --- yaml/pom.xml | 32 -------------------------------- 1 file changed, 32 deletions(-) diff --git a/yaml/pom.xml b/yaml/pom.xml index 3273146dd3..8be89d066f 100644 --- a/yaml/pom.xml +++ b/yaml/pom.xml @@ -65,38 +65,6 @@ beam-it-kafka test - - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - ${jackson.version} - test - - - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - test - - - - - junit - junit - ${junit.version} - test - - - - - com.hubspot.jinjava - jinjava - ${jinjava.version} - test - From 08d054d6aaa3a8efc46c2cb4c03d7fc5cfd6b1da Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 14 Oct 2025 15:26:33 +0000 Subject: [PATCH 09/11] remove jinjava --- pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/pom.xml b/pom.xml index 88d2687b5d..ee002031d7 100644 --- a/pom.xml +++ b/pom.xml @@ -74,7 +74,6 @@ 2.24.3 2.15.4 1.5.4 - 2.8.1 20250517 2.6.0 4.13.2 From eaab6b8b334d814643374e53f43a715796c606d0 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 14 Oct 2025 20:15:50 +0000 Subject: [PATCH 10/11] remove comment --- yaml/src/test/python/yaml_syntax_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yaml/src/test/python/yaml_syntax_test.py b/yaml/src/test/python/yaml_syntax_test.py index ad8c094651..9f4c569a55 100644 --- a/yaml/src/test/python/yaml_syntax_test.py +++ b/yaml/src/test/python/yaml_syntax_test.py @@ -56,7 +56,7 @@ def test_all_yaml_files_are_valid_beam_pipelines(self): # Use placeholder values for Jinja variables for validation purposes context = {var: 'placeholder' for var in undeclared_vars} rendered_yaml = template.render(context) - self._logger.debug(f"Rendered YAML for {template_name}:\n{rendered_yaml}...") # Log first 500 chars + self._logger.debug(f"Rendered YAML for {template_name}:\n{rendered_yaml}...") self._logger.debug(f"Loading YAML into Beam pipeline_spec: {template_name}") pipeline_spec = yaml.load(rendered_yaml, Loader=yaml_transform.SafeLineLoader) From ed510be317975ce97712748e5da060fa8d708a3e Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 23 Oct 2025 20:42:19 +0000 Subject: [PATCH 11/11] fix comment about test case generation --- yaml/src/test/python/yaml_syntax_test.py | 98 ++++++++++++++++-------- 1 file changed, 68 insertions(+), 30 deletions(-) diff --git a/yaml/src/test/python/yaml_syntax_test.py b/yaml/src/test/python/yaml_syntax_test.py index 9f4c569a55..995d4be210 100644 --- a/yaml/src/test/python/yaml_syntax_test.py +++ b/yaml/src/test/python/yaml_syntax_test.py @@ -29,42 +29,80 @@ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('apache_beam').setLevel(logging.DEBUG) + + +def create_test_method(template_name, yaml_dir): + """Creates a test method that validates a single YAML template. + This factory function generates a test method that will be dynamically added + to the YamlSyntaxTest class. Each generated test validates a single YAML + template file by rendering it with placeholder values for any Jinja + variables and then validating the resulting YAML against the Beam 'generic' + schema. -class YamlSyntaxTest(unittest.TestCase): - _logger = logging.getLogger(__name__) - def test_all_yaml_files_are_valid_beam_pipelines(self): - """ - Tests that all .yaml files in the src/main/yaml directory are valid - Apache Beam pipelines after being rendered with Jinja. - """ - yaml_dir = os.path.join(os.path.dirname(__file__), '../../main/yaml') - self.assertTrue(os.path.isdir(yaml_dir), f"Directory not found: {yaml_dir}") - - env = Environment(loader=FileSystemLoader(yaml_dir)) + Args: + template_name (str): The filename of the YAML template to be tested. + yaml_dir (str): The directory where the YAML templates are located. - for template_name in env.list_templates(filter_func=lambda x: x.endswith('.yaml')): - with self.subTest(template_name=template_name): - logging.info(f"Validating {template_name}") - self._logger.debug(f"Attempting to validate YAML: {template_name}") - template = env.get_template(template_name) - logging.info(f"Template: {template}") + Returns: + function: A test method that can be attached to a unittest.TestCase class. + """ - # Find all undeclared variables in the template - undeclared_vars = meta.find_undeclared_variables(env.parse(template_name)) - - # Use placeholder values for Jinja variables for validation purposes - context = {var: 'placeholder' for var in undeclared_vars} - rendered_yaml = template.render(context) - self._logger.debug(f"Rendered YAML for {template_name}:\n{rendered_yaml}...") + def test_method(self): + self._logger.info(f"Validating {template_name}") + env = Environment(loader=FileSystemLoader(yaml_dir), autoescape=False) + template_source = env.loader.get_source(env, template_name)[0] + + # Find all undeclared variables in the template + parsed_content = env.parse(template_source) + undeclared_vars = meta.find_undeclared_variables(parsed_content) + + # Use placeholder values for Jinja variables for validation purposes + context = {var: 'placeholder' for var in undeclared_vars} + template = env.get_template(template_name) + rendered_yaml = template.render(context) + self._logger.debug(f"Rendered YAML for {template_name}:\n{rendered_yaml}...") + + self._logger.debug(f"Loading YAML into Beam pipeline_spec: {template_name}") + pipeline_spec = yaml.load(rendered_yaml, Loader=yaml_transform.SafeLineLoader) + + # Validate the pipeline spec against the generic schema without trying to + # expand the transforms, which avoids the need for expansion services. + yaml_transform.validate_against_schema(pipeline_spec, 'generic') + self._logger.info(f"Successfully validated YAML syntax for: {template_name}") + + return test_method + + +class YamlSyntaxTest(unittest.TestCase): + """A test suite for validating the syntax of Beam YAML templates. - self._logger.debug(f"Loading YAML into Beam pipeline_spec: {template_name}") - pipeline_spec = yaml.load(rendered_yaml, Loader=yaml_transform.SafeLineLoader) + This class is dynamically populated with test methods, one for each + .yaml file found in the `src/main/yaml` directory. This is accomplished + by the `_create_tests` function, which runs at module-load time. + """ + _logger = logging.getLogger(__name__) + + +def _create_tests(): + """Discovers all YAML templates and dynamically creates a test for each. - # Validate the pipeline spec against the generic schema without trying to - # expand the transforms, which avoids the need for expansion services. - yaml_transform.validate_against_schema(pipeline_spec, 'generic') - self._logger.info(f"Successfully validated YAML syntax for: {template_name}") + This function scans the `src/main/yaml` directory for `.yaml` files and, + for each file, generates a unique test method on the `YamlSyntaxTest` class. + This allows `unittest` or `pytest` to discover and run each validation as a + separate test case, making it easy to identify which template is invalid. + """ + yaml_dir = os.path.join(os.path.dirname(__file__), '../../main/yaml') + if not os.path.isdir(yaml_dir): + return + + env = Environment(loader=FileSystemLoader(yaml_dir)) + for template_name in env.list_templates(filter_func=lambda x: x.endswith('.yaml')): + test_name = f"test_{template_name.replace('.yaml', '').replace('-', '_')}" + test_method = create_test_method(template_name, yaml_dir) + setattr(YamlSyntaxTest, test_name, test_method) + +_create_tests() if __name__ == '__main__': unittest.main() \ No newline at end of file