Skip to content

Commit 27085de

Browse files
authored
Fix TypeError in avro_union_type_to_beam_type with complex union types (#35459)
1 parent ee590cc commit 27085de

File tree

2 files changed

+53
-6
lines changed

2 files changed

+53
-6
lines changed

sdks/python/apache_beam/io/avroio.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType:
543543
"""convert an avro union type to a beam type
544544
545545
if the union type is a nullable, and it is a nullable union of an avro
546-
primitive with a corresponding beam primitive then create a nullable beam
546+
type with a corresponding beam type then create a nullable beam
547547
field of the corresponding beam type, otherwise return an Any type.
548548
549549
Args:
@@ -554,11 +554,10 @@ def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType:
554554
"""
555555
if len(union_type) == 2 and "null" in union_type:
556556
for avro_type in union_type:
557-
if avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES:
558-
return schema_pb2.FieldType(
559-
atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[avro_type],
560-
nullable=True)
561-
return schemas.typing_to_runner_api(Any)
557+
if avro_type != "null":
558+
beam_type = avro_type_to_beam_type(avro_type)
559+
beam_type.nullable = True
560+
return beam_type
562561
return schemas.typing_to_runner_api(Any)
563562

564563

sdks/python/apache_beam/io/avroio_test.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,54 @@ def test_avro_union_type_to_beam_type_with_string_long(self):
206206
expected_beam_type = schemas.typing_to_runner_api(Any)
207207
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
208208

209+
def test_avro_union_type_to_beam_type_with_record_and_null(self):
210+
record_type = {
211+
'type': 'record',
212+
'name': 'TestRecord',
213+
'fields': [{
214+
'name': 'field1', 'type': 'string'
215+
}, {
216+
'name': 'field2', 'type': 'int'
217+
}]
218+
}
219+
union_type = [record_type, 'null']
220+
beam_type = avro_union_type_to_beam_type(union_type)
221+
expected_beam_type = schema_pb2.FieldType(
222+
row_type=schema_pb2.RowType(
223+
schema=schema_pb2.Schema(
224+
fields=[
225+
schemas.schema_field(
226+
'field1',
227+
schema_pb2.FieldType(atomic_type=schema_pb2.STRING)),
228+
schemas.schema_field(
229+
'field2',
230+
schema_pb2.FieldType(atomic_type=schema_pb2.INT32))
231+
])),
232+
nullable=True)
233+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
234+
235+
def test_avro_union_type_to_beam_type_with_nullable_annotated_string(self):
236+
annotated_string_type = {"avro.java.string": "String", "type": "string"}
237+
union_type = ['null', annotated_string_type]
238+
239+
beam_type = avro_union_type_to_beam_type(union_type)
240+
241+
expected_beam_type = schema_pb2.FieldType(
242+
atomic_type=schema_pb2.STRING, nullable=True)
243+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
244+
245+
def test_avro_union_type_to_beam_type_with_only_null(self):
246+
union_type = ['null']
247+
beam_type = avro_union_type_to_beam_type(union_type)
248+
expected_beam_type = schemas.typing_to_runner_api(Any)
249+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
250+
251+
def test_avro_union_type_to_beam_type_with_multiple_types(self):
252+
union_type = ['null', 'string', 'int']
253+
beam_type = avro_union_type_to_beam_type(union_type)
254+
expected_beam_type = schemas.typing_to_runner_api(Any)
255+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
256+
209257
def test_avro_schema_to_beam_and_back(self):
210258
avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
211259
beam_schema = avro_schema_to_beam_schema(avro_schema)

0 commit comments

Comments
 (0)