Commit be002c7

[YAML]: add optional schema config for all transforms (#35952)
* rebase - fix changes conflict
* fix some errors
* add comment
* fix whitespace
* Update website/www/site/content/en/documentation/sdks/yaml-schema.md
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update website/www/site/content/en/documentation/sdks/yaml-schema.md
  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* address gemini review comment and lint issue
* address comments
* revert to filtering out None fields
* update yaml files based on combined error_handling output
* update readmes and docs
* update tests based on new logic
* update code logic based on new output design
* remove changes to text.yaml
* remove old logic
* fix wording
* update comments etc
* fix github action test
* add unit test for get_main_output_key
* update comments and logic
* add another yaml transform test to cover code change
* add comments and warnings about non-validated outputs
* updated get_main_output_keys test
* update again ut

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 56607ec commit be002c7

15 files changed, +747 -15 lines changed

CHANGES.md

Lines changed: 0 additions & 1 deletion
@@ -121,7 +121,6 @@
 - Use the `--element_processing_timeout_minutes` option to reduce the chance of having stalled pipelines due to unexpected cases of slow processing, where slowness might not happen again if processing of the same element is retried.
 * (Python) Adding GCP Spanner Change Stream support for Python (apache_beam.io.gcp.spanner) ([#24103](https://github.com/apache/beam/issues/24103)).

-
 ## Breaking Changes

 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

sdks/python/apache_beam/yaml/README.md

Lines changed: 2 additions & 2 deletions
@@ -62,9 +62,9 @@ or
 pytest -v integration_tests.py::<yaml_file_name_without_extension>Test
 ```

-To run the postcommit tests:
+To run some of the postcommit tests, for example:

 ```bash
-pytest -v integration_tests.py --test_files_dir="extended_tests"
+pytest -v integration_tests.py --test_files_dir="extended_tests/messaging"
 ```

sdks/python/apache_beam/yaml/json_utils.py

Lines changed: 6 additions & 1 deletion
@@ -287,8 +287,10 @@ def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
         for field in beam_type.row_type.schema.fields
     }
     return lambda row: {
-        name: convert(getattr(row, name))
+        name: converted
         for (name, convert) in converters.items()
+        # To filter out nullable fields in rows
+        if (converted := convert(getattr(row, name, None))) is not None
     }
   elif type_info == "logical_type":
     return lambda value: value
@@ -348,6 +350,9 @@ def validate(row):
     nonlocal validator
     if validator is None:
       validator = jsonschema.validators.validator_for(json_schema)(json_schema)
+    # NOTE: A row like BeamSchema_...(name='Bob', score=None, age=25) needs to
+    # have any fields that are None to be filtered out or the validator will
+    # fail (e.g. {'age': 25, 'name': 'Bob'}).
     validator.validate(convert(row))

   return validate
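The None-filtering in `row_to_json` can be tried in isolation. The following is a minimal sketch, not the Beam implementation: `Player`, `row_to_dict`, and `identity` are hypothetical names, and a plain `NamedTuple` stands in for a generated Beam row type.

```python
from typing import Any, Callable, Dict, NamedTuple, Optional


class Player(NamedTuple):
  # Hypothetical stand-in for a generated Beam row type.
  name: str
  age: int
  score: Optional[float] = None


def row_to_dict(
    converters: Dict[str, Callable[[Any], Any]]) -> Callable[[Any], dict]:
  """Builds a converter that omits fields whose converted value is None."""
  return lambda row: {
      name: converted
      for (name, convert) in converters.items()
      # The walrus binds `converted` before the key/value pair is built.
      if (converted := convert(getattr(row, name, None))) is not None
  }


identity = lambda value: value
to_json = row_to_dict({'name': identity, 'age': identity, 'score': identity})

print(to_json(Player(name='Bob', age=25)))  # {'name': 'Bob', 'age': 25}
```

Because the test is `is not None` rather than truthiness, falsy values such as `0` or `''` are still emitted; only genuinely missing values are dropped.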

sdks/python/apache_beam/yaml/pipeline.schema.yaml

Lines changed: 3 additions & 0 deletions
@@ -45,6 +45,8 @@ $defs:
             properties: { __line__: {}}
             additionalProperties:
               type: string
+      output_schema:
+        type: object
     additionalProperties: true
     required:
       - type
@@ -129,6 +131,7 @@ $defs:
       name: {}
       input: {}
       output: {}
+      output_schema: { type: object }
       windowing: {}
       resource_hints: {}
       config: { type: object }

sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml

Lines changed: 87 additions & 0 deletions
@@ -82,3 +82,90 @@ pipelines:
           config:
             elements:
               - {user: bob, timestamp: 3}
+
+  # Assign timestamp to beam row element with error handling and output schema
+  # check.
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateVisits
+          config:
+            elements:
+              - {user: alice, timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssignTimestamps
+          input: CreateVisits
+          config:
+            timestamp: timestamp
+            error_handling:
+              output: invalid_rows
+            output_schema:
+              type: object
+              properties:
+                user:
+                  type: string
+                timestamp:
+                  type: integer
+        - type: MapToFields
+          name: ExtractInvalidTimestamp
+          input: AssignTimestamps.invalid_rows
+          config:
+            language: python
+            fields:
+              user: "element.user"
+              timestamp: "element.timestamp"
+        - type: AssertEqual
+          input: ExtractInvalidTimestamp
+          config:
+            elements:
+              - {user: "alice", timestamp: "not-valid"}
+        - type: AssertEqual
+          input: AssignTimestamps
+          config:
+            elements:
+              - {user: bob, timestamp: 3}
+
+  # Assign timestamp to beam row element with error handling and output schema
+  # check with more error handling.
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateVisits
+          config:
+            elements:
+              - {user: alice, timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssignTimestamps
+          input: CreateVisits
+          config:
+            timestamp: timestamp
+            error_handling:
+              output: invalid_rows
+            output_schema:
+              type: object
+              properties:
+                user:
+                  type: string
+                timestamp:
+                  type: boolean
+        - type: MapToFields
+          name: ExtractInvalidTimestamp
+          input: AssignTimestamps.invalid_rows
+          config:
+            language: python
+            fields:
+              user: "element.user"
+              timestamp: "element.timestamp"
+        - type: AssertEqual
+          input: ExtractInvalidTimestamp
+          config:
+            elements:
+              - {user: "alice", timestamp: "not-valid"}
+              - {user: bob, timestamp: 3}
+        - type: AssertEqual
+          input: AssignTimestamps
+          config:
+            elements: []
+

sdks/python/apache_beam/yaml/tests/create.yaml

Lines changed: 23 additions & 0 deletions
@@ -115,3 +115,26 @@ pipelines:
               - {sdk: MapReduce, year: 2004}
               - {}
               - {sdk: MillWheel, year: 2008}
+
+  # Simple Create with output schema check
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {sdk: MapReduce, year: 2004}
+              - {sdk: MillWheel, year: 2008}
+            output_schema:
+              type: object
+              properties:
+                sdk:
+                  type: string
+                year:
+                  type: integer
+        - type: AssertEqual
+          config:
+            elements:
+              - {sdk: MapReduce, year: 2004}
+              - {sdk: MillWheel, year: 2008}
+

sdks/python/apache_beam/yaml/tests/validate_with_schema.yaml

Lines changed: 45 additions & 1 deletion
@@ -92,4 +92,48 @@ pipelines:
               - {name: "Alice", age: 30, score: 95.5}
               - {name: "Bob", age: 25, score: 88.0}

-
+  # Validate a Beam Row with a predefined schema, nulls, and error handling
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25}
+              - {name: "Charlie", age: 27, score: "apple"}
+        - type: ValidateWithSchema
+          input: Create
+          config:
+            schema:
+              type: object
+              properties:
+                name:
+                  type: string
+                age:
+                  type: integer
+                score:
+                  type: number
+            error_handling:
+              output: invalid_rows
+        # ValidateWithSchema outputs the element, error msg, and traceback, so
+        # MapToFields is needed to easily assert downstream.
+        - type: MapToFields
+          input: ValidateWithSchema.invalid_rows
+          config:
+            language: python
+            fields:
+              name: "element.name"
+              age: "element.age"
+              score: "element.score"
+        - type: AssertEqual
+          input: ValidateWithSchema
+          config:
+            elements:
+              - {name: "Alice", age: 30, score: 95.5}
+              - {name: "Bob", age: 25}
+        - type: AssertEqual
+          input: MapToFields
+          config:
+            elements:
+              - {name: "Charlie", age: 27, score: "apple"}
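The outcome this test asserts (rows with a missing `score` pass, a row with a string `score` goes to `invalid_rows`) can be reproduced with a small stand-alone check. This is only a sketch under stated assumptions: `matches` and `split_by_schema` are hypothetical helpers that look at `properties`/`type` alone, whereas the real transform delegates to the `jsonschema` library and routes failures through Beam's error handling.

```python
from typing import Any, Dict, List, Tuple

# JSON Schema type names mapped to Python types (only those used in the tests).
_JSON_TYPES = {
    'string': str,
    'integer': int,
    'number': (int, float),
    'boolean': bool,
}


def matches(row: Dict[str, Any], schema: Dict[str, Any]) -> bool:
  """Checks the fields present in `row` against schema['properties'] types."""
  for name, prop in schema.get('properties', {}).items():
    if name not in row:
      continue  # Absent fields pass because the schema has no `required` list.
    value = row[name]
    # bool is a subclass of int in Python but is not a JSON integer or number.
    if isinstance(value, bool) and prop['type'] != 'boolean':
      return False
    if not isinstance(value, _JSON_TYPES[prop['type']]):
      return False
  return True


def split_by_schema(
    rows: List[Dict[str, Any]],
    schema: Dict[str, Any]) -> Tuple[List[dict], List[dict]]:
  """Returns (valid_rows, invalid_rows), mimicking a dead-letter output."""
  good, bad = [], []
  for row in rows:
    (good if matches(row, schema) else bad).append(row)
  return good, bad


schema = {
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'age': {'type': 'integer'},
        'score': {'type': 'number'},
    },
}
rows = [
    {'name': 'Alice', 'age': 30, 'score': 95.5},
    {'name': 'Bob', 'age': 25},  # score omitted, as after None-filtering
    {'name': 'Charlie', 'age': 27, 'score': 'apple'},
]
valid, invalid = split_by_schema(rows, schema)
```

Here `valid` holds the Alice and Bob rows and `invalid` holds the Charlie row, matching the two `AssertEqual` steps in the test.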

sdks/python/apache_beam/yaml/yaml_errors.py

Lines changed: 4 additions & 1 deletion
@@ -21,6 +21,7 @@

 import apache_beam as beam
 from apache_beam.typehints.row_type import RowTypeConstraint
+from apache_beam.yaml.yaml_utils import SafeLineLoader


 class ErrorHandlingConfig(NamedTuple):
@@ -35,9 +36,11 @@ class ErrorHandlingConfig(NamedTuple):

 def exception_handling_args(error_handling_spec):
   if error_handling_spec:
+    # error_handling_spec may have come from a yaml file and have metadata.
+    clean_spec = SafeLineLoader.strip_metadata(error_handling_spec)
     return {
         'dead_letter_tag' if k == 'output' else k: v
-        for (k, v) in error_handling_spec.items()
+        for (k, v) in clean_spec.items()
     }
   else:
     return None
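To see why the metadata has to be stripped, the function can be exercised without Beam. In this sketch `strip_metadata` is a hypothetical stand-in for `SafeLineLoader.strip_metadata`, assumed here to drop keys that start with a double underscore (such as the `__line__` key that appears in `pipeline.schema.yaml`); the real helper may differ in detail.

```python
def strip_metadata(spec):
  """Hypothetical stand-in: recursively drops keys starting with '__'."""
  if isinstance(spec, dict):
    return {
        k: strip_metadata(v)
        for (k, v) in spec.items()
        if not (isinstance(k, str) and k.startswith('__'))
    }
  elif isinstance(spec, list):
    return [strip_metadata(v) for v in spec]
  return spec


def exception_handling_args(error_handling_spec):
  if error_handling_spec:
    # The spec may carry loader metadata that must not become a keyword arg.
    clean_spec = strip_metadata(error_handling_spec)
    return {
        'dead_letter_tag' if k == 'output' else k: v
        for (k, v) in clean_spec.items()
    }
  else:
    return None


# A spec as it might look after loading from YAML with line tracking.
args = exception_handling_args({'output': 'invalid_rows', '__line__': 102})
print(args)  # {'dead_letter_tag': 'invalid_rows'}
```

Without the stripping step, `__line__` would be passed along as if it were an exception-handling option.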

sdks/python/apache_beam/yaml/yaml_mapping.py

Lines changed: 2 additions & 2 deletions
@@ -487,7 +487,7 @@ def expand(self, pcoll):
         typing_from_runner_api(existing_fields[fld]))


-class _Validate(beam.PTransform):
+class Validate(beam.PTransform):
   """Validates each element of a PCollection against a json schema.

   Args:
@@ -982,7 +982,7 @@ def create_mapping_providers():
           'Partition-javascript': _Partition,
           'Partition-generic': _Partition,
           'StripErrorMetadata': _StripErrorMetadata,
-          'ValidateWithSchema': _Validate,
+          'ValidateWithSchema': Validate,
       }),
       yaml_provider.SqlBackedProvider({
           'Filter-sql': _SqlFilterTransform,

sdks/python/apache_beam/yaml/yaml_provider.py

Lines changed: 6 additions & 3 deletions
@@ -759,9 +759,12 @@ def _unify_element_with_schema(element, target_schema):
   elif isinstance(element, dict):
     element_dict = element
   else:
-    # This element is not a row, so it can't be unified with a
-    # row schema.
-    return element
+    # This element is not a row-like object. If the target schema has a single
+    # field, assume this element is the value for that field.
+    if len(target_schema._fields) == 1:
+      return target_schema(**{target_schema._fields[0]: element})
+    else:
+      return element

   # Create new element with only the fields that exist in the original
   # element plus None for fields that are expected but missing
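The new single-field branch is easy to check on its own. The sketch below covers only the non-row case from the hunk above; `Line`, `Visit`, and `wrap_if_single_field` are hypothetical names, with `NamedTuple` classes standing in for the target schema type (which the diff shows exposes `_fields`).

```python
from typing import Any, NamedTuple


class Line(NamedTuple):
  # Hypothetical single-field schema.
  line: str


class Visit(NamedTuple):
  # Hypothetical multi-field schema.
  user: str
  timestamp: int


def wrap_if_single_field(element: Any, target_schema: type) -> Any:
  """Wraps a non-row element when the target schema has exactly one field."""
  if len(target_schema._fields) == 1:
    return target_schema(**{target_schema._fields[0]: element})
  else:
    # With several fields there is no way to tell which one the value fills.
    return element


print(wrap_if_single_field('hello', Line))   # Line(line='hello')
print(wrap_if_single_field('hello', Visit))  # hello
```

The multi-field case keeps the previous behavior of returning the element unchanged.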
