@@ -993,6 +993,43 @@ def test_explicit_window_into(self):
993993 providers = TEST_PROVIDERS )
994994 assert_that (result , equal_to ([6 , 9 ]))
995995
996+ def test_explicit_window_into_with_json_string_config (self ):
997+ with beam .Pipeline (options = beam .options .pipeline_options .PipelineOptions (
998+ pickle_library = 'cloudpickle' )) as p :
999+ result = p | YamlTransform (
1000+ '''
1001+ type: chain
1002+ transforms:
1003+ - type: CreateTimestamped
1004+ config:
1005+ elements: [0, 1, 2, 3, 4, 5]
1006+ - type: WindowInto
1007+ config:
1008+ windowing: |
1009+ {"type": "fixed", "size": "4s"}
1010+ - type: SumGlobally
1011+ ''' ,
1012+ providers = TEST_PROVIDERS )
1013+ assert_that (result , equal_to ([6 , 9 ]))
1014+
1015+ def test_explicit_window_into_with_string_config_fails (self ):
1016+ with self .assertRaisesRegex (ValueError , 'Error parsing windowing config' ):
1017+ with beam .Pipeline (options = beam .options .pipeline_options .PipelineOptions (
1018+ pickle_library = 'cloudpickle' )) as p :
1019+ _ = p | YamlTransform (
1020+ '''
1021+ type: chain
1022+ transforms:
1023+ - type: CreateTimestamped
1024+ config:
1025+ elements: [0, 1, 2, 3, 4, 5]
1026+ - type: WindowInto
1027+ config:
1028+ windowing: |
1029+ 'not a valid yaml'
1030+ ''' ,
1031+ providers = TEST_PROVIDERS )
1032+
9961033 def test_windowing_on_input (self ):
9971034 with beam .Pipeline (options = beam .options .pipeline_options .PipelineOptions (
9981035 pickle_library = 'cloudpickle' )) as p :
0 commit comments