Skip to content

Commit dc9aaba

Browse files
committed
Fix TypeError in avro_union_type_to_beam_type with complex union types
1 parent 3f3f214 commit dc9aaba

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

sdks/python/apache_beam/io/avroio.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,8 @@ def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType:
554554
"""
555555
if len(union_type) == 2 and "null" in union_type:
556556
for avro_type in union_type:
557-
if avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES:
557+
if isinstance(avro_type,
558+
str) and avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES:
558559
return schema_pb2.FieldType(
559560
atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[avro_type],
560561
nullable=True)

sdks/python/apache_beam/io/avroio_test.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,34 @@ def test_avro_union_type_to_beam_type_with_string_long(self):
206206
expected_beam_type = schemas.typing_to_runner_api(Any)
207207
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
208208

209+
def test_avro_union_type_to_beam_type_with_record_and_null(self):
210+
record_type = {
211+
'type': 'record',
212+
'name': 'TestRecord',
213+
'fields': [{
214+
'name': 'field1', 'type': 'string'
215+
}, {
216+
'name': 'field2', 'type': 'int'
217+
}]
218+
}
219+
union_type = [record_type, 'null']
220+
beam_type = avro_union_type_to_beam_type(union_type)
221+
expected_beam_type = schemas.typing_to_runner_api(Any)
222+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
223+
224+
def test_avro_union_type_to_beam_type_with_nullable_annotated_string(self):
225+
union_type = [
226+
'null',
227+
'''
228+
{
229+
"avro.java.string": "String",
230+
"type": "string"
231+
}'''
232+
]
233+
beam_type = avro_union_type_to_beam_type(union_type)
234+
expected_beam_type = schemas.typing_to_runner_api(Any)
235+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
236+
209237
def test_avro_schema_to_beam_and_back(self):
210238
avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
211239
beam_schema = avro_schema_to_beam_schema(avro_schema)

0 commit comments

Comments
 (0)