diff --git a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto
index 407a803644ef..060bbccbd757 100644
--- a/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto
+++ b/sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto
@@ -33,6 +33,51 @@ import "proto3_schema_options.proto";
 
 option java_package = "org.apache.beam.sdk.extensions.protobuf";
 
+message PrimitiveEncodedFields {
+  int64 encoded_timestamp = 1;
+  int32 encoded_date = 2;
+  bytes encoded_numeric = 3;
+  bytes encoded_bignumeric = 4;
+  int64 encoded_packed_datetime = 5;
+  int64 encoded_packed_time = 6;
+}
+
+message NestedEncodedFields {
+  PrimitiveEncodedFields nested = 1;
+  repeated PrimitiveEncodedFields nested_list = 2;
+}
+
+message PrimitiveUnEncodedFields {
+  string timestamp = 1;
+  string date = 2;
+  string numeric = 3;
+  string bignumeric = 4;
+  string datetime = 5;
+  string time = 6;
+}
+
+message NestedUnEncodedFields {
+  PrimitiveUnEncodedFields nested = 1;
+  repeated PrimitiveUnEncodedFields nested_list = 2;
+}
+
+message WrapperUnEncodedFields {
+  google.protobuf.FloatValue float = 1;
+  google.protobuf.DoubleValue double = 2;
+  google.protobuf.BoolValue bool = 3;
+  google.protobuf.Int32Value int32 = 4;
+  google.protobuf.Int64Value int64 = 5;
+  google.protobuf.UInt32Value uint32 = 6;
+  google.protobuf.UInt64Value uint64 = 7;
+  google.protobuf.BytesValue bytes = 8;
+  google.protobuf.Timestamp timestamp = 9;
+}
+
+message NestedWrapperUnEncodedFields {
+  WrapperUnEncodedFields nested = 1;
+  repeated WrapperUnEncodedFields nested_list = 2;
+}
+
 message Primitive {
   double primitive_double = 1;
   float primitive_float = 2;
@@ -287,4 +332,4 @@ message NoWrapPrimitive {
   optional bool bool = 13;
   optional string string = 14;
   optional bytes bytes = 15;
-}
\ No newline at end of file
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index d8d89bdb74b2..4761c8074283 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -182,6 +182,7 @@ public ByteString mergeNewFields(
   public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField) {
     try {
       return TableRowToStorageApiProto.tableRowFromMessage(
+          getSchemaInformation(),
           DynamicMessage.parseFrom(
               TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes),
           true,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e3f9de3b7ab3..986eebeeb05a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -42,6 +42,7 @@
 import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
 import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
 import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
 import com.google.cloud.bigquery.storage.v1.ReadSession;
 import com.google.cloud.bigquery.storage.v1.ReadStream;
 import com.google.gson.JsonArray;
@@ -119,6 +120,7 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -2297,10 +2299,79 @@ public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClass) {
     if (DynamicMessage.class.equals(protoMessageClass)) {
       throw new IllegalArgumentException("DynamicMessage is not supported.");
     }
-    return BigQueryIO.<T>write()
-        .withFormatFunction(
-            m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue()))
-        .withWriteProtosClass(protoMessageClass);
+    try {
+      return BigQueryIO.<T>write()
+          .toBuilder()
+          .setFormatFunction(FormatProto.fromClass(protoMessageClass))
+          .build()
+          .withWriteProtosClass(protoMessageClass);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  abstract static class TableRowFormatFunction<T>
+      implements SerializableBiFunction<
+          TableRowToStorageApiProto.@Nullable SchemaInformation, T, TableRow> {
+    static <T> TableRowFormatFunction<T> fromSerializableFunction(
+        SerializableFunction<T, TableRow> serializableFunction) {
+      return new TableRowFormatFunction<T>() {
+        @Override
+        public TableRow apply(
+            TableRowToStorageApiProto.@Nullable SchemaInformation schemaInformation, T t) {
+          return serializableFunction.apply(t);
+        }
+      };
+    }
+
+    SerializableFunction<T, TableRow> toSerializableFunction() {
+      return input -> apply(null, input);
+    }
+  }
+
+  private static class FormatProto<T extends Message> extends TableRowFormatFunction<T> {
+    transient TableRowToStorageApiProto.SchemaInformation inferredSchemaInformation;
+    final Class<T> protoMessageClass;
+
+    FormatProto(Class<T> protoMessageClass) {
+      this.protoMessageClass = protoMessageClass;
+    }
+
+    TableRowToStorageApiProto.SchemaInformation inferSchemaInformation() {
+      try {
+        if (inferredSchemaInformation == null) {
+          Descriptors.Descriptor descriptor =
+              (Descriptors.Descriptor)
+                  org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+                          protoMessageClass.getMethod("getDescriptor"))
+                      .invoke(null);
+          Descriptors.Descriptor convertedDescriptor =
+              TableRowToStorageApiProto.wrapDescriptorProto(
+                  ProtoSchemaConverter.convert(descriptor).getProtoDescriptor());
+          TableSchema tableSchema =
+              TableRowToStorageApiProto.protoSchemaToTableSchema(
+                  TableRowToStorageApiProto.tableSchemaFromDescriptor(convertedDescriptor));
+          this.inferredSchemaInformation =
+              TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
+        }
+        return inferredSchemaInformation;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static <T extends Message> FormatProto<T> fromClass(Class<T> protoMessageClass)
+        throws Exception {
+      return new FormatProto<>(protoMessageClass);
+    }
+
+    @Override
+    public TableRow apply(TableRowToStorageApiProto.SchemaInformation schemaInformation, T input) {
+      TableRowToStorageApiProto.SchemaInformation localSchemaInformation =
+          schemaInformation != null ? schemaInformation : inferSchemaInformation();
+      return TableRowToStorageApiProto.tableRowFromMessage(
+          localSchemaInformation, input, false, Predicates.alwaysTrue());
+    }
   }
 
   /** Implementation of {@link #write}. */
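> Review note: the core of this hunk is the adapter between the legacy one-argument format function and the new schema-aware callback. A minimal, self-contained sketch of that wrap/unwrap round trip, using plain `java.util.function` stand-ins for the Beam interfaces (`RowFormatter` below is illustrative, not a Beam type):

```java
import java.io.Serializable;
import java.util.function.BiFunction;
import java.util.function.Function;

// Stand-in for TableRowFormatFunction: a schema-aware BiFunction that can
// still be viewed as the legacy single-argument format function. The first
// argument (here just Object) stands in for SchemaInformation and may be null.
class FormatFunctionDemo {
  interface RowFormatter<T> extends BiFunction<Object, T, String>, Serializable {
    // Adapt a legacy formatter that ignores the schema argument.
    static <T> RowFormatter<T> fromFunction(Function<T, String> f) {
      return (schema, t) -> f.apply(t);
    }

    // View this schema-aware formatter as a legacy one by passing a null schema.
    default Function<T, String> toFunction() {
      return t -> apply(null, t);
    }
  }

  public static void main(String[] args) {
    RowFormatter<Integer> fmt = RowFormatter.fromFunction(i -> "row-" + i);
    System.out.println(fmt.apply(null, 7));        // row-7
    System.out.println(fmt.toFunction().apply(7)); // row-7
  }
}
```

> The design point: existing user-supplied `SerializableFunction`s keep working unchanged, while schema-aware call sites can pass the `SchemaInformation` through.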
@@ -2354,9 +2425,9 @@ public enum Method {
 
     abstract @Nullable SerializableFunction<ValueInSingleWindow<T>, TableDestination>
         getTableFunction();
 
-    abstract @Nullable SerializableFunction<T, TableRow> getFormatFunction();
+    abstract @Nullable TableRowFormatFunction<T> getFormatFunction();
 
-    abstract @Nullable SerializableFunction<T, TableRow> getFormatRecordOnFailureFunction();
+    abstract @Nullable TableRowFormatFunction<T> getFormatRecordOnFailureFunction();
 
     abstract RowWriterFactory.@Nullable AvroRowWriterFactory<T, ?, ?> getAvroRowWriterFactory();
@@ -2467,10 +2538,10 @@ abstract static class Builder<T> {
       abstract Builder<T> setTableFunction(
           SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);
 
-      abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
+      abstract Builder<T> setFormatFunction(TableRowFormatFunction<T> formatFunction);
 
       abstract Builder<T> setFormatRecordOnFailureFunction(
-          SerializableFunction<T, TableRow> formatFunction);
+          TableRowFormatFunction<T> formatFunction);
 
       abstract Builder<T> setAvroRowWriterFactory(
           RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
@@ -2718,7 +2789,9 @@ public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
 
     /** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
     public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
-      return toBuilder().setFormatFunction(formatFunction).build();
+      return toBuilder()
+          .setFormatFunction(TableRowFormatFunction.fromSerializableFunction(formatFunction))
+          .build();
     }
 
     /**
@@ -2733,7 +2806,10 @@ public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunct
      */
     public Write<T> withFormatRecordOnFailureFunction(
         SerializableFunction<T, TableRow> formatFunction) {
-      return toBuilder().setFormatRecordOnFailureFunction(formatFunction).build();
+      return toBuilder()
+          .setFormatRecordOnFailureFunction(
+              TableRowFormatFunction.fromSerializableFunction(formatFunction))
+          .build();
     }
 
     /**
@@ -3599,9 +3675,8 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
     private <DestinationT> WriteResult expandTyped(
         PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
       boolean optimizeWrites = getOptimizeWrites();
-      SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
-      SerializableFunction<T, TableRow> formatRecordOnFailureFunction =
-          getFormatRecordOnFailureFunction();
+      TableRowFormatFunction<T> formatFunction = getFormatFunction();
+      TableRowFormatFunction<T> formatRecordOnFailureFunction = getFormatRecordOnFailureFunction();
 
       RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory =
           (RowWriterFactory.AvroRowWriterFactory<T, ?, ?>) getAvroRowWriterFactory();
@@ -3623,7 +3698,9 @@ private WriteResult expandTyped(
         // If no format function set, then we will automatically convert the input type to a
         // TableRow.
         // TODO: it would be trivial to convert to avro records here instead.
-        formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
+        formatFunction =
+            TableRowFormatFunction.fromSerializableFunction(
+                BigQueryUtils.toTableRow(input.getToRowFunction()));
       }
       // Infer the TableSchema from the input Beam schema.
       // TODO: If the user provided a schema, we should use that. There are things that can be
@@ -3769,8 +3846,8 @@ private WriteResult continueExpandTyped(
                 getCreateDisposition(),
                 dynamicDestinations,
                 elementCoder,
-                tableRowWriterFactory.getToRowFn(),
-                tableRowWriterFactory.getToFailsafeRowFn())
+                tableRowWriterFactory.getToRowFn().toSerializableFunction(),
+                tableRowWriterFactory.getToFailsafeRowFn().toSerializableFunction())
             .withInsertRetryPolicy(retryPolicy)
             .withTestServices(getBigQueryServices())
             .withExtendedErrorInfo(getExtendedErrorInfo())
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index d58d6b8d609a..d519ea4016ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -20,7 +20,6 @@
 import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
 import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;
 
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.auto.service.AutoService;
 import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
 import com.google.cloud.bigquery.storage.v1.DataFormat;
@@ -641,14 +640,14 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
       if (formatFunctionBytes != null) {
         builder =
             builder.setFormatFunction(
-                (SerializableFunction<?, TableRow>) fromByteArray(formatFunctionBytes));
+                (BigQueryIO.TableRowFormatFunction) fromByteArray(formatFunctionBytes));
       }
       byte[] formatRecordOnFailureFunctionBytes =
           configRow.getBytes("format_record_on_failure_function");
       if (formatRecordOnFailureFunctionBytes != null) {
         builder =
             builder.setFormatRecordOnFailureFunction(
-                (SerializableFunction<?, TableRow>)
+                (BigQueryIO.TableRowFormatFunction)
                     fromByteArray(formatRecordOnFailureFunctionBytes));
       }
       byte[] avroRowWriterFactoryBytes = configRow.getBytes("avro_row_writer_factory");
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 060560d5cade..21f2e64cd1f8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -34,6 +34,8 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -169,11 +171,46 @@ public abstract static class Builder {
   }
 
   private static final String BIGQUERY_TIME_PATTERN = "HH:mm:ss[.SSSSSS]";
-  private static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
+  static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
       java.time.format.DateTimeFormatter.ofPattern(BIGQUERY_TIME_PATTERN);
-  private static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
+  static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
       java.time.format.DateTimeFormatter.ofPattern("uuuu-MM-dd'T'" + BIGQUERY_TIME_PATTERN);
 
+  // Custom formatter that accepts "2022-05-09 18:04:59.123456".
+  // The old dremel parser accepts this format, and so does insertall. We need to accept it
+  // for backwards compatibility, and it is based on UTC time.
+  static final java.time.format.DateTimeFormatter DATETIME_SPACE_FORMATTER =
+      new java.time.format.DateTimeFormatterBuilder()
+          .append(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE)
+          .optionalStart()
+          .appendLiteral(' ')
+          .optionalEnd()
+          .optionalStart()
+          .appendLiteral('T')
+          .optionalEnd()
+          .append(java.time.format.DateTimeFormatter.ISO_LOCAL_TIME)
+          .toFormatter()
+          .withZone(ZoneOffset.UTC);
+
+  static final java.time.format.DateTimeFormatter TIMESTAMP_FORMATTER =
+      new java.time.format.DateTimeFormatterBuilder()
+          // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS'
+          .append(DATETIME_SPACE_FORMATTER)
+          // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:mm:ss|Z)'
+          .optionalStart()
+          .appendOffsetId()
+          .optionalEnd()
+          .optionalStart()
+          .appendOffset("+HH:mm", "+00:00")
+          .optionalEnd()
+          // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc.
+          // If both an offset and a time zone are provided, the offset takes precedence.
+          .optionalStart()
+          .appendLiteral(' ')
+          .parseCaseSensitive()
+          .appendZoneRegionId()
+          .toFormatter();
+
   private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;
 
   /**
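> Review note: the formatters above are shared with TableRowToStorageApiProto after this change, so their behavior is worth pinning down. A standalone sketch of what the space-or-T date-time form accepts, using only `java.time` (mirrors `DATETIME_SPACE_FORMATTER` as defined in the hunk above; names here are local to the example):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

public class TimestampFormats {
  // Mirrors DATETIME_SPACE_FORMATTER: ISO date, then ' ' or 'T', then ISO time,
  // with UTC as the default zone.
  static final DateTimeFormatter SPACE_OR_T =
      new DateTimeFormatterBuilder()
          .append(DateTimeFormatter.ISO_LOCAL_DATE)
          .optionalStart().appendLiteral(' ').optionalEnd()
          .optionalStart().appendLiteral('T').optionalEnd()
          .append(DateTimeFormatter.ISO_LOCAL_TIME)
          .toFormatter()
          .withZone(ZoneOffset.UTC);

  public static void main(String[] args) {
    // Both spellings parse to the same instant; the zone defaults to UTC.
    Instant a = Instant.from(SPACE_OR_T.parse("2022-05-09 18:04:59.123456"));
    Instant b = Instant.from(SPACE_OR_T.parse("2022-05-09T18:04:59.123456"));
    System.out.println(a.equals(b)); // true
  }
}
```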
"2022-05-09 18:04:59.123456" + // The old dremel parser accepts this format, and so does insertall. We need to accept it + // for backwards compatibility, and it is based on UTC time. + static final java.time.format.DateTimeFormatter DATETIME_SPACE_FORMATTER = + new java.time.format.DateTimeFormatterBuilder() + .append(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE) + .optionalStart() + .appendLiteral(' ') + .optionalEnd() + .optionalStart() + .appendLiteral('T') + .optionalEnd() + .append(java.time.format.DateTimeFormatter.ISO_LOCAL_TIME) + .toFormatter() + .withZone(ZoneOffset.UTC); + + static final java.time.format.DateTimeFormatter TIMESTAMP_FORMATTER = + new java.time.format.DateTimeFormatterBuilder() + // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS' + .append(DATETIME_SPACE_FORMATTER) + // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:mm:ss|Z)' + .optionalStart() + .appendOffsetId() + .optionalEnd() + .optionalStart() + .appendOffset("+HH:mm", "+00:00") + .optionalEnd() + // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc + // if both an offset and a time zone are provided, the offset takes precedence + .optionalStart() + .appendLiteral(' ') + .parseCaseSensitive() + .appendZoneRegionId() + .toFormatter(); + private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER; /** @@ -747,7 +784,11 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(value); } catch (NumberFormatException e) { // Handle as a String, ie. "2023-02-16 12:00:00" - return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER); + try { + return LocalDateTime.parse(jsonBQString); + } catch (DateTimeParseException e2) { + return LocalDateTime.parse(jsonBQString, DATETIME_SPACE_FORMATTER); + } } } else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) { return LocalDate.parse(jsonBQString); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java index 21bf9ae74adf..cc5c97ed0d3a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import java.io.Serializable; import org.apache.avro.Schema; @@ -41,29 +40,29 @@ abstract BigQueryRowWriter createRowWriter( String tempFilePrefix, DestinationT destination) throws Exception; static RowWriterFactory tableRows( - SerializableFunction toRow, - SerializableFunction toFailsafeRow) { + BigQueryIO.TableRowFormatFunction toRow, + BigQueryIO.TableRowFormatFunction toFailsafeRow) { return new TableRowWriterFactory(toRow, toFailsafeRow); } static final class TableRowWriterFactory extends RowWriterFactory { - private final SerializableFunction toRow; - private final SerializableFunction toFailsafeRow; + private final BigQueryIO.TableRowFormatFunction toRow; + private final BigQueryIO.TableRowFormatFunction toFailsafeRow; private TableRowWriterFactory( - SerializableFunction toRow, - SerializableFunction toFailsafeRow) { + BigQueryIO.TableRowFormatFunction toRow, + BigQueryIO.TableRowFormatFunction toFailsafeRow) { 
this.toRow = toRow; this.toFailsafeRow = toFailsafeRow; } - public SerializableFunction getToRowFn() { + public BigQueryIO.TableRowFormatFunction getToRowFn() { return toRow; } - public SerializableFunction getToFailsafeRowFn() { + public BigQueryIO.TableRowFormatFunction getToFailsafeRowFn() { if (toFailsafeRow == null) { return toRow; } @@ -76,9 +75,10 @@ public OutputType getOutputType() { } @Override + @SuppressWarnings("nullness") public BigQueryRowWriter createRowWriter( String tempFilePrefix, DestinationT destination) throws Exception { - return new TableRowWriter<>(tempFilePrefix, toRow); + return new TableRowWriter<>(tempFilePrefix, toRow.toSerializableFunction()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java index fd5fe27f0c7c..21abde7d256c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java @@ -22,20 +22,23 @@ import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Message; -import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.Row; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; /** Storage API DynamicDestinations used when the input is a Beam Row. 
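> Review note: the `toBeamRow` change above introduces a two-stage parse (strict ISO first, legacy space-separated second). A standalone check of that control flow, plain `java.time` only, with a simplified space formatter standing in for `DATETIME_SPACE_FORMATTER`:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;

public class DatetimeFallback {
  // Simplified stand-in for the legacy space-separated format.
  static final DateTimeFormatter SPACE_FORMAT =
      new DateTimeFormatterBuilder()
          .append(DateTimeFormatter.ISO_LOCAL_DATE)
          .appendLiteral(' ')
          .append(DateTimeFormatter.ISO_LOCAL_TIME)
          .toFormatter();

  static LocalDateTime parseDatetime(String s) {
    try {
      return LocalDateTime.parse(s); // ISO form: "2023-02-16T12:00:00"
    } catch (DateTimeParseException e) {
      return LocalDateTime.parse(s, SPACE_FORMAT); // legacy form: "2023-02-16 12:00:00"
    }
  }

  public static void main(String[] args) {
    System.out.println(parseDatetime("2023-02-16T12:00:00"));
    System.out.println(parseDatetime("2023-02-16 12:00:00"));
  }
}
```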
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index fd5fe27f0c7c..21abde7d256c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -22,20 +22,23 @@
 import com.google.protobuf.DescriptorProtos;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Storage API DynamicDestinations used when the input is a Beam Row. */
 class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Object>
     extends StorageApiDynamicDestinations<T, DestinationT> {
   private final TableSchema tableSchema;
   private final SerializableFunction<T, Row> toRow;
-  private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;
+  private final @Nullable SerializableBiFunction<
+          TableRowToStorageApiProto.@Nullable SchemaInformation, T, TableRow>
+      formatRecordOnFailureFunction;
 
   private final boolean usesCdc;
@@ -43,7 +46,9 @@ class StorageApiDynamicDestinationsBeamRow(
       DynamicDestinations<T, DestinationT> inner,
       Schema schema,
       SerializableFunction<T, Row> toRow,
-      @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
+      @Nullable
+          SerializableBiFunction<TableRowToStorageApiProto.@Nullable SchemaInformation, T, TableRow>
+              formatRecordOnFailureFunction,
       boolean usesCdc) {
     super(inner);
     this.tableSchema = BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema);
@@ -108,7 +113,7 @@ public StorageApiWritePayload toMessage(
   @Override
   public TableRow toFailsafeTableRow(T element) {
     if (formatRecordOnFailureFunction != null) {
-      return formatRecordOnFailureFunction.apply(element);
+      return formatRecordOnFailureFunction.apply(null, element);
     } else {
       return BigQueryUtils.toTableRow(toRow.apply(element));
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index a387495863a2..0948876a46f9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -36,8 +36,7 @@ class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNul
 
   private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
   private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
-  private final @javax.annotation.Nullable SerializableFunction<T, TableRow>
-      formatRecordOnFailureFunction;
+  private final BigQueryIO.@Nullable TableRowFormatFunction<T> formatRecordOnFailureFunction;
 
   private boolean usesCdc;
@@ -45,7 +44,7 @@ class StorageApiDynamicDestinationsGenericRecord(
       DynamicDestinations<T, DestinationT> inner,
       SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
       SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord,
-      @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
+      BigQueryIO.@Nullable TableRowFormatFunction<T> formatRecordOnFailureFunction,
       boolean usesCdc) {
     super(inner);
     this.toGenericRecord = toGenericRecord;
@@ -110,7 +109,7 @@ public StorageApiWritePayload toMessage(
   @Override
   public TableRow toFailsafeTableRow(T element) {
     if (formatRecordOnFailureFunction != null) {
-      return formatRecordOnFailureFunction.apply(element);
+      return formatRecordOnFailureFunction.apply(null, element);
     } else {
       return BigQueryUtils.convertGenericRecordToTableRow(
           toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
index 7f4ec4a77d0b..544c1dc28e53 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
@@ -27,7 +27,6 @@
 import java.lang.reflect.InvocationTargetException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -36,13 +35,13 @@ class StorageApiDynamicDestinationsProto<T extends Message, DestinationT extends
     extends StorageApiDynamicDestinations<T, DestinationT> {
   private final DescriptorProtos.DescriptorProto descriptorProto;
-  private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;
+  private final @Nullable BigQueryIO.TableRowFormatFunction<T> formatRecordOnFailureFunction;
 
   @SuppressWarnings({"unchecked", "nullness"})
   StorageApiDynamicDestinationsProto(
       DynamicDestinations<T, DestinationT> inner,
       Class<T> protoClass,
-      @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction) {
+      @Nullable BigQueryIO.TableRowFormatFunction<T> formatRecordOnFailureFunction) {
     super(inner);
     try {
       this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
@@ -66,9 +65,11 @@ public MessageConverter<T> getMessageConverter(
 
     class Converter implements MessageConverter<T> {
       TableSchema tableSchema;
+      transient @Nullable TableRowToStorageApiProto.SchemaInformation schemaInformation;
 
       Converter(TableSchema tableSchema) {
         this.tableSchema = tableSchema;
+        this.schemaInformation = null;
       }
 
       @Override
@@ -76,6 +77,14 @@ public TableSchema getTableSchema() {
         return tableSchema;
       }
 
+      public TableRowToStorageApiProto.SchemaInformation getSchemaInformation() {
+        if (this.schemaInformation == null) {
+          this.schemaInformation =
+              TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
+        }
+        return this.schemaInformation;
+      }
+
       @Override
       public DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
           throws Exception {
@@ -97,13 +106,15 @@ public StorageApiWritePayload toMessage(
             formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
       }
 
+      @SuppressWarnings("nullness")
       @Override
       public TableRow toFailsafeTableRow(T element) {
         if (formatRecordOnFailureFunction != null) {
-          return formatRecordOnFailureFunction.apply(element);
+          return formatRecordOnFailureFunction.apply(schemaInformation, element);
         } else {
           try {
             return TableRowToStorageApiProto.tableRowFromMessage(
+                getSchemaInformation(),
                 DynamicMessage.parseFrom(
                     TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto),
                     element.toByteArray()),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 08588cfc7850..2438515b8770 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -27,7 +27,6 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -35,8 +34,8 @@
 public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonNull Object>
     extends StorageApiDynamicDestinations<T, DestinationT> {
-  private final SerializableFunction<T, TableRow> formatFunction;
-  private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;
+  private final BigQueryIO.TableRowFormatFunction<T> formatFunction;
+  private final @Nullable BigQueryIO.TableRowFormatFunction<T> formatRecordOnFailureFunction;
 
   private final boolean usesCdc;
   private final CreateDisposition createDisposition;
@@ -51,8 +50,8 @@ public class StorageApiDynamicDestinationsTableRow(
       DynamicDestinations<T, DestinationT> inner,
-      SerializableFunction<T, TableRow> formatFunction,
-      @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
+      BigQueryIO.TableRowFormatFunction<T> formatFunction,
+      @Nullable BigQueryIO.TableRowFormatFunction<T> formatRecordOnFailureFunction,
       boolean usesCdc,
       CreateDisposition createDisposition,
       boolean ignoreUnknownValues,
@@ -156,16 +155,16 @@ public DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
       @Override
       public TableRow toFailsafeTableRow(T element) {
         if (formatRecordOnFailureFunction != null) {
-          return formatRecordOnFailureFunction.apply(element);
+          return formatRecordOnFailureFunction.apply(schemaInformation, element);
         } else {
-          return formatFunction.apply(element);
+          return formatFunction.apply(schemaInformation, element);
         }
       }
 
       @Override
       public StorageApiWritePayload toMessage(
           T element, @Nullable RowMutationInformation rowMutationInformation) throws Exception {
-        TableRow tableRow = formatFunction.apply(element);
+        TableRow tableRow = formatFunction.apply(schemaInformation, element);
         String changeType = null;
         String changeSequenceNum = null;
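> Review note: several converters above now compute `SchemaInformation` once per converter and reuse it. The pattern is plain lazy initialization on a transient field; a minimal standalone sketch (class and field names are illustrative, not the Beam types):

```java
import java.util.function.Supplier;

// Lazy, per-instance caching as used by Converter.getSchemaInformation(): the
// field is transient, so it is simply recomputed after deserialization.
class CachedSchema<S> {
  private final Supplier<S> loader;
  private transient S cached;

  CachedSchema(Supplier<S> loader) {
    this.loader = loader;
  }

  S get() {
    if (cached == null) {
      cached = loader.get(); // e.g. SchemaInformation.fromTableSchema(tableSchema)
    }
    return cached;
  }
}
```

> As in the PR, there is no synchronization; the worst case under concurrent first access is computing the value twice, which is benign here because the result is deterministic.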
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 5553713923cb..f6d10b47ccf2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -655,11 +655,12 @@ long flush(
           @Nullable TableRow failedRow = failsafeTableRows.get(i);
           if (failedRow == null) {
             ByteString rowBytes = inserts.getSerializedRows(i);
+            AppendClientInfo aci = getAppendClientInfo(true, null);
             failedRow =
                 TableRowToStorageApiProto.tableRowFromMessage(
+                    aci.getSchemaInformation(),
                     DynamicMessage.parseFrom(
-                        TableRowToStorageApiProto.wrapDescriptorProto(
-                            getAppendClientInfo(true, null).getDescriptor()),
+                        TableRowToStorageApiProto.wrapDescriptorProto(aci.getDescriptor()),
                         rowBytes),
                     true,
                     successfulRowsPredicate);
@@ -739,12 +740,13 @@ long flush(
             if (failedRow == null) {
               ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex);
+              AppendClientInfo aci = Preconditions.checkStateNotNull(appendClientInfo);
               failedRow =
                   TableRowToStorageApiProto.tableRowFromMessage(
+                      aci.getSchemaInformation(),
                       DynamicMessage.parseFrom(
                           TableRowToStorageApiProto.wrapDescriptorProto(
-                              Preconditions.checkStateNotNull(appendClientInfo)
-                                  .getDescriptor()),
+                              aci.getDescriptor()),
                           protoBytes),
                       true,
                       Predicates.alwaysTrue());
@@ -897,6 +899,8 @@ long flush(
             try {
               TableRow row =
                   TableRowToStorageApiProto.tableRowFromMessage(
+                      Preconditions.checkStateNotNull(appendClientInfo)
+                          .getSchemaInformation(),
                       DynamicMessage.parseFrom(descriptor, rowBytes),
                       true,
                       successfulRowsPredicate);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index f9874d6ab419..c5451b04a4b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.DATETIME_SPACE_FORMATTER;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TIMESTAMP_FORMATTER;
 
 import com.google.api.services.bigquery.model.TableCell;
 import com.google.api.services.bigquery.model.TableRow;
@@ -44,21 +46,24 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
 import java.time.DateTimeException;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.DateTimeParseException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -71,6 +76,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -83,42 +89,6 @@
  * with the Storage write API.
  */
 public class TableRowToStorageApiProto {
-
-  // Custom formatter that accepts "2022-05-09 18:04:59.123456"
-  // The old dremel parser accepts this format, and so does insertall. We need to accept it
-  // for backwards compatibility, and it is based on UTC time.
-  private static final DateTimeFormatter DATETIME_SPACE_FORMATTER =
-      new DateTimeFormatterBuilder()
-          .append(DateTimeFormatter.ISO_LOCAL_DATE)
-          .optionalStart()
-          .appendLiteral(' ')
-          .optionalEnd()
-          .optionalStart()
-          .appendLiteral('T')
-          .optionalEnd()
-          .append(DateTimeFormatter.ISO_LOCAL_TIME)
-          .toFormatter()
-          .withZone(ZoneOffset.UTC);
-
-  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
-      new DateTimeFormatterBuilder()
-          // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS'
-          .append(DATETIME_SPACE_FORMATTER)
-          // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:mm:ss|Z)'
-          .optionalStart()
-          .appendOffsetId()
-          .optionalEnd()
-          .optionalStart()
-          .appendOffset("+HH:mm", "+00:00")
-          .optionalEnd()
-          // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc
-          // if both an offset and a time zone are provided, the offset takes precedence
-          .optionalStart()
-          .appendLiteral(' ')
-          .parseCaseSensitive()
-          .appendZoneRegionId()
-          .toFormatter();
-
   abstract static class SchemaConversionException extends Exception {
     SchemaConversionException(String msg) {
       super(msg);
@@ -146,12 +116,13 @@ public static class SchemaDoesntMatchException extends SchemaConversionException
   }
 
   public static class SingleValueConversionException extends SchemaConversionException {
-    SingleValueConversionException(Object sourceValue, SchemaInformation schema, Exception e) {
+    SingleValueConversionException(
+        Object sourceValue, TableFieldSchema.Type type, String fullName, Exception e) {
       super(
           "Column: "
-              + getPrettyFieldName(schema)
+              + getPrettyFieldName(fullName)
               + " ("
-              + schema.getType()
+              + type
               + "). "
               + "Value: "
               + sourceValue
               + ". "
               + e);
     }
 
-    private static String getPrettyFieldName(SchemaInformation schema) {
-      String fullName = schema.getFullName();
+    private static String getPrettyFieldName(String fullName) {
       String rootPrefix = "root.";
       return fullName.startsWith(rootPrefix) ? fullName.substring(rootPrefix.length()) : fullName;
     }
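> Review note: the next hunk replaces a long per-type `switch` with a lookup table of converter lambdas keyed by schema type. A toy version of the same dispatch pattern, with simplified types (the real table is keyed by `TableFieldSchema.Type` and its functions may throw `SchemaConversionException`):

```java
import java.util.Map;

// Toy version of the TYPE_MAP_PROTO_CONVERTERS dispatch: one converter lambda
// per logical type, looked up at conversion time instead of a switch statement.
// A null return from a converter means "value shape not recognized".
class ConverterMapDemo {
  enum FieldType { INT64, BOOL }

  @FunctionalInterface
  interface Converter {
    Object apply(String fieldName, Object value);
  }

  static final Map<FieldType, Converter> CONVERTERS =
      Map.of(
          FieldType.INT64,
          (name, v) -> v instanceof String ? Long.valueOf((String) v) : null,
          FieldType.BOOL,
          (name, v) -> v instanceof String ? Boolean.valueOf((String) v) : null);

  static Object convert(FieldType type, String name, Object value) {
    Converter c = CONVERTERS.get(type);
    if (c == null) {
      throw new RuntimeException("Unknown type " + type);
    }
    Object converted = c.apply(name, value);
    if (converted == null) {
      throw new IllegalArgumentException("Unexpected value for " + name + ": " + value);
    }
    return converted;
  }

  public static void main(String[] args) {
    System.out.println(convert(FieldType.INT64, "root.count", "42")); // 42
  }
}
```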
+ static final Map> + TYPE_MAP_PROTO_CONVERTERS = + ImmutableMap + .> + builder() + .put( + TableFieldSchema.Type.INT64, + (fullName, value) -> { + if (value instanceof String) { + try { + return Long.valueOf((String) value); + } catch (NumberFormatException e) { + throw new SingleValueConversionException( + value, TableFieldSchema.Type.INT64, fullName, e); + } + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).longValue(); + } else if (value instanceof BigDecimal) { + try { + return ((BigDecimal) value).longValueExact(); + } catch (ArithmeticException e) { + throw new SingleValueConversionException( + value, TableFieldSchema.Type.INT64, fullName, e); + } + } else if (value instanceof BigInteger) { + try { + return ((BigInteger) value).longValueExact(); + } catch (ArithmeticException e) { + throw new SingleValueConversionException( + value, TableFieldSchema.Type.INT64, fullName, e); + } + } + return null; + }) + .put( + TableFieldSchema.Type.DOUBLE, + (schemaInformation, value) -> { + if (value instanceof String) { + return Double.valueOf((String) value); + } else if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return null; + }) + .put( + TableFieldSchema.Type.BOOL, + (schemaInformation, value) -> { + if (value instanceof String) { + return Boolean.valueOf((String) value); + } else if (value instanceof Boolean) { + return value; + } + return null; + }) + .put( + TableFieldSchema.Type.BYTES, + (schemaInformation, value) -> { + if (value instanceof String) { + return ByteString.copyFrom(BaseEncoding.base64().decode((String) value)); + } else if (value instanceof byte[]) { + return ByteString.copyFrom((byte[]) value); + } else if (value instanceof ByteString) { + return value; + } + return null; + }) + .put( + TableFieldSchema.Type.TIMESTAMP, + (schemaInformation, value) -> { + if (value instanceof String) { + try { + // '2011-12-03T10:15:30Z', '2011-12-03 10:15:30+05:00' + // '2011-12-03 10:15:30 UTC', '2011-12-03T10:15:30 America/New_York' + Instant timestamp = Instant.from(TIMESTAMP_FORMATTER.parse((String) value)); + return toEpochMicros(timestamp); + } catch (DateTimeException e) { + try { + // for backwards compatibility, default time zone is UTC for values with + // no time-zone + // '2011-12-03T10:15:30' + Instant timestamp = + Instant.from( + TIMESTAMP_FORMATTER + .withZone(ZoneOffset.UTC) + .parse((String) value)); + return toEpochMicros(timestamp); + } catch (DateTimeParseException err) { + // "12345667" + Instant timestamp = Instant.ofEpochMilli(Long.parseLong((String) value)); + return toEpochMicros(timestamp); + } + } + } else if (value instanceof Instant) { + return toEpochMicros((Instant) value); + } else if (value instanceof org.joda.time.Instant) { + // joda instant precision is millisecond + return ((org.joda.time.Instant) value).getMillis() * 1000L; + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).longValue(); + } else if (value instanceof Double || value instanceof Float) { + // assume value represents number of seconds since epoch + return BigDecimal.valueOf(((Number) value).doubleValue()) + .scaleByPowerOfTen(6) + .setScale(0, RoundingMode.HALF_UP) + .longValue(); + } + return null; + }) + .put( + TableFieldSchema.Type.DATE, + (schemaInformation, value) -> { + if (value instanceof String) { + return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue(); + } else if (value instanceof LocalDate) { + return ((Long) ((LocalDate) 
value).toEpochDay()).intValue(); + } else if (value instanceof org.joda.time.LocalDate) { + return Days.daysBetween( + org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(), + (org.joda.time.LocalDate) value) + .getDays(); + } else if (value instanceof Integer || value instanceof Long) { + return ((Number) value).intValue(); + } + return null; + }) + .put( + TableFieldSchema.Type.NUMERIC, + (schemaInformation, value) -> { + if (value instanceof String) { + return BigDecimalByteStringEncoder.encodeToNumericByteString( + new BigDecimal((String) value)); + } else if (value instanceof BigDecimal) { + return BigDecimalByteStringEncoder.encodeToNumericByteString( + ((BigDecimal) value)); + } else if (value instanceof Double || value instanceof Float) { + return BigDecimalByteStringEncoder.encodeToNumericByteString( + BigDecimal.valueOf(((Number) value).doubleValue())); + } else if (value instanceof Short + || value instanceof Integer + || value instanceof Long) { + return BigDecimalByteStringEncoder.encodeToNumericByteString( + BigDecimal.valueOf(((Number) value).longValue())); + } + return null; + }) + .put( + TableFieldSchema.Type.BIGNUMERIC, + (schemaInformation, value) -> { + if (value instanceof String) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString( + new BigDecimal((String) value)); + } else if (value instanceof BigDecimal) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString( + ((BigDecimal) value)); + } else if (value instanceof Double || value instanceof Float) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString( + BigDecimal.valueOf(((Number) value).doubleValue())); + } else if (value instanceof Short + || value instanceof Integer + || value instanceof Long) { + return BigDecimalByteStringEncoder.encodeToBigNumericByteString( + BigDecimal.valueOf(((Number) value).longValue())); + } + return null; + }) + .put( + TableFieldSchema.Type.DATETIME, + (schemaInformation, value) -> { + if (value instanceof String) { + try { + // '2011-12-03T10:15:30' + return CivilTimeEncoder.encodePacked64DatetimeMicros( + LocalDateTime.parse((String) value)); + } catch (DateTimeParseException e2) { + // '2011-12-03 10:15:30' + return CivilTimeEncoder.encodePacked64DatetimeMicros( + LocalDateTime.parse((String) value, DATETIME_SPACE_FORMATTER)); + } + } else if (value instanceof Number) { + return ((Number) value).longValue(); + } else if (value instanceof LocalDateTime) { + return CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value); + } else if (value instanceof org.joda.time.LocalDateTime) { + return CivilTimeEncoder.encodePacked64DatetimeMicros( + (org.joda.time.LocalDateTime) value); + } + return null; + }) + .put( + TableFieldSchema.Type.TIME, + (schemaInformation, value) -> { + if (value instanceof String) { + return CivilTimeEncoder.encodePacked64TimeMicros( + LocalTime.parse((String) value)); + } else if (value instanceof Number) { + return ((Number) value).longValue(); + } else if (value instanceof LocalTime) { + return CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value); + } else if (value instanceof org.joda.time.LocalTime) { + return CivilTimeEncoder.encodePacked64TimeMicros( + (org.joda.time.LocalTime) value); + } + return null; + }) + .put( + TableFieldSchema.Type.STRING, + (schemaInformation, value) -> + Preconditions.checkArgumentNotNull(value).toString()) + .put( + TableFieldSchema.Type.JSON, + (schemaInformation, value) -> + Preconditions.checkArgumentNotNull(value).toString()) + .put( + 
TableFieldSchema.Type.GEOGRAPHY, + (schemaInformation, value) -> + Preconditions.checkArgumentNotNull(value).toString()) + .build(); + public static TableFieldSchema.Mode modeToProtoMode( @Nullable String defaultValueExpression, String mode) { TableFieldSchema.Mode resultMode = @@ -345,7 +542,7 @@ public static TableFieldSchema tableFieldToProtoTableField( return builder.build(); } - static class SchemaInformation { + public static class SchemaInformation { private final TableFieldSchema tableFieldSchema; private final List subFields; private final Map subFieldsByName; @@ -382,6 +579,14 @@ public TableFieldSchema.Type getType() { return tableFieldSchema.getType(); } + public boolean isNullable() { + return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.NULLABLE); + } + + public boolean isRepeated() { + return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.REPEATED); + } + public SchemaInformation getSchemaForField(String name) { SchemaInformation schemaInformation = subFieldsByName.get(name.toLowerCase()); if (schemaInformation == null) { @@ -398,7 +603,7 @@ public SchemaInformation getSchemaForField(int i) { return schemaInformation; } - static SchemaInformation fromTableSchema(TableSchema tableSchema) { + public static SchemaInformation fromTableSchema(TableSchema tableSchema) { TableFieldSchema root = TableFieldSchema.newBuilder() .addAllFields(tableSchema.getFieldsList()) @@ -658,6 +863,9 @@ public static DynamicMessage messageFromTableRow( final int finalIndex = i; Supplier<@Nullable TableRow> getNestedUnknown = () -> { + if (unknownFields == null) { + return null; + } TableRow localUnknownFields = Preconditions.checkStateNotNull(unknownFields); @Nullable TableRow nested = (TableRow) (localUnknownFields.getF().get(finalIndex).getV()); @@ -988,7 +1196,8 @@ public static ByteString mergeNewFields( throw new RuntimeException(e); } TableRow original = - TableRowToStorageApiProto.tableRowFromMessage(message, true, Predicates.alwaysTrue()); + TableRowToStorageApiProto.tableRowFromMessage( + schemaInformation, message, true, Predicates.alwaysTrue()); Map fieldDescriptors = descriptor.getFields().stream() .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, Functions.identity())); @@ -1061,7 +1270,7 @@ public static ByteString mergeNewFields( return singularFieldToProtoValue( schemaInformation, fieldDescriptor, - bqValue, + Preconditions.checkStateNotNull(bqValue), ignoreUnknownValues, allowMissingRequiredFields, getUnknownNestedFields); @@ -1071,208 +1280,60 @@ public static ByteString mergeNewFields( static @Nullable Object singularFieldToProtoValue( SchemaInformation schemaInformation, FieldDescriptor fieldDescriptor, - @Nullable Object value, + Object value, boolean ignoreUnknownValues, boolean allowMissingRequiredFields, Supplier<@Nullable TableRow> getUnknownNestedFields) throws SchemaConversionException { - switch (schemaInformation.getType()) { - case INT64: - if (value instanceof String) { - try { - return Long.valueOf((String) value); - } catch (NumberFormatException e) { - throw new SingleValueConversionException(value, schemaInformation, e); - } - } else if (value instanceof Integer || value instanceof Long) { - return ((Number) value).longValue(); - } else if (value instanceof BigDecimal) { - try { - return ((BigDecimal) value).longValueExact(); - } catch (ArithmeticException e) { - throw new SingleValueConversionException(value, schemaInformation, e); - } - } else if (value instanceof BigInteger) { - try { - return ((BigInteger) 
value).longValueExact(); - } catch (ArithmeticException e) { - throw new SingleValueConversionException(value, schemaInformation, e); - } - } - break; - case DOUBLE: - if (value instanceof String) { - return Double.valueOf((String) value); - } else if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - break; - case BOOL: - if (value instanceof String) { - return Boolean.valueOf((String) value); - } else if (value instanceof Boolean) { - return value; - } - break; - case BYTES: - if (value instanceof String) { - return ByteString.copyFrom(BaseEncoding.base64().decode((String) value)); - } else if (value instanceof byte[]) { - return ByteString.copyFrom((byte[]) value); - } else if (value instanceof ByteString) { - return value; - } - break; - case TIMESTAMP: - if (value instanceof String) { - try { - // '2011-12-03T10:15:30Z', '2011-12-03 10:15:30+05:00' - // '2011-12-03 10:15:30 UTC', '2011-12-03T10:15:30 America/New_York' - Instant timestamp = Instant.from(TIMESTAMP_FORMATTER.parse((String) value)); - return toEpochMicros(timestamp); - } catch (DateTimeException e) { - try { - // for backwards compatibility, default time zone is UTC for values with no time-zone - // '2011-12-03T10:15:30' - Instant timestamp = - Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String) value)); - return toEpochMicros(timestamp); - } catch (DateTimeParseException err) { - // "12345667" - Instant timestamp = Instant.ofEpochMilli(Long.parseLong((String) value)); - return toEpochMicros(timestamp); - } - } - } else if (value instanceof Instant) { - return toEpochMicros((Instant) value); - } else if (value instanceof org.joda.time.Instant) { - // joda instant precision is millisecond - return ((org.joda.time.Instant) value).getMillis() * 1000L; - } else if (value instanceof Integer || value instanceof Long) { - return ((Number) value).longValue(); - } else if (value instanceof Double || value instanceof Float) { - // assume value represents number of seconds since epoch - return BigDecimal.valueOf(((Number) value).doubleValue()) - .scaleByPowerOfTen(6) - .setScale(0, RoundingMode.HALF_UP) - .longValue(); - } - break; - case DATE: - if (value instanceof String) { - return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue(); - } else if (value instanceof LocalDate) { - return ((Long) ((LocalDate) value).toEpochDay()).intValue(); - } else if (value instanceof org.joda.time.LocalDate) { - return Days.daysBetween( - org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(), - (org.joda.time.LocalDate) value) - .getDays(); - } else if (value instanceof Integer || value instanceof Long) { - return ((Number) value).intValue(); - } - break; - case NUMERIC: - if (value instanceof String) { - return BigDecimalByteStringEncoder.encodeToNumericByteString( - new BigDecimal((String) value)); - } else if (value instanceof BigDecimal) { - return BigDecimalByteStringEncoder.encodeToNumericByteString(((BigDecimal) value)); - } else if (value instanceof Double || value instanceof Float) { - return BigDecimalByteStringEncoder.encodeToNumericByteString( - BigDecimal.valueOf(((Number) value).doubleValue())); - } else if (value instanceof Short || value instanceof Integer || value instanceof Long) { - return BigDecimalByteStringEncoder.encodeToNumericByteString( - BigDecimal.valueOf(((Number) value).longValue())); - } - break; - case BIGNUMERIC: - if (value instanceof String) { - return BigDecimalByteStringEncoder.encodeToBigNumericByteString( - new BigDecimal((String) value)); - } else 
if (value instanceof BigDecimal) { - return BigDecimalByteStringEncoder.encodeToBigNumericByteString(((BigDecimal) value)); - } else if (value instanceof Double || value instanceof Float) { - return BigDecimalByteStringEncoder.encodeToBigNumericByteString( - BigDecimal.valueOf(((Number) value).doubleValue())); - } else if (value instanceof Short || value instanceof Integer || value instanceof Long) { - return BigDecimalByteStringEncoder.encodeToBigNumericByteString( - BigDecimal.valueOf(((Number) value).longValue())); - } - break; - case DATETIME: - if (value instanceof String) { - try { - // '2011-12-03T10:15:30' - return CivilTimeEncoder.encodePacked64DatetimeMicros( - LocalDateTime.parse((String) value)); - } catch (DateTimeParseException e2) { - // '2011-12-03 10:15:30' - return CivilTimeEncoder.encodePacked64DatetimeMicros( - LocalDateTime.parse((String) value, DATETIME_SPACE_FORMATTER)); - } - } else if (value instanceof Number) { - return ((Number) value).longValue(); - } else if (value instanceof LocalDateTime) { - return CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value); - } else if (value instanceof org.joda.time.LocalDateTime) { - return CivilTimeEncoder.encodePacked64DatetimeMicros((org.joda.time.LocalDateTime) value); - } - break; - case TIME: - if (value instanceof String) { - return CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) value)); - } else if (value instanceof Number) { - return ((Number) value).longValue(); - } else if (value instanceof LocalTime) { - return CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value); - } else if (value instanceof org.joda.time.LocalTime) { - return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value); - } - break; - case STRING: - case JSON: - case GEOGRAPHY: - return Preconditions.checkArgumentNotNull(value).toString(); - case STRUCT: - if (value instanceof TableRow) { - TableRow tableRow = (TableRow) value; - return messageFromTableRow( - schemaInformation, - fieldDescriptor.getMessageType(), - tableRow, - ignoreUnknownValues, - allowMissingRequiredFields, - getUnknownNestedFields.get(), - null, - null); - } else if (value instanceof AbstractMap) { - // This will handle nested rows. - AbstractMap map = ((AbstractMap) value); - return messageFromMap( - schemaInformation, - fieldDescriptor.getMessageType(), - map, - ignoreUnknownValues, - allowMissingRequiredFields, - getUnknownNestedFields.get(), - null, - null); - } - break; - default: + @Nullable Object converted = null; + if (schemaInformation.getType() == TableFieldSchema.Type.STRUCT) { + if (value instanceof TableRow) { + TableRow tableRow = (TableRow) value; + converted = + messageFromTableRow( + schemaInformation, + fieldDescriptor.getMessageType(), + tableRow, + ignoreUnknownValues, + allowMissingRequiredFields, + getUnknownNestedFields.get(), + null, + null); + } else if (value instanceof AbstractMap) { + // This will handle nested rows. 
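> Review note: one conversion in the table above that is easy to get wrong is the floating-point TIMESTAMP path (value interpreted as seconds since epoch, rounded half-up to microseconds). A standalone check of that arithmetic, replaying the same `BigDecimal` steps the converter uses:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Replays the TIMESTAMP converter's Double/Float branch: treat the value as
// seconds since the epoch and round half-up to whole microseconds.
class EpochMicrosDemo {
  static long secondsToMicros(double seconds) {
    return BigDecimal.valueOf(seconds)
        .scaleByPowerOfTen(6)
        .setScale(0, RoundingMode.HALF_UP)
        .longValue();
  }

  public static void main(String[] args) {
    System.out.println(secondsToMicros(1.5));       // 1500000
    System.out.println(secondsToMicros(0.0000014)); // 1 (1.4 micros rounds down)
    System.out.println(secondsToMicros(0.0000015)); // 2 (half-up)
  }
}
```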
@@ -1282,68 +1343,378 @@ private static long toEpochMicros(Instant timestamp) {
 
   @VisibleForTesting
   public static TableRow tableRowFromMessage(
-      Message message, boolean includeCdcColumns, Predicate<String> includeField) {
-    return tableRowFromMessage(message, includeCdcColumns, includeField, "");
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField) {
+    return tableRowFromMessage(schemaInformation, message, includeCdcColumns, includeField, "");
   }
 
   public static TableRow tableRowFromMessage(
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix) {
+    // We first try to create a map-style TableRow for backwards compatibility with existing
+    // usage. However this will fail if there is a column named "f". If it fails, we then instead
+    // create a list-style TableRow.
+    Optional<TableRow> tableRow =
+        tableRowFromMessageNoF(
+            schemaInformation, message, includeCdcColumns, includeField, namePrefix);
+    return tableRow.orElseGet(
+        () ->
+            tableRowFromMessageUseSetF(
+                schemaInformation, message, includeCdcColumns, includeField, ""));
+  }
+
+  private static Optional<TableRow> tableRowFromMessageNoF(
+      SchemaInformation schemaInformation,
       Message message,
       boolean includeCdcColumns,
       Predicate<String> includeField,
       String namePrefix) {
-    // TODO: Would be more correct to generate TableRows using setF.
     TableRow tableRow = new TableRow();
     for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
       StringBuilder fullName = new StringBuilder();
       FieldDescriptor fieldDescriptor = field.getKey();
       String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+      if ("f".equals(fieldName)) {
+        // TableRow.put won't work as expected if the field is named "f". Fail the call, and
+        // force a retry using the setF codepath.
+        return Optional.empty();
+      }
       fullName = fullName.append(namePrefix).append(fieldName);
       Object fieldValue = field.getValue();
       if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
           && includeField.test(fieldName)) {
-        tableRow.put(
-            fieldName,
+        SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(fieldName);
+        Object convertedFieldValue =
             jsonValueFromMessageValue(
-                fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
+                fieldSchemaInformation,
+                fieldDescriptor,
+                fieldValue,
+                true,
+                includeField,
+                fullName.append(".").toString(),
+                false);
+        if (convertedFieldValue instanceof Optional) {
+          Optional<Object> optional = (Optional<Object>) convertedFieldValue;
+          if (!optional.isPresent()) {
+            // Some nested message had a field named "f". Fail.
+            return Optional.empty();
+          } else {
+            convertedFieldValue = optional.get();
+          }
+        }
+        tableRow.put(fieldName, convertedFieldValue);
       }
     }
-    return tableRow;
+    return Optional.of(tableRow);
   }
+
+  public static TableRow tableRowFromMessageUseSetF(
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix) {
+    List<TableCell> tableCells =
+        Lists.newArrayListWithCapacity(message.getDescriptorForType().getFields().size());
+
+    for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
+      TableCell tableCell = new TableCell();
+      boolean isPresent =
+          (fieldDescriptor.isRepeated() && message.getRepeatedFieldCount(fieldDescriptor) > 0)
+              || (!fieldDescriptor.isRepeated() && message.hasField(fieldDescriptor));
+      if (isPresent) {
+        StringBuilder fullName = new StringBuilder();
+        String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+        fullName = fullName.append(namePrefix).append(fieldName);
+        if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
+            && includeField.test(fieldName)) {
+          SchemaInformation fieldSchemaInformation =
+              schemaInformation.getSchemaForField(fieldName);
+          Object fieldValue = message.getField(fieldDescriptor);
+          Object converted =
+              jsonValueFromMessageValue(
+                  fieldSchemaInformation,
+                  fieldDescriptor,
+                  fieldValue,
+                  true,
+                  includeField,
+                  fullName.append(".").toString(),
+                  true);
+          tableCell.setV(converted);
+        }
+      }
+      tableCells.add(tableCell);
+    }
+
+    TableRow tableRow = new TableRow();
+    tableRow.setF(tableCells);
+    return tableRow;
+  }
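> Review note (inside the same hunk, between the two new methods): the two TableRow shapes produced above look like this on the wire. A minimal sketch using only the `com.google.api.services.bigquery` model classes:

```java
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;

// Map-style vs. list-style TableRow, mirroring tableRowFromMessageNoF and
// tableRowFromMessageUseSetF. The list style is the escape hatch when a column
// is literally named "f", which collides with TableRow's own "f" property.
class TableRowShapes {
  public static void main(String[] args) {
    // Map style: column name -> value.
    TableRow mapStyle = new TableRow();
    mapStyle.put("user", "alice");

    // List style: one TableCell per schema field, in schema order.
    TableRow listStyle = new TableRow();
    listStyle.setF(Arrays.asList(new TableCell().setV("alice")));

    System.out.println(mapStyle);  // {user=alice}
    System.out.println(listStyle); // {f=[{v=alice}]}
  }
}
```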
+ private static final Set FLOAT_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_FloatValue", "FloatValue"); + private static final Set DOUBLE_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_DoubleValue", "DoubleValue"); + private static final Set BOOL_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_BoolValue", "BoolValue"); + private static final Set INT32_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_Int32Value", "Int32Value"); + private static final Set INT64_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_Int64Value", "Int64Value"); + private static final Set UINT32_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_UInt32Value", "UInt32Value"); + private static final Set UINT64_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_UInt64Value", "UInt64Value"); + private static final Set BYTES_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_BytesValue", "BytesValue"); + private static final Set TIMESTAMP_VALUE_DESCRIPTOR_NAMES = + ImmutableSet.of("google_protobuf_Timestamp", "Timestamp"); + + // Translate a proto message value into a JSON value. If useSetF==false, this will fail with + // Optional.empty() if any fields named "f" are found (due to restrictions on the TableRow + // class). In that case, the top level will retry with useSetF==true. We fall back this way in + // order to maintain backwards compatibility with existing users. public static Object jsonValueFromMessageValue( + SchemaInformation schemaInformation, FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated, Predicate includeField, - String prefix) { + String prefix, + boolean useSetF) { if (expandRepeated && fieldDescriptor.isRepeated()) { List valueList = (List) fieldValue; - return valueList.stream() - .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix)) - .collect(toList()); + List expanded = Lists.newArrayListWithCapacity(valueList.size()); + for (Object value : valueList) { + Object translatedValue = + jsonValueFromMessageValue( + schemaInformation, fieldDescriptor, value, false, includeField, prefix, useSetF); + if (!useSetF && translatedValue instanceof Optional) { + Optional optional = (Optional) translatedValue; + if (!optional.isPresent()) { + // A nested element contained an "f" column. Fail the call. + return Optional.empty(); + } + translatedValue = optional.get(); + } + expanded.add(translatedValue); + } + return expanded; } - switch (fieldDescriptor.getType()) { - case GROUP: - case MESSAGE: - return tableRowFromMessage((Message) fieldValue, false, includeField, prefix); - case BYTES: - return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray()); - case ENUM: - throw new RuntimeException("Enumerations not supported"); - case INT32: - case FLOAT: - case BOOL: + // BigQueryIO supports direct proto writes - i.e. we allow the user to pass in their own proto + // and skip our conversion layer, as long as the proto conforms to the types supported by the + // BigQuery Storage Write API. For many schema types, the Storage Write API supports different + // proto field types (often with different encodings), so the mapping of schema type -> proto + // type is one-to-many. To read the data out of the proto, we need to examine both the schema + // type and the proto field type.
+ switch (schemaInformation.getType()) { case DOUBLE: + switch (fieldDescriptor.getType()) { + case FLOAT: + case DOUBLE: + case STRING: + return DECIMAL_FORMAT.format(Double.parseDouble(fieldValue.toString())); + case MESSAGE: + // Handle the various number wrapper types. + Message doubleMessage = (Message) fieldValue; + if (FLOAT_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) { + float floatValue = + (float) + doubleMessage.getField( + doubleMessage.getDescriptorForType().findFieldByName("value")); + + return DECIMAL_FORMAT.format(floatValue); + } else if (DOUBLE_VALUE_DESCRIPTOR_NAMES.contains( + fieldDescriptor.getMessageType().getName())) { + double doubleValue = + (double) + doubleMessage.getField( + doubleMessage.getDescriptorForType().findFieldByName("value")); + return DECIMAL_FORMAT.format(doubleValue); + } else { + throw new RuntimeException( + "Not implemented yet " + fieldDescriptor.getMessageType().getName()); + } + default: + return fieldValue.toString(); + } + case BOOL: + // Wrapper type. + if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) { + Message boolMessage = (Message) fieldValue; + if (BOOL_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) { + return boolMessage + .getField(boolMessage.getDescriptorForType().findFieldByName("value")) + .toString(); + } else { + throw new RuntimeException( + "Not implemented yet " + fieldDescriptor.getMessageType().getName()); + } + } + return fieldValue.toString(); + case JSON: + case GEOGRAPHY: // The above types have native representations in JSON for all their // possible values. - return fieldValue; case STRING: + return fieldValue.toString(); case INT64: + switch (fieldDescriptor.getType()) { + case MESSAGE: + // Wrapper types. 
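+ // The four integer wrapper types below (Int32Value, Int64Value, UInt32Value, UInt64Value)
+ // are all handled the same way: read the single "value" field and render it as a decimal
+ // string, matching BigQuery's JSON encoding of INT64.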
+ Message message = (Message) fieldValue; + if (INT32_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) { + return message + .getField(message.getDescriptorForType().findFieldByName("value")) + .toString(); + } else if (INT64_VALUE_DESCRIPTOR_NAMES.contains( + fieldDescriptor.getMessageType().getName())) { + return message + .getField(message.getDescriptorForType().findFieldByName("value")) + .toString(); + } else if (UINT32_VALUE_DESCRIPTOR_NAMES.contains( + fieldDescriptor.getMessageType().getName())) { + return message + .getField(message.getDescriptorForType().findFieldByName("value")) + .toString(); + } else if (UINT64_VALUE_DESCRIPTOR_NAMES.contains( + fieldDescriptor.getMessageType().getName())) { + return message + .getField(message.getDescriptorForType().findFieldByName("value")) + .toString(); + } else { + throw new RuntimeException( + "Not implemented yet " + fieldDescriptor.getMessageType().getFullName()); + } + default: + return fieldValue.toString(); + } + case BYTES: + switch (fieldDescriptor.getType()) { + case BYTES: + return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray()); + case STRING: + return BaseEncoding.base64() + .encode(((String) fieldValue).getBytes(StandardCharsets.UTF_8)); + case MESSAGE: + Message message = (Message) fieldValue; + if (BYTES_VALUE_DESCRIPTOR_NAMES.contains(fieldDescriptor.getMessageType().getName())) { + ByteString byteString = + (ByteString) + message.getField(message.getDescriptorForType().findFieldByName("value")); + return BaseEncoding.base64().encode(byteString.toByteArray()); + } + throw new RuntimeException( + "Not implemented " + fieldDescriptor.getMessageType().getFullName()); + default: + return fieldValue.toString(); + } + case TIMESTAMP: + if (isProtoFieldTypeInteger(fieldDescriptor.getType())) { + long epochMicros = Long.valueOf(fieldValue.toString()); + long epochSeconds = epochMicros / 1_000_000L; + long nanoAdjustment = (epochMicros % 1_000_000L) * 1_000L; + Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment); + return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER); + } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) { + Message message = (Message) fieldValue; + if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains( + fieldDescriptor.getMessageType().getName())) { + Descriptor descriptor = message.getDescriptorForType(); + long seconds = (long) message.getField(descriptor.findFieldByName("seconds")); + int nanos = (int) message.getField(descriptor.findFieldByName("nanos")); + Instant instant = Instant.ofEpochSecond(seconds, nanos); + return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER); + } else { + throw new RuntimeException( + "Not implemented yet " + fieldDescriptor.getMessageType().getFullName()); + } + } else { + return fieldValue.toString(); + } + + case DATE: + if (isProtoFieldTypeInteger(fieldDescriptor.getType())) { + int intDate = Integer.parseInt(fieldValue.toString()); + return LocalDate.ofEpochDay(intDate).toString(); + } else { + return fieldValue.toString(); + } + case NUMERIC: + switch (fieldDescriptor.getType()) { + case BYTES: + ByteString numericByteString = (ByteString) fieldValue; + return BigDecimalByteStringEncoder.decodeNumericByteString(numericByteString) + .stripTrailingZeros() + .toString(); + default: + return fieldValue.toString(); + } + case BIGNUMERIC: + switch (fieldDescriptor.getType()) { + case BYTES: + ByteString numericByteString = (ByteString) 
fieldValue; + return BigDecimalByteStringEncoder.decodeBigNumericByteString(numericByteString) + .stripTrailingZeros() + .toString(); + default: + return fieldValue.toString(); + } + + case DATETIME: + if (isProtoFieldTypeInteger(fieldDescriptor.getType())) { + long packedDateTime = Long.valueOf(fieldValue.toString()); + return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(packedDateTime) + .format(BigQueryUtils.BIGQUERY_DATETIME_FORMATTER); + } else { + return fieldValue.toString(); + } + + case TIME: + if (isProtoFieldTypeInteger(fieldDescriptor.getType())) { + long packedTime = Long.valueOf(fieldValue.toString()); + return CivilTimeEncoder.decodePacked64TimeMicrosAsJavaTime(packedTime).toString(); + } else { + return fieldValue.toString(); + } + case STRUCT: + return useSetF + ? tableRowFromMessageUseSetF( + schemaInformation, (Message) fieldValue, false, includeField, prefix) + : tableRowFromMessageNoF( + schemaInformation, (Message) fieldValue, false, includeField, prefix); default: - // The above types must be cast to string to be safely encoded in - // JSON (due to JSON's float-based representation of all numbers). return fieldValue.toString(); } } + + private static boolean isProtoFieldTypeInteger(FieldDescriptor.Type type) { + switch (type) { + case INT32: + case INT64: + case UINT32: + case UINT64: + case SFIXED32: + case SFIXED64: + case SINT64: + return true; + default: + return false; + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 77fc7cab0245..3c0ea08c033b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -613,6 +613,7 @@ public StreamAppendClient getStreamAppendClient( private Descriptor protoDescriptor; private TableSchema currentSchema; private @Nullable com.google.cloud.bigquery.storage.v1.TableSchema updatedSchema; + TableRowToStorageApiProto.SchemaInformation schemaInformation; private boolean usedForInsert = false; private boolean usedForUpdate = false; @@ -627,6 +628,9 @@ public StreamAppendClient getStreamAppendClient( throw new ApiException(null, GrpcStatusCode.of(Status.Code.NOT_FOUND), false); } currentSchema = stream.tableContainer.getTable().getSchema(); + schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema( + TableRowToStorageApiProto.schemaToProtoTableSchema(currentSchema)); } } @@ -650,6 +654,7 @@ public ApiFuture appendRows(long offset, ProtoRows rows) } TableRow tableRow = TableRowToStorageApiProto.tableRowFromMessage( + schemaInformation, DynamicMessage.parseFrom(protoDescriptor, bytes), false, Predicates.alwaysTrue()); @@ -698,6 +703,8 @@ public ApiFuture appendRows(long offset, ProtoRows rows) responseBuilder.setUpdatedSchema(newSchema); if (this.updatedSchema == null) { this.updatedSchema = newSchema; + this.schemaInformation = + TableRowToStorageApiProto.SchemaInformation.fromTableSchema((this.updatedSchema)); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index f943b60118d2..a5d6ac68ce66 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; +import static org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.TYPE_MAP_PROTO_CONVERTERS; import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE; import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; @@ -59,8 +60,17 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.Exceptions; import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.DoubleValue; +import com.google.protobuf.FloatValue; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Timestamp; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; +import com.google.protobuf.util.Timestamps; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -1288,7 +1298,7 @@ public void runTestWriteAvro(boolean schemaFromView) throws Exception { "CreateTableSchemaString", Create.of(KV.of(tableName, BigQueryHelpers.toJsonString(tableSchema)))) .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) - .apply(View.asMap())); + .apply(View.asMap())); } else { bqWrite = bqWrite.withSchema(tableSchema); } @@ -1302,34 +1312,46 @@ public void runTestWriteAvro(boolean schemaFromView) throws Exception { p.run(); + // Convert values string before comparing. + List allRows = + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map( + (TableRow tr) -> { + Map stringed = + tr.entrySet().stream() + .collect( + Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); + + TableRow tableRow = new TableRow(); + tableRow.putAll(stringed); + return tableRow; + }) + .collect(Collectors.toList()); assertThat( - fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), + allRows, containsInAnyOrder( new TableRow() .set("strval", "test") .set("longval", "1") - .set("doubleval", 1.0) + .set("doubleval", "1.0") .set( "instantval", useStorageApi || useStorageApiApproximate - ? String.valueOf(Instant.parse("2019-01-01T00:00:00Z").getMillis() * 1000) + ? "2019-01-01 T00:00:00" : "2019-01-01 00:00:00 UTC"), new TableRow() .set("strval", "test2") .set("longval", "2") - .set("doubleval", 2.0) + .set("doubleval", "2.0") .set( "instantval", useStorageApi || useStorageApiApproximate - ? String.valueOf(Instant.parse("2019-02-01T00:00:00Z").getMillis() * 1000) + ? 
"2019-02-01 T00:00:00" : "2019-02-01 00:00:00 UTC"))); } @Test public void testWriteAvro() throws Exception { - // only streaming inserts don't support avro types - assumeTrue(!useStreaming); - runTestWriteAvro(false); } @@ -2843,7 +2865,10 @@ private void testWritePartition( multiPartitionsTag, singlePartitionTag, RowWriterFactory.tableRows( - SerializableFunctions.identity(), SerializableFunctions.identity())); + BigQueryIO.TableRowFormatFunction.fromSerializableFunction( + SerializableFunctions.identity()), + BigQueryIO.TableRowFormatFunction.fromSerializableFunction( + SerializableFunctions.identity()))); DoFnTester< Iterable>, @@ -3284,9 +3309,9 @@ public void testStorageApiErrorsWriteProto() throws Exception { Function getPrimitiveRow = (Integer i) -> new TableRow() - .set("primitive_double", Double.valueOf(i)) - .set("primitive_float", Float.valueOf(i).doubleValue()) - .set("primitive_int32", i.intValue()) + .set("primitive_double", TableRowToStorageApiProto.DECIMAL_FORMAT.format(i)) + .set("primitive_float", TableRowToStorageApiProto.DECIMAL_FORMAT.format(i)) + .set("primitive_int32", i.toString()) .set("primitive_int64", i.toString()) .set("primitive_uint32", i.toString()) .set("primitive_uint64", i.toString()) @@ -3294,7 +3319,7 @@ public void testStorageApiErrorsWriteProto() throws Exception { .set("primitive_sint64", i.toString()) .set("primitive_fixed32", i.toString()) .set("primitive_fixed64", i.toString()) - .set("primitive_bool", true) + .set("primitive_bool", "true") .set("primitive_string", i.toString()) .set( "primitive_bytes", @@ -3307,7 +3332,7 @@ public void testStorageApiErrorsWriteProto() throws Exception { (Function & Serializable) tr -> tr.containsKey("primitive_int32") - && (Integer) tr.get("primitive_int32") >= failFrom; + && Integer.parseInt((String) tr.get("primitive_int32")) >= failFrom; fakeDatasetService.setShouldFailRow(shouldFailRow); SerializableFunction formatRecordOnFailureFunction = @@ -3566,7 +3591,14 @@ public void testStorageApiErrorsWriteTableRows() throws Exception { TableSchema subSchema = new TableSchema() .setFields( - ImmutableList.of(new TableFieldSchema().setName("number").setType("INTEGER"))); + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"), + new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"), + new TableFieldSchema().setName("time").setType("TIME"), + new TableFieldSchema().setName("datetime").setType("DATETIME"), + new TableFieldSchema().setName("date").setType("DATE"), + new TableFieldSchema().setName("numeric").setType("NUMERIC"), + new TableFieldSchema().setName("bignumeric").setType("BIGNUMERIC"))); TableSchema tableSchema = new TableSchema() @@ -3582,10 +3614,19 @@ public void testStorageApiErrorsWriteTableRows() throws Exception { .setType("RECORD") .setFields(subSchema.getFields()))); - TableRow goodNested = new TableRow().set("number", "42"); + TableRow goodNested = + new TableRow() + .set("number", "42") + .set("timestamp", "1970-01-01 T00:00:00.000043") + .set("time", "00:52:07.123456") + .set("datetime", "2019-08-16T00:52:07.123456") + .set("date", "2019-08-16") + .set("numeric", "23.4") + .set("bignumeric", "123456789012345678"); TableRow badNested = new TableRow().set("number", "nAn"); final String failValue = "failme"; + List goodRows = ImmutableList.of( new TableRow().set("name", "n1").set("number", "1"), @@ -3593,6 +3634,7 @@ public void testStorageApiErrorsWriteTableRows() throws Exception { new TableRow().set("name", "n2").set("number", "2"), new TableRow().set("name", 
failValue).set("number", "2"), new TableRow().set("name", "parent1").set("nested", goodNested), + new TableRow().set("name", failValue).set("number", "2").set("nested", goodNested), new TableRow().set("name", failValue).set("number", "1")); List badRows = ImmutableList.of( @@ -3625,22 +3667,6 @@ public void testStorageApiErrorsWriteTableRows() throws Exception { tr -> tr.containsKey("name") && tr.get("name").equals(failValue); fakeDatasetService.setShouldFailRow(shouldFailRow); - SerializableFunction formatRecordOnFailureFunction = - input -> { - TableRow failedTableRow = new TableRow().set("testFailureFunctionField", "testValue"); - if (input != null) { - Object name = input.get("name"); - if (name != null) { - failedTableRow.set("name", name); - } - Object number = input.get("number"); - if (number != null) { - failedTableRow.set("number", number); - } - } - return failedTableRow; - }; - WriteResult result = p.apply(Create.of(Iterables.concat(goodRows, badRows))) .apply( @@ -3652,7 +3678,6 @@ public void testStorageApiErrorsWriteTableRows() throws Exception { .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) .withPropagateSuccessfulStorageApiWrites(true) .withTestServices(fakeBqServices) - .withFormatRecordOnFailureFunction(formatRecordOnFailureFunction) .withoutValidation()); PCollection deadRows = @@ -3663,13 +3688,10 @@ public void testStorageApiErrorsWriteTableRows() throws Exception { .via(BigQueryStorageApiInsertError::getRow)); PCollection successfulRows = result.getSuccessfulStorageApiInserts(); - List expectedFailedRows = - badRows.stream().map(formatRecordOnFailureFunction::apply).collect(Collectors.toList()); + List expectedFailedRows = Lists.newArrayList(badRows); expectedFailedRows.addAll( - goodRows.stream() - .filter(shouldFailRow::apply) - .map(formatRecordOnFailureFunction::apply) - .collect(Collectors.toList())); + goodRows.stream().filter(shouldFailRow::apply).collect(Collectors.toList())); + PAssert.that(deadRows).containsInAnyOrder(expectedFailedRows); PAssert.that(successfulRows) .containsInAnyOrder( @@ -4029,9 +4051,9 @@ public void testWriteProtos() throws Exception { Function getPrimitiveRow = (Integer i) -> new TableRow() - .set("primitive_double", Double.valueOf(i)) - .set("primitive_float", Float.valueOf(i).doubleValue()) - .set("primitive_int32", i.intValue()) + .set("primitive_double", TableRowToStorageApiProto.DECIMAL_FORMAT.format(i)) + .set("primitive_float", TableRowToStorageApiProto.DECIMAL_FORMAT.format(i)) + .set("primitive_int32", i.toString()) .set("primitive_int64", i.toString()) .set("primitive_uint32", i.toString()) .set("primitive_uint64", i.toString()) @@ -4039,7 +4061,7 @@ public void testWriteProtos() throws Exception { .set("primitive_sint64", i.toString()) .set("primitive_fixed32", i.toString()) .set("primitive_fixed64", i.toString()) - .set("primitive_bool", true) + .set("primitive_bool", "true") .set("primitive_string", i.toString()) .set( "primitive_bytes", @@ -4098,6 +4120,440 @@ public void testWriteProtos() throws Exception { assertThat(allRows, containsInAnyOrder(Iterables.toArray(expectedItems, TableRow.class))); } + // XXX Test string fields + // Test date numeric field + @Test + public void testWriteProtosEncodedValuesDirectWrite() throws Exception { + testWriteProtosEncodedValues(true); + } + + @Test + public void testWriteProtosEncodedValuesNoDirectWrite() throws Exception { + testWriteProtosEncodedValues(false); + } + + public void testWriteProtosEncodedValues(boolean directWrite) throws Exception { + 
assumeTrue(useStorageApi); + + BigQueryIO.Write.Method method = + useStreaming + ? (useStorageApi + ? (useStorageApiApproximate + ? Method.STORAGE_API_AT_LEAST_ONCE + : Method.STORAGE_WRITE_API) + : Method.STREAMING_INSERTS) + : useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS; + + final TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("encoded_timestamp").setType("TIMESTAMP"), + new TableFieldSchema().setName("encoded_date").setType("DATE"), + new TableFieldSchema().setName("encoded_numeric").setType("NUMERIC"), + new TableFieldSchema().setName("encoded_bignumeric").setType("BIGNUMERIC"), + new TableFieldSchema().setName("encoded_packed_datetime").setType("DATETIME"), + new TableFieldSchema().setName("encoded_packed_time").setType("TIME"))); + final TableSchema nestedSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("nested") + .setType("STRUCT") + .setFields(tableSchema.getFields()), + new TableFieldSchema() + .setName("nested_list") + .setType("STRUCT") + .setMode("REPEATED") + .setFields(tableSchema.getFields()))); + + final String timestamp = "1970-01-01 T00:00:00.000043"; + final String date = "2019-08-16"; + final String numeric = "23"; + final String bignumeric = "123456789012345678"; + final String datetime = "2019-08-16T00:52:07.123456"; + final String time = "00:52:07.123456"; + + Function getPrimitive = + (Integer i) -> { + try { + return Proto3SchemaMessages.PrimitiveEncodedFields.newBuilder() + .setEncodedTimestamp( + (long) + TYPE_MAP_PROTO_CONVERTERS + .get( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type + .TIMESTAMP) + .apply("", timestamp)) + .setEncodedDate( + (int) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DATE) + .apply("", date)) + .setEncodedNumeric( + (ByteString) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.NUMERIC) + .apply("", numeric)) + .setEncodedBignumeric( + (ByteString) + TYPE_MAP_PROTO_CONVERTERS + .get( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type + .BIGNUMERIC) + .apply("", bignumeric)) + .setEncodedPackedDatetime( + (long) + TYPE_MAP_PROTO_CONVERTERS + .get( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DATETIME) + .apply("", datetime)) + .setEncodedPackedTime( + (long) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIME) + .apply("", time)) + .build(); + } catch (TableRowToStorageApiProto.SchemaConversionException e) { + throw new RuntimeException(e); + } + }; + + Function getPrimitiveRow = + (Integer i) -> + new TableRow() + .set("encoded_timestamp", timestamp) + .set("encoded_date", date) + .set("encoded_numeric", numeric) + .set("encoded_bignumeric", bignumeric) + .set("encoded_packed_datetime", datetime) + .set("encoded_packed_time", time); + + List nestedItems = + Lists.newArrayList(getPrimitive.apply(1), getPrimitive.apply(2), getPrimitive.apply(3)); + + Iterable items = + nestedItems.stream() + .map( + p -> + Proto3SchemaMessages.NestedEncodedFields.newBuilder() + .setNested(p) + .addAllNestedList(Lists.newArrayList(p, p, p)) + .build()) + .collect(Collectors.toList()); + + List expectedNestedTableRows = + Lists.newArrayList( + getPrimitiveRow.apply(1), getPrimitiveRow.apply(2), getPrimitiveRow.apply(3)); + Iterable expectedItems = + expectedNestedTableRows.stream() + .map( + p -> + new TableRow().set("nested", p).set("nested_list", 
Lists.newArrayList(p, p, p))) + .collect(Collectors.toList()); + + BigQueryIO.Write write = + BigQueryIO.writeProtos(Proto3SchemaMessages.NestedEncodedFields.class) + .to("dataset-id.table-id") + .withSchema(nestedSchema) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withMethod(method) + .withoutValidation() + .withDirectWriteProtos(directWrite) + .withTestServices(fakeBqServices); + + p.apply(Create.of(items)).apply("WriteToBQ", write); + p.run(); + + // Round trip through the coder to make sure the types match our expected types. + List allRows = + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map( + tr -> { + try { + byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), tr); + return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertThat(allRows, containsInAnyOrder(Iterables.toArray(expectedItems, TableRow.class))); + } + + @Test + public void testWriteProtosUnEncodedValuesDirectWrite() throws Exception { + testWriteProtosUnEncodedValues(true); + } + + @Test + public void testWriteProtosUnEncodedValuesNoDirectWrite() throws Exception { + testWriteProtosUnEncodedValues(false); + } + + public void testWriteProtosUnEncodedValues(boolean directWrite) throws Exception { + BigQueryIO.Write.Method method = + useStreaming + ? (useStorageApi + ? (useStorageApiApproximate + ? Method.STORAGE_API_AT_LEAST_ONCE + : Method.STORAGE_WRITE_API) + : Method.STREAMING_INSERTS) + : useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS; + + final TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"), + new TableFieldSchema().setName("date").setType("DATE"), + new TableFieldSchema().setName("numeric").setType("NUMERIC"), + new TableFieldSchema().setName("bignumeric").setType("BIGNUMERIC"), + new TableFieldSchema().setName("datetime").setType("DATETIME"), + new TableFieldSchema().setName("time").setType("TIME"))); + final TableSchema nestedSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("nested") + .setType("STRUCT") + .setFields(tableSchema.getFields()), + new TableFieldSchema() + .setName("nested_list") + .setType("STRUCT") + .setMode("REPEATED") + .setFields(tableSchema.getFields()))); + + final String timestamp = "1970-01-01 T00:00:00.000043"; + final String date = "2019-08-16"; + final String numeric = "23"; + final String bignumeric = "123456789012345678"; + final String datetime = "2019-08-16T00:52:07.123456"; + final String time = "00:52:07.123456"; + + Function getPrimitive = + (Integer i) -> { + return Proto3SchemaMessages.PrimitiveUnEncodedFields.newBuilder() + .setTimestamp(timestamp) + .setDate(date) + .setNumeric(numeric) + .setBignumeric(bignumeric) + .setDatetime(datetime) + .setTime(time) + .build(); + }; + + Function getPrimitiveRow = + (Integer i) -> + new TableRow() + .set("timestamp", timestamp) + .set("date", date) + .set("numeric", numeric) + .set("bignumeric", bignumeric) + .set("datetime", datetime) + .set("time", time); + + List nestedItems = + Lists.newArrayList(getPrimitive.apply(1), getPrimitive.apply(2), getPrimitive.apply(3)); + + Iterable items = + nestedItems.stream() + .map( + p -> + Proto3SchemaMessages.NestedUnEncodedFields.newBuilder() + .setNested(p) + .addAllNestedList(Lists.newArrayList(p, p, p)) + .build()) + 
.collect(Collectors.toList()); + + List expectedNestedTableRows = + Lists.newArrayList( + getPrimitiveRow.apply(1), getPrimitiveRow.apply(2), getPrimitiveRow.apply(3)); + Iterable expectedItems = + expectedNestedTableRows.stream() + .map( + p -> + new TableRow().set("nested", p).set("nested_list", Lists.newArrayList(p, p, p))) + .collect(Collectors.toList()); + + BigQueryIO.Write write = + BigQueryIO.writeProtos(Proto3SchemaMessages.NestedUnEncodedFields.class) + .to("dataset-id.table-id") + .withSchema(nestedSchema) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withMethod(method) + .withoutValidation() + .withDirectWriteProtos(directWrite) + .withTestServices(fakeBqServices); + + p.apply(Create.of(items)).apply("WriteToBQ", write); + p.run(); + + // Round trip through the coder to make sure the types match our expected types. + List allRows = + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map( + tr -> { + try { + byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), tr); + return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertThat(allRows, containsInAnyOrder(Iterables.toArray(expectedItems, TableRow.class))); + } + + @Test + public void testWriteProtosWrappedValuesDirectWrite() throws Exception { + testWriteProtosWrappedValues(true); + } + + @Test + public void testWriteProtosWrappedValuesNoDirectWrite() throws Exception { + testWriteProtosWrappedValues(false); + } + + public void testWriteProtosWrappedValues(boolean directWrite) throws Exception { + assumeTrue(useStorageApi); + BigQueryIO.Write.Method method = + useStreaming + ? (useStorageApi + ? (useStorageApiApproximate + ? Method.STORAGE_API_AT_LEAST_ONCE + : Method.STORAGE_WRITE_API) + : Method.STREAMING_INSERTS) + : useStorageApi ? 
Method.STORAGE_WRITE_API : Method.FILE_LOADS; + + final TableSchema tableSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("float").setType("FLOAT"), + new TableFieldSchema().setName("double").setType("FLOAT"), + new TableFieldSchema().setName("bool").setType("BOOL"), + new TableFieldSchema().setName("int32").setType("INTEGER"), + new TableFieldSchema().setName("int64").setType("INT64"), + new TableFieldSchema().setName("uint32").setType("INTEGER"), + new TableFieldSchema().setName("uint64").setType("INT64"), + new TableFieldSchema().setName("bytes").setType("BYTES"), + new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"))); + + final TableSchema nestedSchema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("nested") + .setType("STRUCT") + .setFields(tableSchema.getFields()), + new TableFieldSchema() + .setName("nested_list") + .setType("STRUCT") + .setMode("REPEATED") + .setFields(tableSchema.getFields()))); + + final String timestamp = "1970-01-01 T00:00:00.000043"; + long timestampMicros = + (long) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) + .apply("", timestamp); + + final FloatValue floatValue = FloatValue.newBuilder().setValue(42.4F).build(); + final DoubleValue doubleValue = DoubleValue.newBuilder().setValue(3.14D).build(); + final BoolValue boolValue = BoolValue.newBuilder().setValue(true).build(); + final Int32Value int32Value = Int32Value.newBuilder().setValue(1234).build(); + final Int64Value int64Value = Int64Value.newBuilder().setValue(12345L).build(); + final UInt32Value uint32Value = UInt32Value.newBuilder().setValue(345).build(); + final UInt64Value uint64Value = UInt64Value.newBuilder().setValue(34567L).build(); + final Timestamp timestampValue = Timestamps.fromMicros(timestampMicros); + + Function getPrimitive = + (Integer i) -> { + return Proto3SchemaMessages.WrapperUnEncodedFields.newBuilder() + .setFloat(floatValue) + .setDouble(doubleValue) + .setBool(boolValue) + .setInt32(int32Value) + .setInt64(int64Value) + .setUint32(uint32Value) + .setUint64(uint64Value) + .setTimestamp(timestampValue) + .build(); + }; + + Function getPrimitiveRow = + (Integer i) -> + new TableRow() + .set( + "float", TableRowToStorageApiProto.DECIMAL_FORMAT.format(floatValue.getValue())) + .set( + "double", + TableRowToStorageApiProto.DECIMAL_FORMAT.format(doubleValue.getValue())) + .set("bool", Boolean.toString(boolValue.getValue())) + .set("int32", Integer.toString(int32Value.getValue())) + .set("int64", Long.toString(int64Value.getValue())) + .set("uint32", Integer.toString(uint32Value.getValue())) + .set("uint64", Long.toString(uint64Value.getValue())) + .set("timestamp", timestamp); + ; + + List nestedItems = + Lists.newArrayList(getPrimitive.apply(1), getPrimitive.apply(2), getPrimitive.apply(3)); + + Iterable items = + nestedItems.stream() + .map( + p -> + Proto3SchemaMessages.NestedWrapperUnEncodedFields.newBuilder() + .setNested(p) + .addAllNestedList(Lists.newArrayList(p, p, p)) + .build()) + .collect(Collectors.toList()); + + List expectedNestedTableRows = + Lists.newArrayList( + getPrimitiveRow.apply(1), getPrimitiveRow.apply(2), getPrimitiveRow.apply(3)); + Iterable expectedItems = + expectedNestedTableRows.stream() + .map( + p -> + new TableRow().set("nested", p).set("nested_list", Lists.newArrayList(p, p, p))) + .collect(Collectors.toList()); + + BigQueryIO.Write write = + 
BigQueryIO.writeProtos(Proto3SchemaMessages.NestedWrapperUnEncodedFields.class) + .to("dataset-id.table-id") + .withSchema(nestedSchema) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withMethod(method) + .withoutValidation() + .withDirectWriteProtos(directWrite) + .withTestServices(fakeBqServices); + + p.apply(Create.of(items)).apply("WriteToBQ", write); + p.run(); + + // Round trip through the coder to make sure the types match our expected types. + List allRows = + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map( + tr -> { + try { + byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), tr); + return CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertThat(allRows, containsInAnyOrder(Iterables.toArray(expectedItems, TableRow.class))); + } + @Test public void testUpsertAndDeleteTableRows() throws Exception { assumeTrue(useStorageApi); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java index 1ae691cb7e99..aedba31f62fa 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.TYPE_MAP_PROTO_CONVERTERS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -25,6 +26,8 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; import java.io.IOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; @@ -37,6 +40,7 @@ import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.protobuf.Proto3SchemaMessages; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -92,6 +96,29 @@ public class TableRowToStorageApiProtoIT { new TableFieldSchema().setType("STRING").setName("123_IllegalProtoFieldName")) .build()); + private static final TableSchema PROTO_ENCODED_TABLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("encoded_timestamp").setType("TIMESTAMP"), + new TableFieldSchema().setName("encoded_date").setType("DATE"), + new TableFieldSchema().setName("encoded_numeric").setType("NUMERIC"), + new TableFieldSchema().setName("encoded_bignumeric").setType("BIGNUMERIC"), + new TableFieldSchema().setName("encoded_packed_datetime").setType("DATETIME"), + new TableFieldSchema().setName("encoded_packed_time").setType("TIME"))); + + private static final TableSchema PROTO_UNENCODED_TABLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"), + new TableFieldSchema().setName("date").setType("DATE"), + new 
TableFieldSchema().setName("numeric").setType("NUMERIC"), + new TableFieldSchema().setName("bignumeric").setType("BIGNUMERIC"), + new TableFieldSchema().setName("datetime").setType("DATETIME"), + new TableFieldSchema().setName("time").setType("TIME"), + new TableFieldSchema().setName("bytes").setType("BYTES"))); + private static final List REPEATED_BYTES = ImmutableList.of( BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)), @@ -395,6 +422,135 @@ public void testNestedRichTypesAndNull() throws IOException, InterruptedExceptio assertNull(actualTableRows.get(0).get("nestedValue3")); } + @Test + public void testWriteProtosEncodedTypes() + throws IOException, InterruptedException, + TableRowToStorageApiProto.SchemaConversionException { + String tableSpec = createTable(PROTO_ENCODED_TABLE_SCHEMA); + final String timestamp = "1970-01-01T00:00:00.000043"; + final String date = "2019-08-16"; + final String numeric = "23"; + final String bignumeric = "123456789012345678"; + final String datetime = "2019-08-16T00:52:07.123456"; + final String time = "00:52:07.123456"; + + final Proto3SchemaMessages.PrimitiveEncodedFields baseRow = + Proto3SchemaMessages.PrimitiveEncodedFields.newBuilder() + .setEncodedTimestamp( + (long) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) + .apply("", timestamp)) + .setEncodedDate( + (int) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DATE) + .apply("", date)) + .setEncodedNumeric( + (ByteString) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.NUMERIC) + .apply("", numeric)) + .setEncodedBignumeric( + (ByteString) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.BIGNUMERIC) + .apply("", bignumeric)) + .setEncodedPackedDatetime( + (long) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.DATETIME) + .apply("", datetime)) + .setEncodedPackedTime( + (long) + TYPE_MAP_PROTO_CONVERTERS + .get(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIME) + .apply("", time)) + .build(); + + TableRow expected = + new TableRow() + .set("encoded_timestamp", timestamp) + .set("encoded_date", date) + .set("encoded_numeric", numeric) + .set("encoded_bignumeric", bignumeric) + .set("encoded_packed_datetime", datetime) + .set("encoded_packed_time", time); + + runPipeline( + tableSpec, + Proto3SchemaMessages.PrimitiveEncodedFields.class, + PROTO_ENCODED_TABLE_SCHEMA, + Collections.singleton(baseRow)); + + final String timestampFormat = "\'%Y-%m-%dT%H:%M:%E6S\'"; + List actualTableRows = + BQ_CLIENT.queryUnflattened( + String.format( + "SELECT FORMAT_TIMESTAMP(%s, encoded_timestamp) AS encoded_timestamp, * EXCEPT(encoded_timestamp) " + + "FROM %s", + timestampFormat, tableSpec), + PROJECT, + true, + true, + bigQueryLocation); + + assertEquals(1, actualTableRows.size()); + assertEquals(expected, actualTableRows.get(0)); + } + + @Test + public void testWriteProtosStringTypes() + throws IOException, InterruptedException, + TableRowToStorageApiProto.SchemaConversionException { + String tableSpec = createTable(PROTO_UNENCODED_TABLE_SCHEMA); + final String timestamp = "1970-01-01T00:00:00.000043"; + final String date = "2019-08-16"; + final String numeric = "23"; + final String bignumeric = "123456789012345678"; + final String datetime = "2019-08-16T00:52:07.123456"; + final String time = "00:52:07.123456"; + 
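+ // Unlike testWriteProtosEncodedTypes above, these proto fields are plain strings; the
+ // conversion layer parses them into the Storage Write API representations before writing.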
Proto3SchemaMessages.PrimitiveUnEncodedFields baseRow = + Proto3SchemaMessages.PrimitiveUnEncodedFields.newBuilder() + .setTimestamp(timestamp) + .setDate(date) + .setNumeric(numeric) + .setBignumeric(bignumeric) + .setDatetime(datetime) + .setTime(time) + .build(); + + TableRow expected = + new TableRow() + .set("timestamp", timestamp) + .set("date", date) + .set("numeric", numeric) + .set("bignumeric", bignumeric) + .set("datetime", datetime) + .set("time", time); + + runPipeline( + tableSpec, + Proto3SchemaMessages.PrimitiveUnEncodedFields.class, + PROTO_UNENCODED_TABLE_SCHEMA, + Collections.singleton(baseRow)); + + final String timestampFormat = "\'%Y-%m-%dT%H:%M:%E6S\'"; + List actualTableRows = + BQ_CLIENT.queryUnflattened( + String.format( + "SELECT FORMAT_TIMESTAMP(%s, timestamp) AS timestamp, * EXCEPT(timestamp) " + + "FROM %s", + timestampFormat, tableSpec), + PROJECT, + true, + true, + bigQueryLocation); + + assertEquals(1, actualTableRows.size()); + assertEquals(expected, actualTableRows.get(0)); + } + private static String createTable(TableSchema tableSchema) throws IOException, InterruptedException { String table = "table" + System.nanoTime(); @@ -424,4 +580,18 @@ private static void runPipeline(String tableSpec, Iterable tableRows) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)); p.run().waitUntilFinish(); } + + private static void runPipeline( + String tableSpec, Class protoClass, TableSchema tableSchema, Iterable tableRows) { + Pipeline p = Pipeline.create(); + p.apply("Create test cases", Create.of(tableRows)) + .apply( + "Write using Storage Write API", + BigQueryIO.writeProtos(protoClass) + .to(tableSpec) + .withSchema(tableSchema) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)); + p.run().waitUntilFinish(); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index 51c56bf53082..05f0e9c993c0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TIMESTAMP_FORMATTER; +import static org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.TYPE_MAP_PROTO_CONVERTERS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -43,16 +45,21 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaConversionException; import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaInformation; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -77,105 +84,105 @@ public class TableRowToStorageApiProtoTest { new TableSchema() .setFields( ImmutableList.builder() - .add(new TableFieldSchema().setType("STRING").setName("stringValue")) + .add(new TableFieldSchema().setType("STRING").setName("stringvalue")) .add(new TableFieldSchema().setType("STRING").setName("f")) - .add(new TableFieldSchema().setType("BYTES").setName("bytesValue")) - .add(new TableFieldSchema().setType("INT64").setName("int64Value")) - .add(new TableFieldSchema().setType("INTEGER").setName("intValue")) - .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value")) - .add(new TableFieldSchema().setType("FLOAT").setName("floatValue")) - .add(new TableFieldSchema().setType("BOOL").setName("boolValue")) - .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue")) - .add(new TableFieldSchema().setType("TIME").setName("timeValue")) - .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue")) - .add(new TableFieldSchema().setType("DATE").setName("dateValue")) - .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) - .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue")) - .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue2")) - .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue2")) + .add(new TableFieldSchema().setType("BYTES").setName("bytesvalue")) + .add(new TableFieldSchema().setType("INT64").setName("int64value")) + .add(new TableFieldSchema().setType("INTEGER").setName("intvalue")) + .add(new TableFieldSchema().setType("FLOAT64").setName("float64value")) + .add(new TableFieldSchema().setType("FLOAT").setName("floatvalue")) + .add(new TableFieldSchema().setType("BOOL").setName("boolvalue")) + .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanvalue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvalue")) + .add(new TableFieldSchema().setType("TIME").setName("timevalue")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimevalue")) + .add(new TableFieldSchema().setType("DATE").setName("datevalue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericvalue")) + .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bignumericvalue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericvalue2")) + .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bignumericvalue2")) .add( new TableFieldSchema() .setType("BYTES") .setMode("REPEATED") .setName("arrayValue")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampisovalue")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampISOValueOffsetHH")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace")) + .setName("timestampisovalueOffsethh")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluelong")) + .add(new 
TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluespace")) .add( - new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpaceUtc")) + new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluespaceutc")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampValueZoneRegion")) + .setName("timestampvaluezoneregion")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampValueSpaceMilli")) + .setName("timestampvaluespacemilli")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampValueSpaceTrailingZero")) - .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum")) + .setName("timestampvaluespacetrailingzero")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimevaluespace")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum")) .add( - new TableFieldSchema().setType("STRING").setName("123_IllegalProtoFieldName")) + new TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname")) .build()); private static final TableSchema BASE_TABLE_SCHEMA_NO_F = new TableSchema() .setFields( ImmutableList.builder() - .add(new TableFieldSchema().setType("STRING").setName("stringValue")) - .add(new TableFieldSchema().setType("BYTES").setName("bytesValue")) - .add(new TableFieldSchema().setType("INT64").setName("int64Value")) - .add(new TableFieldSchema().setType("INTEGER").setName("intValue")) - .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value")) - .add(new TableFieldSchema().setType("FLOAT").setName("floatValue")) - .add(new TableFieldSchema().setType("BOOL").setName("boolValue")) - .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue")) - .add(new TableFieldSchema().setType("TIME").setName("timeValue")) - .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue")) - .add(new TableFieldSchema().setType("DATE").setName("dateValue")) - .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue")) - .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue")) - .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue2")) - .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue2")) + .add(new TableFieldSchema().setType("STRING").setName("stringvalue")) + .add(new TableFieldSchema().setType("BYTES").setName("bytesvalue")) + .add(new TableFieldSchema().setType("INT64").setName("int64value")) + .add(new TableFieldSchema().setType("INTEGER").setName("intvalue")) + .add(new TableFieldSchema().setType("FLOAT64").setName("float64value")) + .add(new TableFieldSchema().setType("FLOAT").setName("floatvalue")) + .add(new TableFieldSchema().setType("BOOL").setName("boolvalue")) + .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanvalue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvalue")) + .add(new TableFieldSchema().setType("TIME").setName("timevalue")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimevalue")) + .add(new TableFieldSchema().setType("DATE").setName("datevalue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericvalue")) + .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bignumericvalue")) + .add(new TableFieldSchema().setType("NUMERIC").setName("numericvalue2")) + .add(new 
TableFieldSchema().setType("BIGNUMERIC").setName("bignumericvalue2")) .add( new TableFieldSchema() .setType("BYTES") .setMode("REPEATED") .setName("arrayValue")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampisovalue")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampISOValueOffsetHH")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace")) + .setName("timestampisovalueOffsethh")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluelong")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluespace")) .add( - new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpaceUtc")) + new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluespaceutc")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampValueZoneRegion")) + .setName("timestampvaluezoneregion")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampValueSpaceMilli")) + .setName("timestampvaluespacemilli")) .add( new TableFieldSchema() .setType("TIMESTAMP") - .setName("timestampValueSpaceTrailingZero")) - .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace")) - .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum")) + .setName("timestampvaluespacetrailingzero")) + .add(new TableFieldSchema().setType("DATETIME").setName("datetimevaluespace")) + .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum")) .add( - new TableFieldSchema().setType("STRING").setName("123_IllegalProtoFieldName")) + new TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname")) .build()); private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR = @@ -920,6 +927,30 @@ public class TableRowToStorageApiProtoTest { .setFields(BASE_TABLE_SCHEMA_NO_F.getFields())) .build()); + private static final TableSchema NESTED_TABLE_SCHEMA_NO_F = + new TableSchema() + .setFields( + ImmutableList.builder() + .add( + new TableFieldSchema() + .setType("STRUCT") + .setName("nestedvalue1") + .setMode("NULLABLE") + .setFields(BASE_TABLE_SCHEMA_NO_F.getFields())) + .add( + new TableFieldSchema() + .setType("RECORD") + .setName("nestedvalue2") + .setMode("NULLABLE") + .setFields(BASE_TABLE_SCHEMA_NO_F.getFields())) + .add( + new TableFieldSchema() + .setType("RECORD") + .setName("repeatedvalue") + .setMode("REPEATED") + .setFields(BASE_TABLE_SCHEMA_NO_F.getFields())) + .build()); + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @@ -1156,36 +1187,36 @@ public void testNestedFromTableSchema() throws Exception { private static final TableRow BASE_TABLE_ROW_NO_F = new TableRow() - .set("stringValue", "string") + .set("stringvalue", "string") .set( - "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) - .set("int64Value", "42") - .set("intValue", "43") - .set("float64Value", "2.8168") - .set("floatValue", "2") - .set("boolValue", "true") - .set("booleanValue", "true") + "bytesvalue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8))) + .set("int64value", "42") + .set("intvalue", "43") + .set("float64value", "2.8168") + .set("floatvalue", "2") + .set("boolvalue", "true") + .set("booleanvalue", "true") // UTC time - .set("timestampValue", 
"1970-01-01T00:00:00.000043Z") - .set("timeValue", "00:52:07.123456") - .set("datetimeValue", "2019-08-16T00:52:07.123456") - .set("dateValue", "2019-08-16") - .set("numericValue", "23.4") - .set("bigNumericValue", "2312345.4") - .set("numericValue2", 23) - .set("bigNumericValue2", 123456789012345678L) + .set("timestampvalue", "1970-01-01T00:00:00.000043Z") + .set("timevalue", "00:52:07.123456") + .set("datetimevalue", "2019-08-16T00:52:07.123456") + .set("datevalue", "2019-08-16") + .set("numericvalue", "23.4") + .set("bignumericvalue", "2312345.4") + .set("numericvalue2", 23) + .set("bignumericvalue2", 123456789012345678L) .set("arrayValue", REPEATED_BYTES) - .set("timestampISOValue", "1970-01-01T00:00:00.000+01:00") - .set("timestampISOValueOffsetHH", "1970-01-01T00:00:00.000+01") - .set("timestampValueLong", "1234567") + .set("timestampisovalue", "1970-01-01T00:00:00.000+01:00") + .set("timestampisovalueOffsethh", "1970-01-01T00:00:00.000+01") + .set("timestampvaluelong", "1234567") // UTC time for backwards compatibility - .set("timestampValueSpace", "1970-01-01 00:00:00.000343") - .set("timestampValueSpaceUtc", "1970-01-01 00:00:00.000343 UTC") - .set("timestampValueZoneRegion", "1970-01-01 00:00:00.123456 America/New_York") - .set("timestampValueSpaceMilli", "1970-01-01 00:00:00.123") - .set("timestampValueSpaceTrailingZero", "1970-01-01 00:00:00.1230") - .set("datetimeValueSpace", "2019-08-16 00:52:07.123456") - .set("timestampValueMaximum", "9999-12-31 23:59:59.999999Z") + .set("timestampvaluespace", "1970-01-01 00:00:00.000343") + .set("timestampvaluespaceutc", "1970-01-01 00:00:00.000343 UTC") + .set("timestampvaluezoneregion", "1970-01-01 00:00:00.123456 America/New_York") + .set("timestampvaluespacemilli", "1970-01-01 00:00:00.123") + .set("timestampvaluespacetrailingzero", "1970-01-01 00:00:00.1230") + .set("datetimevaluespace", "2019-08-16 00:52:07.123456") + .set("timestampvaluemaximum", "9999-12-31 23:59:59.999999Z") .set("123_illegalprotofieldname", "madeit"); private static final Map BASE_ROW_EXPECTED_PROTO_VALUES = @@ -1285,6 +1316,136 @@ public void testNestedFromTableSchema() throws Exception { BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"), "123_illegalprotofieldname"); + private TableRow normalizeTableRow( + TableRow row, SchemaInformation schemaInformation, boolean outputUsingF) throws Exception { + @Nullable Object fValue = row.get("f"); + if (fValue instanceof List) { + return normalizeTableRowF((List) fValue, schemaInformation, outputUsingF); + } else { + return normalizeTableRowNoF(row, schemaInformation, outputUsingF); + } + } + + private TableRow normalizeTableRowNoF( + TableRow row, SchemaInformation schemaInformation, boolean outputUsingF) throws Exception { + TableRow normalizedRow = new TableRow(); + if (outputUsingF) { + normalizedRow.setF(Lists.newArrayList()); + } + for (final Map.Entry entry : row.entrySet()) { + String key = entry.getKey().toLowerCase(); + SchemaInformation fieldSchemaInformation = + schemaInformation.getSchemaForField(entry.getKey()); + Object normalizedValue = + normalizeFieldValue(entry.getValue(), fieldSchemaInformation, outputUsingF); + if (outputUsingF) { + normalizedRow.getF().add(new TableCell().setV(normalizedValue)); + } else { + normalizedRow.set(key, normalizedValue); + } + } + return normalizedRow; + } + + private TableRow normalizeTableRowF( + List cells, SchemaInformation schemaInformation, boolean outputUsingF) + throws Exception { + TableRow normalizedRow = new TableRow(); + if 
+  private TableRow normalizeTableRowF(
+      List<TableCell> cells, SchemaInformation schemaInformation, boolean outputUsingF)
+      throws Exception {
+    TableRow normalizedRow = new TableRow();
+    if (outputUsingF) {
+      normalizedRow.setF(Lists.newArrayList());
+    }
+    for (int i = 0; i < cells.size(); i++) {
+      SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(i);
+      Object normalizedValue =
+          normalizeFieldValue(cells.get(i).getV(), fieldSchemaInformation, outputUsingF);
+      if (outputUsingF) {
+        normalizedRow.getF().add(new TableCell().setV(normalizedValue));
+      } else {
+        normalizedRow.set(fieldSchemaInformation.getName(), normalizedValue);
+      }
+    }
+    return normalizedRow;
+  }
+
+  private @Nullable Object normalizeFieldValue(
+      @Nullable Object value, SchemaInformation schemaInformation, boolean outputUsingF)
+      throws Exception {
+    if (value == null) {
+      return schemaInformation.isRepeated() ? Collections.emptyList() : null;
+    }
+    if (schemaInformation.isRepeated()) {
+      List<Object> list = (List<Object>) value;
+      List<Object> normalizedList = Lists.newArrayListWithCapacity(list.size());
+      for (@Nullable Object item : list) {
+        if (item != null) {
+          normalizedList.add(normalizeSingularField(schemaInformation, item, outputUsingF));
+        }
+      }
+      return normalizedList;
+    }
+
+    return normalizeSingularField(schemaInformation, value, outputUsingF);
+  }
+
+  private @Nullable Object normalizeSingularField(
+      SchemaInformation schemaInformation, Object value, boolean outputUsingF) throws Exception {
+    if (schemaInformation.getType()
+        == com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT) {
+      return normalizeTableRow((TableRow) value, schemaInformation, outputUsingF);
+    }
+    Object convertedValue =
+        TYPE_MAP_PROTO_CONVERTERS.get(schemaInformation.getType()).apply("", value);
+    switch (schemaInformation.getType()) {
+      case BOOL:
+      case JSON:
+      case GEOGRAPHY:
+      case STRING:
+      case INT64:
+        return convertedValue.toString();
+      case DOUBLE:
+        return TableRowToStorageApiProto.DECIMAL_FORMAT.format((double) convertedValue);
+      case BYTES:
+        ByteString byteString = (ByteString) convertedValue;
+        return BaseEncoding.base64().encode(byteString.toByteArray());
+      case TIMESTAMP:
+        long timestampLongValue = (long) convertedValue;
+        long epochSeconds = timestampLongValue / 1_000_000L;
+        long nanoAdjustment = (timestampLongValue % 1_000_000L) * 1_000L;
+        Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
+      case DATE:
+        int daysInt = (int) convertedValue;
+        return LocalDate.ofEpochDay(daysInt).toString();
+      case NUMERIC:
+        ByteString numericByteString = (ByteString) convertedValue;
+        return BigDecimalByteStringEncoder.decodeNumericByteString(numericByteString)
+            .stripTrailingZeros()
+            .toString();
+      case BIGNUMERIC:
+        ByteString bigNumericByteString = (ByteString) convertedValue;
+        return BigDecimalByteStringEncoder.decodeBigNumericByteString(bigNumericByteString)
+            .stripTrailingZeros()
+            .toString();
+      case DATETIME:
+        long packedDateTime = (long) convertedValue;
+        return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(packedDateTime)
+            .format(BigQueryUtils.BIGQUERY_DATETIME_FORMATTER);
+      case TIME:
+        long packedTime = (long) convertedValue;
+        return CivilTimeEncoder.decodePacked64TimeMicrosAsJavaTime(packedTime).toString();
+      default:
+        return value.toString();
+    }
+  }
+
+  private static long toEpochMicros(Instant timestamp) {
+    // e.g. 1970-01-01T00:01:01.000040Z: 61 * 1_000_000L + 40_000 / 1_000 = 61_000_040
+    return timestamp.getEpochSecond() * 1_000_000L + timestamp.getNano() / 1_000;
+  }
+
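(The TIMESTAMP branch above and toEpochMicros are inverses; concrete numbers make the micros/nanos bookkeeping easy to verify. A self-contained check using only java.time, with the values from the toEpochMicros comment:)

    // 1970-01-01T00:01:01.000040Z is 61 seconds and 40 microseconds past the epoch.
    Instant ts = Instant.parse("1970-01-01T00:01:01.000040Z");
    long micros = ts.getEpochSecond() * 1_000_000L + ts.getNano() / 1_000; // 61_000_040
    // Back again, exactly as in the TIMESTAMP case above:
    long epochSeconds = micros / 1_000_000L; // 61
    long nanoAdjustment = (micros % 1_000_000L) * 1_000L; // 40_000
    Instant roundTripped = Instant.ofEpochSecond(epochSeconds, nanoAdjustment); // equals ts
    // Instant.ofEpochSecond normalizes a negative nanoAdjustment, so pre-epoch
    // timestamps survive the same round trip.
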
   private void assertBaseRecord(DynamicMessage msg, boolean withF) {
     Map<String, Object> recordFields =
         msg.getAllFields().entrySet().stream()
@@ -1334,6 +1495,108 @@ public void testMessageFromTableRow() throws Exception {
     assertBaseRecord((DynamicMessage) msg.getField(fieldDescriptors.get("nestedvaluenof2")), false);
   }
 
+  @Test
+  public void testTableRowFromMessageNoF() throws Exception {
+    TableRow tableRow =
+        new TableRow()
+            .set("nestedvalue1", BASE_TABLE_ROW_NO_F)
+            .set("nestedvalue2", BASE_TABLE_ROW_NO_F)
+            .set("repeatedvalue", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F));
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(
+            NESTED_TABLE_SCHEMA_NO_F, true, false);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA_NO_F);
+    DynamicMessage msg =
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, tableRow, false, false, null, null, -1);
+    TableRow recovered =
+        TableRowToStorageApiProto.tableRowFromMessage(
+            schemaInformation, msg, true, Predicates.alwaysTrue());
+    TableRow expected = normalizeTableRow(tableRow, schemaInformation, false);
+    assertEquals(expected, recovered);
+  }
+
+  @Test
+  public void testTableRowFromMessageWithF() throws Exception {
+    final TableSchema nestedSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.<TableFieldSchema>builder()
+                    .add(
+                        new TableFieldSchema()
+                            .setType("STRUCT")
+                            .setName("nestedvalue1")
+                            .setMode("NULLABLE")
+                            .setFields(BASE_TABLE_SCHEMA.getFields()))
+                    .add(
+                        new TableFieldSchema()
+                            .setType("RECORD")
+                            .setName("nestedvalue2")
+                            .setMode("NULLABLE")
+                            .setFields(BASE_TABLE_SCHEMA.getFields()))
+                    .add(
+                        new TableFieldSchema()
+                            .setType("RECORD")
+                            .setName("repeatedvalue")
+                            .setMode("REPEATED")
+                            .setFields(BASE_TABLE_SCHEMA.getFields()))
+                    .build());
+
+    TableRow tableRow = new TableRow();
+    tableRow.setF(
+        Lists.newArrayList(
+            new TableCell().setV(BASE_TABLE_ROW),
+            new TableCell().setV(BASE_TABLE_ROW),
+            new TableCell().setV(ImmutableList.of(BASE_TABLE_ROW, BASE_TABLE_ROW))));
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(nestedSchema, true, false);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(nestedSchema);
+    DynamicMessage msg =
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, tableRow, false, false, null, null, -1);
+    TableRow recovered =
+        TableRowToStorageApiProto.tableRowFromMessage(
+            schemaInformation, msg, true, Predicates.alwaysTrue());
+    TableRow expected = normalizeTableRow(tableRow, schemaInformation, true);
+    assertEquals(expected, recovered);
+  }
+
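(Both tests above pass Predicates.alwaysTrue(), so every column survives the round trip. The final predicate argument can also filter the output; a hedged sketch, assuming the predicate is applied to top-level field names, since only the alwaysTrue() form is exercised in this file:)

    // Sketch: drop the timestamp columns while converting the message back,
    // given a schemaInformation and msg built as in the tests above.
    TableRow filtered =
        TableRowToStorageApiProto.tableRowFromMessage(
            schemaInformation, msg, true, field -> !field.startsWith("timestamp"));
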
+  @Test
+  public void testTableRowFromMessageWithNestedArrayF() throws Exception {
+    final TableSchema nestedSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.<TableFieldSchema>builder()
+                    .add(
+                        new TableFieldSchema()
+                            .setType("RECORD")
+                            .setName("repeatedvalue")
+                            .setMode("REPEATED")
+                            .setFields(BASE_TABLE_SCHEMA.getFields()))
+                    .build());
+
+    TableRow tableRow = new TableRow();
+    tableRow.setF(
+        Lists.newArrayList(
+            new TableCell().setV(ImmutableList.of(BASE_TABLE_ROW, BASE_TABLE_ROW))));
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(nestedSchema, true, false);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(nestedSchema);
+    DynamicMessage msg =
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, tableRow, false, false, null, null, -1);
+    TableRow recovered =
+        TableRowToStorageApiProto.tableRowFromMessage(
+            schemaInformation, msg, true, Predicates.alwaysTrue());
+    TableRow expected = normalizeTableRow(tableRow, schemaInformation, true);
+    assertEquals(expected, recovered);
+  }
+
   @Test
   public void testMessageWithFFromTableRow() throws Exception {
     Descriptor descriptor =