From 306035010f4e35a7f425c81bd869ea9502bfeac1 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 17 Nov 2025 14:36:17 -0500 Subject: [PATCH 1/2] Fix inconsist data type in GenericRecord and AvroSchema for AvroWriter --- .../avro/schemas/utils/AvroUtils.java | 22 ++++++++++++++++++ .../PortableBigQueryDestinations.java | 2 +- ...yFileLoadsSchemaTransformProviderTest.java | 23 +++++++++++++++---- 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 460bfaec4a36..7385e16665af 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1214,6 +1214,16 @@ private static org.apache.avro.Schema getFieldSchema( return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : baseType; } + private static final org.apache.avro.Schema INT_AVRO_TYPE = + org.apache.avro.Schema.create(Type.INT); + private static final org.apache.avro.Schema LONG_AVRO_TYPE = + org.apache.avro.Schema.create(Type.LONG); + private static final org.apache.avro.Schema FLOAT_AVRO_TYPE = + org.apache.avro.Schema.create(Type.FLOAT); + private static final org.apache.avro.Schema DOUBLE_AVRO_TYPE = + org.apache.avro.Schema.create(Type.DOUBLE); + + /** Convert a value from Beam Row to a vlue used for Avro GenericRecord. */ private static @Nullable Object genericFromBeamField( FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object value) { TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema); @@ -1230,6 +1240,18 @@ private static org.apache.avro.Schema getFieldSchema( return value; } + // Handle implicit up-cast: use avroSchema + if (INT_AVRO_TYPE.equals(typeWithNullability.type)) { + return ((Number) value).intValue(); + } else if (LONG_AVRO_TYPE.equals(typeWithNullability.type)) { + return ((Number) value).longValue(); + } else if (FLOAT_AVRO_TYPE.equals(typeWithNullability.type)) { + return ((Number) value).floatValue(); + } else if (DOUBLE_AVRO_TYPE.equals(typeWithNullability.type)) { + return ((Number) value).doubleValue(); + } + + // TODO: should we use Avro Schema as the source-of-truth in general? switch (fieldType.getTypeName()) { case BYTE: case INT16: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index 42eee4f3f03c..c927cec34735 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -122,7 +122,7 @@ public SerializableFunction, GenericRecord> getAvroFilterF row = checkStateNotNull(row.getRow(RECORD)); } Row filtered = rowFilter.filter(row); - return AvroUtils.toGenericRecord(filtered); + return AvroUtils.toGenericRecord(filtered, request.getSchema()); }; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java index 168febea9d88..7ba420e5b8c7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java @@ -70,13 +70,28 @@ public class BigQueryFileLoadsSchemaTransformProviderTest { new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID); private static final Schema SCHEMA = - Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64)); + Schema.of( + Field.of("name", FieldType.STRING), + Field.of("number", FieldType.INT64), + Field.of("age", FieldType.INT32).withNullable(true)); private static final List ROWS = Arrays.asList( - Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(), - Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(), - Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build()); + Row.withSchema(SCHEMA) + .withFieldValue("name", "a") + .withFieldValue("number", 1L) + .withFieldValue("age", 10) + .build(), + Row.withSchema(SCHEMA) + .withFieldValue("name", "b") + .withFieldValue("number", 2L) + .withFieldValue("age", 20) + .build(), + Row.withSchema(SCHEMA) + .withFieldValue("name", "c") + .withFieldValue("number", 3L) + .withFieldValue("age", null) + .build()); private static final BigQueryOptions OPTIONS = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); From 58b3c00541cc48249da8655e26da8f4721904048 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Tue, 18 Nov 2025 10:45:28 -0500 Subject: [PATCH 2/2] clean code --- .../avro/schemas/utils/AvroUtils.java | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 7385e16665af..1a8cac7ffb65 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -36,6 +36,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; import net.bytebuddy.description.type.TypeDescription.ForLoadedType; @@ -97,6 +98,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -1214,14 +1216,13 @@ private static org.apache.avro.Schema getFieldSchema( return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : baseType; } - private static final org.apache.avro.Schema INT_AVRO_TYPE = - org.apache.avro.Schema.create(Type.INT); - private static final org.apache.avro.Schema LONG_AVRO_TYPE = - org.apache.avro.Schema.create(Type.LONG); - private static final org.apache.avro.Schema FLOAT_AVRO_TYPE = - org.apache.avro.Schema.create(Type.FLOAT); - private static final org.apache.avro.Schema DOUBLE_AVRO_TYPE = - org.apache.avro.Schema.create(Type.DOUBLE); + private static final Map> + NUMERIC_CONVERTERS = + ImmutableMap.of( + org.apache.avro.Schema.create(Type.INT), Number::intValue, + org.apache.avro.Schema.create(Type.LONG), Number::longValue, + org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue, + org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue); /** Convert a value from Beam Row to a vlue used for Avro GenericRecord. */ private static @Nullable Object genericFromBeamField( @@ -1240,15 +1241,8 @@ private static org.apache.avro.Schema getFieldSchema( return value; } - // Handle implicit up-cast: use avroSchema - if (INT_AVRO_TYPE.equals(typeWithNullability.type)) { - return ((Number) value).intValue(); - } else if (LONG_AVRO_TYPE.equals(typeWithNullability.type)) { - return ((Number) value).longValue(); - } else if (FLOAT_AVRO_TYPE.equals(typeWithNullability.type)) { - return ((Number) value).floatValue(); - } else if (DOUBLE_AVRO_TYPE.equals(typeWithNullability.type)) { - return ((Number) value).doubleValue(); + if (NUMERIC_CONVERTERS.containsKey(typeWithNullability.type)) { + return NUMERIC_CONVERTERS.get(typeWithNullability.type).apply((Number) value); } // TODO: should we use Avro Schema as the source-of-truth in general?