Skip to content

Commit 32aff26

Browse files
authored
[yaml] - add yaml expand_pipeline ut tests (#36087)
* rebase * add more tests * fix lint issue
1 parent be4fb97 commit 32aff26

File tree

1 file changed

+64
-0
lines changed

1 file changed

+64
-0
lines changed

sdks/python/apache_beam/yaml/yaml_transform_unit_test.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from apache_beam.yaml.yaml_transform import ensure_errors_consumed
2929
from apache_beam.yaml.yaml_transform import ensure_transforms_have_types
3030
from apache_beam.yaml.yaml_transform import expand_composite_transform
31+
from apache_beam.yaml.yaml_transform import expand_pipeline
3132
from apache_beam.yaml.yaml_transform import extract_name
3233
from apache_beam.yaml.yaml_transform import get_main_output_key
3334
from apache_beam.yaml.yaml_transform import identify_object
@@ -1035,6 +1036,69 @@ def test_init_with_dict(self):
10351036
self.assertEqual(result._spec['type'], "composite") # preprocessed spec
10361037

10371038

1039+
class ExpandPipelineTest(unittest.TestCase):
1040+
def test_expand_pipeline_with_pipeline_key_only(self):
1041+
spec = '''
1042+
pipeline:
1043+
type: chain
1044+
transforms:
1045+
- type: Create
1046+
config:
1047+
elements: [1,2,3]
1048+
- type: LogForTesting
1049+
'''
1050+
with new_pipeline() as p:
1051+
expand_pipeline(p, spec, validate_schema=None)
1052+
1053+
def test_expand_pipeline_with_pipeline_and_option_keys(self):
1054+
spec = '''
1055+
pipeline:
1056+
type: chain
1057+
transforms:
1058+
- type: Create
1059+
config:
1060+
elements: [1,2,3]
1061+
- type: LogForTesting
1062+
options:
1063+
streaming: false
1064+
'''
1065+
with new_pipeline() as p:
1066+
expand_pipeline(p, spec, validate_schema=None)
1067+
1068+
def test_expand_pipeline_with_extra_top_level_keys(self):
1069+
spec = '''
1070+
template:
1071+
version: "1.0"
1072+
author: "test_user"
1073+
1074+
pipeline:
1075+
type: chain
1076+
transforms:
1077+
- type: Create
1078+
config:
1079+
elements: [1,2,3]
1080+
- type: LogForTesting
1081+
1082+
other_metadata: "This is an ignored comment."
1083+
'''
1084+
with new_pipeline() as p:
1085+
expand_pipeline(p, spec, validate_schema=None)
1086+
1087+
def test_expand_pipeline_with_incorrect_pipelines_key_fails(self):
1088+
spec = '''
1089+
pipelines:
1090+
type: chain
1091+
transforms:
1092+
- type: Create
1093+
config:
1094+
elements: [1,2,3]
1095+
- type: LogForTesting
1096+
'''
1097+
with new_pipeline() as p:
1098+
with self.assertRaises(KeyError):
1099+
expand_pipeline(p, spec, validate_schema=None)
1100+
1101+
10381102
if __name__ == '__main__':
10391103
logging.getLogger().setLevel(logging.INFO)
10401104
unittest.main()

0 commit comments

Comments
 (0)