Skip to content

Commit d7fa511

Browse files
authored
[yaml] : add more error handling tests and website example (#37245)
* add more error handling and provider tests * provide example of provider with error handling
1 parent 7694450 commit d7fa511

File tree

2 files changed

+213
-0
lines changed

2 files changed

+213
-0
lines changed

sdks/python/apache_beam/yaml/yaml_transform_test.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import glob
2020
import logging
2121
import os
22+
import shutil
2223
import tempfile
2324
import unittest
2425

@@ -911,6 +912,60 @@ def test_must_handle_error_output(self):
911912
''',
912913
providers=TEST_PROVIDERS)
913914

915+
def test_error_handling_log_combined_errors(self):
  """Checks that error outputs of two transforms can share one consumer.

  ``Inverse`` fails on the element 0 and ``Square`` fails on the element
  'a'. Both ``errors`` outputs are fed into a single ``LogForTesting``
  step, while the main outputs are flattened into ``GoodData`` and compared
  against the rows that processed successfully.
  """
  pipeline_options = beam.options.pipeline_options.PipelineOptions(
      pickle_library='cloudpickle')
  # Rows coming from Inverse carry no `square` value and vice versa, because
  # the two MapToFields outputs are merged by the Flatten step.
  expected_rows = [
      beam.Row(inverse=1.0, square=None),
      beam.Row(inverse=0.5, square=None),
      beam.Row(square=9, inverse=None),
      beam.Row(square=25, inverse=None)
  ]
  with beam.Pipeline(options=pipeline_options) as p:
    result = p | YamlTransform(
        '''
        type: composite
        transforms:
          - type: Create
            name: Input1
            config:
              elements: [1, 2, 0]
          - type: Create
            name: Input2
            config:
              elements: [3, 'a', 5]
          - type: MapToFields
            name: Inverse
            input: Input1
            config:
              language: python
              fields:
                inverse: "1 / element"
              error_handling:
                output: errors
          - type: MapToFields
            name: Square
            input: Input2
            config:
              language: python
              fields:
                square: "element * element"
              error_handling:
                output: errors
          - type: LogForTesting
            input:
              - Inverse.errors
              - Square.errors
          - type: Flatten
            name: GoodData
            input:
              - Inverse
              - Square
        output: GoodData
        ''',
        providers=TEST_PROVIDERS)
    assert_that(result, equal_to(expected_rows))
968+
914969
def test_mapping_errors(self):
915970
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
916971
pickle_library='cloudpickle')) as p:
@@ -1297,6 +1352,106 @@ def test_prefers_same_provider_class(self):
12971352
label='StartWith3')
12981353

12991354

1355+
class TestExternalYamlProvider(unittest.TestCase):
  """Tests error handling for a transform defined in an external YAML file.

  Every test gets a fresh temporary directory containing
  ``power_provider.yaml``, which defines ``RaiseElementToPower`` as a
  ``MapToFields`` transform whose error output is named ``my_error``.
  """
  def setUp(self):
    """Writes the provider definition to a temporary YAML file."""
    self.temp_dir = tempfile.mkdtemp()
    # Register the removal right away: cleanups run even if a later step of
    # setUp raises, whereas tearDown is skipped in that case and the
    # directory would be leaked.
    self.addCleanup(shutil.rmtree, self.temp_dir)
    self.provider_path = os.path.join(self.temp_dir, 'power_provider.yaml')
    with open(self.provider_path, 'w') as f:
      f.write(
          """
          - type: yaml
            transforms:
              RaiseElementToPower:
                config_schema:
                  properties:
                    n: {type: integer}
                body:
                  type: MapToFields
                  config:
                    language: python
                    append: true
                    fields:
                      power: "element ** {{n}}"
                    error_handling:
                      output: my_error
          """)

  def _merged_providers(self):
    """Returns the providers loaded from the file merged with TEST_PROVIDERS."""
    loaded_providers = yaml_provider.load_providers(self.provider_path)
    test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
    return yaml_provider.merge_providers(loaded_providers, [test_providers])

  def test_provider_with_error_handling(self):
    """Good rows and error messages are both reachable from the pipeline.

    The elements 2 and 3 are expected on the ``good`` output with their
    squares appended; the string 'bad' is expected to surface on
    ``RaiseElementToPower.my_error`` as a ``TypeError`` message.
    """
    merged_providers = self._merged_providers()

    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      results = p | YamlTransform(
          '''
          type: composite
          transforms:
            - type: Create
              config:
                elements: [2, 'bad', 3]
            - type: RaiseElementToPower
              input: Create
              config:
                n: 2
            - type: PyMap
              name: TrimErrors
              input: RaiseElementToPower.my_error
              config:
                fn: "lambda x: x.msg"
          output:
            good: RaiseElementToPower.good
            bad: TrimErrors
          ''',
          providers=merged_providers)

      assert_that(
          results['good'],
          equal_to([beam.Row(element=2, power=4), beam.Row(element=3,
                                                           power=9)]),
          label="CheckGood")
      assert_that(
          results['bad'],
          equal_to([
              'TypeError("unsupported operand type(s) for ** or pow(): ' +
              '\'str\' and \'int\'")'
          ]),
          label="CheckBad")

  def test_must_consume_error_output(self):
    """Pipeline construction fails when the error output is left unconsumed."""
    # By adding a dummy error_handling block here, we signal to the static
    # checker that this transform has an error output that must be consumed.
    # The framework is able to handle the "nesting" where the provider for
    # RaiseElementToPower also defines error handling internally.
    merged_providers = self._merged_providers()

    with self.assertRaisesRegex(Exception, 'Unconsumed error output.*'):
      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
          pickle_library='cloudpickle')) as p:
        _ = p | YamlTransform(
            '''
            type: composite
            transforms:
              - type: Create
                config:
                  elements: [2, 'bad', 3]
              - type: RaiseElementToPower
                input: Create
                config:
                  n: 2
                  error_handling:
                    output: my_error
            ''',
            providers=merged_providers)
1453+
1454+
13001455
@beam.transforms.ptransform.annotate_yaml
13011456
class LinearTransform(beam.PTransform):
13021457
"""A transform used for testing annotate_yaml."""

website/www/site/content/en/documentation/sdks/yaml-errors.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,4 +218,62 @@ pipeline:
218218
path: /path/to/errors.json
219219
```
220220

221+
## Error Handling with Custom Providers
222+
Custom transforms, such as those defined in separate YAML files via a `YamlProvider`, can also expose error outputs from their underlying transforms.
223+
224+
Consider a file `my_transforms.yaml` that defines a `RaiseElementToPower` transform:
225+
```yaml
226+
# my_transforms.yaml
227+
- type: yaml
228+
transforms:
229+
RaiseElementToPower:
230+
config_schema:
231+
properties:
232+
n: {type: integer}
233+
body:
234+
type: MapToFields
235+
config:
236+
language: python
237+
append: true
238+
fields:
239+
power: "element ** {{n}}"
240+
# This transform internally defines and exposes an error output.
241+
error_handling:
242+
output: my_error
243+
```
244+
This transform takes a numeric element and raises it to the power of `n`. If the element is not a number (for example, the string `'bad'`), the operation fails and the record is sent to an error output rather than to the main output. The error output of the internal `MapToFields` is named `my_error`, and it is automatically exposed by the `RaiseElementToPower` transform under the same name.
245+
246+
When using this transform in a pipeline, you can access this error output and handle it. The main output of the transform will contain only the successfully processed elements.
247+
248+
```yaml
249+
pipeline:
250+
transforms:
251+
- type: Create
252+
config:
253+
elements: [2, 'bad', 3]
254+
- type: RaiseElementToPower
255+
input: Create
256+
config:
257+
n: 2
258+
- type: WriteToJson
259+
name: WriteGood
260+
# The main output contains successfully processed elements.
261+
input: RaiseElementToPower
262+
config:
263+
path: /path/to/good
264+
- type: WriteToJson
265+
name: WriteBad
266+
# The error output is accessed by its name.
267+
input: RaiseElementToPower.my_error
268+
config:
269+
path: /path/to/bad
270+
271+
providers:
272+
- include: my_transforms.yaml
273+
274+
```
275+
In this example, the pipeline separates the good and bad records coming from the custom `RaiseElementToPower` transform. The good records are written to one location, and the error records are written to another.
276+
277+
A pipeline will fail at construction time if an error output is declared (either in a built-in transform or a custom one) but not consumed. This helps ensure that all error paths are considered.
278+
221279
See the [YAML schema documentation](https://beam.apache.org/documentation/sdks/yaml-schema/) for another use of `error_handling` in a schema context.

0 commit comments

Comments
 (0)