diff --git a/pom.xml b/pom.xml
index 1b6a8dfc19..ee002031d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
3.2.1
3.11.0
3.8.1
+ 3.6.2
3.5.0
1.10.0
3.3.0
@@ -442,7 +443,7 @@
org.codehaus.mojo
exec-maven-plugin
- 3.6.2
+ ${exec-maven-plugin.version}
clean-surefire-xml
diff --git a/yaml/pom.xml b/yaml/pom.xml
index be98f47006..8be89d066f 100644
--- a/yaml/pom.xml
+++ b/yaml/pom.xml
@@ -81,6 +81,44 @@
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ ${exec-maven-plugin.version}
+
+
+ pip-install
+ test-compile
+
+ exec
+
+
+ pip
+
+ install
+ -r
+ src/test/python/requirements-test.txt
+
+
+
+
+ python-test
+ test
+
+ exec
+
+
+ python
+ ${project.basedir}
+
+ -m
+ unittest
+ src/test/python/yaml_syntax_test.py
+
+
+
+
+
diff --git a/yaml/src/main/yaml/KafkaToBigQuery.yaml b/yaml/src/main/yaml/KafkaToBigQuery.yaml
index 22a64b8c6b..750942e928 100644
--- a/yaml/src/main/yaml/KafkaToBigQuery.yaml
+++ b/yaml/src/main/yaml/KafkaToBigQuery.yaml
@@ -34,18 +34,23 @@ template:
type: text
- name: "schema"
- help: "Kafka schema. A schema is required if data format is JSON, AVRO or PROTO."
+ help: >
+ Kafka schema. A schema is required if data format is JSON, AVRO or
+ PROTO.
required: true
type: text
- name: "numStorageWriteApiStreams"
- help: "Number of streams defines the parallelism of the BigQueryIO’s Write."
+ help: >
+ Number of streams defines the parallelism of the BigQueryIO’s Write.
required: false
default: 1
type: integer
- name: "storageWriteApiTriggeringFrequencySec"
- help: "Triggering frequency will determine how soon the data will be visible for querying in BigQuery."
+ help: >
+ Triggering frequency will determine how soon the data will be visible
+ for querying in BigQuery.
required: false
default: 5
type: integer
@@ -87,4 +92,4 @@ pipeline:
num_streams: {{ numStorageWriteApiStreams | default(1) }}
options:
- streaming: true
\ No newline at end of file
+ streaming: true
diff --git a/yaml/src/test/python/requirements-test.txt b/yaml/src/test/python/requirements-test.txt
new file mode 100644
index 0000000000..e8c646d10a
--- /dev/null
+++ b/yaml/src/test/python/requirements-test.txt
@@ -0,0 +1,2 @@
+apache-beam[gcp,test]
+jinja2
diff --git a/yaml/src/test/python/yaml_syntax_test.py b/yaml/src/test/python/yaml_syntax_test.py
new file mode 100644
index 0000000000..995d4be210
--- /dev/null
+++ b/yaml/src/test/python/yaml_syntax_test.py
@@ -0,0 +1,108 @@
+#
+# Copyright (C) 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+
+import logging
+import os
+import unittest
+
+import apache_beam as beam
+import yaml
+from apache_beam.yaml import yaml_transform
+from jinja2 import Environment, FileSystemLoader, meta
+
+# Log to the console at DEBUG level (including apache_beam's own loggers) from
+# import time onward, with timestamp, logger name and severity on each line.
+logging.basicConfig(
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    level=logging.DEBUG)
+logging.getLogger('apache_beam').setLevel(logging.DEBUG)
+
+
+def create_test_method(template_name, yaml_dir):
+    """Builds the unittest method that checks one YAML template file.
+
+    The returned function is meant to be attached to the YamlSyntaxTest class.
+    When run, it renders the template with Jinja, substituting the string
+    'placeholder' for every undeclared variable, parses the rendered text as
+    YAML and validates the result against Beam's 'generic' pipeline schema.
+
+    Args:
+      template_name (str): File name of the template, relative to yaml_dir.
+      yaml_dir (str): Directory that holds the YAML templates.
+
+    Returns:
+      function: A method taking only `self`, suitable for a unittest.TestCase.
+    """
+
+    def test_method(self):
+        log = self._logger
+        log.info(f"Validating {template_name}")
+        jinja_env = Environment(
+            loader=FileSystemLoader(yaml_dir), autoescape=False)
+        raw_source, *_ = jinja_env.loader.get_source(jinja_env, template_name)
+
+        # Every variable the template reads but never sets gets the same dummy
+        # value; only the shape of the rendered YAML matters for this check.
+        variable_names = meta.find_undeclared_variables(
+            jinja_env.parse(raw_source))
+        placeholders = dict.fromkeys(variable_names, 'placeholder')
+        rendered_yaml = jinja_env.get_template(template_name).render(
+            placeholders)
+        log.debug(f"Rendered YAML for {template_name}:\n{rendered_yaml}...")
+        log.debug(f"Loading YAML into Beam pipeline_spec: {template_name}")
+
+        # Schema validation alone does not expand transforms, so no expansion
+        # service is required to run this test.
+        pipeline_spec = yaml.load(
+            rendered_yaml, Loader=yaml_transform.SafeLineLoader)
+        yaml_transform.validate_against_schema(pipeline_spec, 'generic')
+        log.info(f"Successfully validated YAML syntax for: {template_name}")
+
+    return test_method
+
+
+class YamlSyntaxTest(unittest.TestCase):
+    """Container for the generated Beam YAML template validation tests.
+
+    The class body defines no tests itself. `_create_tests`, called when this
+    module is loaded, adds one test method for every .yaml file it finds in
+    the `src/main/yaml` directory.
+    """
+    _logger = logging.getLogger(__name__)
+
+
+def _create_tests():
+    """Adds one `test_<name>` method to YamlSyntaxTest per .yaml template.
+
+    Warns and adds nothing if `../../main/yaml` (relative to this file) is
+    missing. Raises ValueError if two templates map to the same test name.
+    """
+    yaml_dir = os.path.join(os.path.dirname(__file__), '../../main/yaml')
+    if not os.path.isdir(yaml_dir):
+        logging.getLogger(__name__).warning("Template dir missing: %s", yaml_dir)
+        return
+    env = Environment(loader=FileSystemLoader(yaml_dir))
+    for template_name in env.list_templates(extensions=['yaml']):
+        stem = os.path.splitext(template_name)[0]  # drop only the final '.yaml'
+        test_name = 'test_' + ''.join(c if c.isalnum() else '_' for c in stem)
+        if test_name in vars(YamlSyntaxTest):  # would overwrite another test
+            raise ValueError(f"Duplicate test name {test_name}: {template_name}")
+        setattr(YamlSyntaxTest, test_name, create_test_method(template_name, yaml_dir))
+
+_create_tests()
+
+if __name__ == '__main__':
+    unittest.main()