Skip to content

Commit 402f353

Browse files
committed
add multiline windowing config support back
1 parent b4f1763 commit 402f353

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

sdks/python/apache_beam/yaml/yaml_transform.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,9 +1030,25 @@ def push_windowing_to_roots(spec):
10301030
def preprocess_windowing(spec):
10311031
if spec['type'] == 'WindowInto':
10321032
# This is the transform where it is actually applied.
1033+
10331034
if 'windowing' in spec:
10341035
spec['config'] = spec.get('config', {})
10351036
spec['config']['windowing'] = spec.pop('windowing')
1037+
1038+
if spec.get('config', {}).get('windowing'):
1039+
windowing_config = spec['config']['windowing']
1040+
if isinstance(windowing_config, str):
1041+
try:
1042+
# PyYAML can load a JSON string - one-line and multi-line.
1043+
# Without this code, multi-line is not supported.
1044+
parsed_config = yaml.safe_load(windowing_config)
1045+
if not isinstance(parsed_config, dict):
1046+
raise TypeError('Windowing config string must be a YAML/JSON map.')
1047+
spec['config']['windowing'] = parsed_config
1048+
except Exception as e:
1049+
raise ValueError(
1050+
f'Error parsing windowing config string at \
1051+
{identify_object(spec)}: {e}') from e
10361052
return spec
10371053
elif 'windowing' not in spec:
10381054
# Nothing to do.

sdks/python/apache_beam/yaml/yaml_transform_test.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,7 @@ def test_explicit_window_into(self):
993993
providers=TEST_PROVIDERS)
994994
assert_that(result, equal_to([6, 9]))
995995

996-
def test_explicit_window_into_with_json_string_config(self):
996+
def test_explicit_window_into_with_json_string_config_one_line(self):
997997
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
998998
pickle_library='cloudpickle')) as p:
999999
result = p | YamlTransform(
@@ -1011,6 +1011,25 @@ def test_explicit_window_into_with_json_string_config(self):
10111011
providers=TEST_PROVIDERS)
10121012
assert_that(result, equal_to([6, 9]))
10131013

1014+
def test_explicit_window_into_with_json_string_config_multi_line(self):
1015+
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
1016+
pickle_library='cloudpickle')) as p:
1017+
result = p | YamlTransform(
1018+
'''
1019+
type: chain
1020+
transforms:
1021+
- type: CreateTimestamped
1022+
config:
1023+
elements: [0, 1, 2, 3, 4, 5]
1024+
- type: WindowInto
1025+
config:
1026+
windowing: |
1027+
{"type": "fixed", "size": "4s"}
1028+
- type: SumGlobally
1029+
''',
1030+
providers=TEST_PROVIDERS)
1031+
assert_that(result, equal_to([6, 9]))
1032+
10141033
def test_explicit_window_into_with_string_config_fails(self):
10151034
with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
10161035
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(

0 commit comments

Comments
 (0)