Skip to content

Commit 151cb8c

Browse files
committed
Fix TypeError in avro_union_type_to_beam_type with complex union types
1 parent 3f3f214 commit 151cb8c

File tree

2 files changed

+42
-5
lines changed

2 files changed

+42
-5
lines changed

sdks/python/apache_beam/io/avroio.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -554,11 +554,11 @@ def avro_union_type_to_beam_type(union_type: List) -> schema_pb2.FieldType:
554554
"""
555555
if len(union_type) == 2 and "null" in union_type:
556556
for avro_type in union_type:
557-
if avro_type in AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES:
558-
return schema_pb2.FieldType(
559-
atomic_type=AVRO_PRIMITIVES_TO_BEAM_PRIMITIVES[avro_type],
560-
nullable=True)
561-
return schemas.typing_to_runner_api(Any)
557+
if avro_type != "null":
558+
beam_type = avro_type_to_beam_type(avro_type)
559+
if beam_type.WhichOneof("type_info") == "atomic_type":
560+
return schema_pb2.FieldType(
561+
atomic_type=beam_type.atomic_type, nullable=True)
562562
return schemas.typing_to_runner_api(Any)
563563

564564

sdks/python/apache_beam/io/avroio_test.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,43 @@ def test_avro_union_type_to_beam_type_with_string_long(self):
206206
expected_beam_type = schemas.typing_to_runner_api(Any)
207207
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
208208

209+
def test_avro_union_type_to_beam_type_with_record_and_null(self):
210+
record_type = {
211+
'type': 'record',
212+
'name': 'TestRecord',
213+
'fields': [{
214+
'name': 'field1', 'type': 'string'
215+
}, {
216+
'name': 'field2', 'type': 'int'
217+
}]
218+
}
219+
union_type = [record_type, 'null']
220+
beam_type = avro_union_type_to_beam_type(union_type)
221+
expected_beam_type = schemas.typing_to_runner_api(Any)
222+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
223+
224+
def test_avro_union_type_to_beam_type_with_nullable_annotated_string(self):
225+
annotated_string_type = {"avro.java.string": "String", "type": "string"}
226+
union_type = ['null', annotated_string_type]
227+
228+
beam_type = avro_union_type_to_beam_type(union_type)
229+
230+
expected_beam_type = schema_pb2.FieldType(
231+
atomic_type=schema_pb2.STRING, nullable=True)
232+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
233+
234+
def test_avro_union_type_to_beam_type_with_only_null(self):
235+
union_type = ['null']
236+
beam_type = avro_union_type_to_beam_type(union_type)
237+
expected_beam_type = schemas.typing_to_runner_api(Any)
238+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
239+
240+
def test_avro_union_type_to_beam_type_with_multiple_types(self):
241+
union_type = ['null', 'string', 'int']
242+
beam_type = avro_union_type_to_beam_type(union_type)
243+
expected_beam_type = schemas.typing_to_runner_api(Any)
244+
hc.assert_that(beam_type, hc.equal_to(expected_beam_type))
245+
209246
def test_avro_schema_to_beam_and_back(self):
210247
avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
211248
beam_schema = avro_schema_to_beam_schema(avro_schema)

0 commit comments

Comments
 (0)