diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 460bfaec4a36..1a8cac7ffb65 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -36,6 +36,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; import net.bytebuddy.description.type.TypeDescription.ForLoadedType; @@ -97,6 +98,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -1214,6 +1216,15 @@ private static org.apache.avro.Schema getFieldSchema( return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : baseType; } + private static final Map> + NUMERIC_CONVERTERS = + ImmutableMap.of( + org.apache.avro.Schema.create(Type.INT), Number::intValue, + org.apache.avro.Schema.create(Type.LONG), Number::longValue, + org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue, + org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue); + + /** Convert a value from Beam Row to a vlue used for Avro GenericRecord. */ private static @Nullable Object genericFromBeamField( FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object value) { TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema); @@ -1230,6 +1241,11 @@ private static org.apache.avro.Schema getFieldSchema( return value; } + if (NUMERIC_CONVERTERS.containsKey(typeWithNullability.type)) { + return NUMERIC_CONVERTERS.get(typeWithNullability.type).apply((Number) value); + } + + // TODO: should we use Avro Schema as the source-of-truth in general? switch (fieldType.getTypeName()) { case BYTE: case INT16: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java index 42eee4f3f03c..c927cec34735 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -122,7 +122,7 @@ public SerializableFunction, GenericRecord> getAvroFilterF row = checkStateNotNull(row.getRow(RECORD)); } Row filtered = rowFilter.filter(row); - return AvroUtils.toGenericRecord(filtered); + return AvroUtils.toGenericRecord(filtered, request.getSchema()); }; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java index 168febea9d88..7ba420e5b8c7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java @@ -70,13 +70,28 @@ public class BigQueryFileLoadsSchemaTransformProviderTest { new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID); private static final Schema SCHEMA = - Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64)); + Schema.of( + Field.of("name", FieldType.STRING), + Field.of("number", FieldType.INT64), + Field.of("age", FieldType.INT32).withNullable(true)); private static final List ROWS = Arrays.asList( - Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(), - Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(), - Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build()); + Row.withSchema(SCHEMA) + .withFieldValue("name", "a") + .withFieldValue("number", 1L) + .withFieldValue("age", 10) + .build(), + Row.withSchema(SCHEMA) + .withFieldValue("name", "b") + .withFieldValue("number", 2L) + .withFieldValue("age", 20) + .build(), + Row.withSchema(SCHEMA) + .withFieldValue("name", "c") + .withFieldValue("number", 3L) + .withFieldValue("age", null) + .build()); private static final BigQueryOptions OPTIONS = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);