Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,51 @@ import "proto3_schema_options.proto";

option java_package = "org.apache.beam.sdk.extensions.protobuf";

// Message whose fields all carry an `encoded_` prefix and are declared with
// integer or bytes scalar types rather than strings.
// NOTE(review): presumably each field holds a BigQuery TIMESTAMP / DATE /
// NUMERIC / BIGNUMERIC / DATETIME / TIME value that is already in its encoded
// form -- confirm against the code that populates this message.
message PrimitiveEncodedFields {
int64 encoded_timestamp = 1;
int32 encoded_date = 2;
bytes encoded_numeric = 3;
bytes encoded_bignumeric = 4;
// NOTE(review): "packed" presumably refers to a packed civil-time integer
// encoding -- confirm.
int64 encoded_packed_datetime = 5;
int64 encoded_packed_time = 6;
}

// Embeds PrimitiveEncodedFields both as a singular sub-message and as a
// repeated sub-message.
message NestedEncodedFields {
PrimitiveEncodedFields nested = 1;
repeated PrimitiveEncodedFields nested_list = 2;
}

// String-typed counterpart of PrimitiveEncodedFields: the same six field
// numbers, with every value declared as a string.
// NOTE(review): presumably the strings hold human-readable timestamp / date /
// numeric / datetime / time literals -- confirm the accepted formats with the
// code that reads this message.
message PrimitiveUnEncodedFields {
string timestamp = 1;
string date = 2;
string numeric = 3;
string bignumeric = 4;
string datetime = 5;
string time = 6;
}

// Embeds PrimitiveUnEncodedFields both as a singular sub-message and as a
// repeated sub-message.
message NestedUnEncodedFields {
PrimitiveUnEncodedFields nested = 1;
repeated PrimitiveUnEncodedFields nested_list = 2;
}

// Message whose fields are all google.protobuf message types: the scalar
// wrapper types (FloatValue, DoubleValue, BoolValue, Int32Value, Int64Value,
// UInt32Value, UInt64Value, BytesValue) plus google.protobuf.Timestamp.
// Each field is named after the scalar kind it wraps.
message WrapperUnEncodedFields {
google.protobuf.FloatValue float = 1;
google.protobuf.DoubleValue double = 2;
google.protobuf.BoolValue bool = 3;
google.protobuf.Int32Value int32 = 4;
google.protobuf.Int64Value int64 = 5;
google.protobuf.UInt32Value uint32 = 6;
google.protobuf.UInt64Value uint64 = 7;
google.protobuf.BytesValue bytes = 8;
google.protobuf.Timestamp timestamp = 9;
}

// Embeds WrapperUnEncodedFields both as a singular sub-message and as a
// repeated sub-message.
message NestedWrapperUnEncodedFields {
WrapperUnEncodedFields nested = 1;
repeated WrapperUnEncodedFields nested_list = 2;
}

message Primitive {
double primitive_double = 1;
float primitive_float = 2;
Expand Down Expand Up @@ -287,4 +332,4 @@ message NoWrapPrimitive {
optional bool bool = 13;
optional string string = 14;
optional bytes bytes = 15;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public ByteString mergeNewFields(
public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField) {
try {
return TableRowToStorageApiProto.tableRowFromMessage(
getSchemaInformation(),
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.gson.JsonArray;
Expand Down Expand Up @@ -119,6 +120,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.SimpleFunction;
Expand Down Expand Up @@ -2297,10 +2299,79 @@ public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClas
if (DynamicMessage.class.equals(protoMessageClass)) {
throw new IllegalArgumentException("DynamicMessage is not supported.");
}
return BigQueryIO.<T>write()
.withFormatFunction(
m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue()))
.withWriteProtosClass(protoMessageClass);
try {
return BigQueryIO.<T>write()
.toBuilder()
.setFormatFunction(FormatProto.fromClass(protoMessageClass))
.build()
.withWriteProtosClass(protoMessageClass);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
 * A serializable two-argument function that turns an element of type {@code T} into a {@link
 * TableRow}. The first argument is an optional {@code SchemaInformation}; implementations must
 * tolerate it being {@code null}.
 *
 * <p>Adapters are provided in both directions so that a plain {@link SerializableFunction} can be
 * used wherever this type is required, and vice versa.
 */
abstract static class TableRowFormatFunction<T>
    implements SerializableBiFunction<
        TableRowToStorageApiProto.@Nullable SchemaInformation, T, TableRow> {

  /**
   * Views this function as a single-argument {@link SerializableFunction}. The returned function
   * always invokes {@link #apply} with a {@code null} schema-information argument.
   */
  SerializableFunction<T, TableRow> toSerializableFunction() {
    return element -> apply(null, element);
  }

  /**
   * Wraps a single-argument format function. The resulting {@code TableRowFormatFunction} ignores
   * whatever schema information it is handed and delegates straight to {@code
   * serializableFunction}.
   */
  static <T> TableRowFormatFunction<T> fromSerializableFunction(
      SerializableFunction<T, TableRow> serializableFunction) {
    return new TableRowFormatFunction<T>() {
      @Override
      public TableRow apply(
          TableRowToStorageApiProto.@Nullable SchemaInformation unusedSchemaInformation,
          T element) {
        // The wrapped function has no use for schema information.
        return serializableFunction.apply(element);
      }
    };
  }
}

/**
 * {@link TableRowFormatFunction} that converts protocol-buffer messages of a fixed generated class
 * into {@link TableRow}s through {@code TableRowToStorageApiProto.tableRowFromMessage}.
 *
 * <p>When the caller supplies no schema information, one is derived from the message class's
 * descriptor on first use and then reused.
 */
private static class FormatProto<T extends Message> extends TableRowFormatFunction<T> {
  // Transient, so it is not serialized with the function: it is null on a freshly constructed or
  // freshly deserialized instance and is populated on demand by inferSchemaInformation().
  transient TableRowToStorageApiProto.@Nullable SchemaInformation inferredSchemaInformation;
  final Class<T> protoMessageClass;

  FormatProto(Class<T> protoMessageClass) {
    this.protoMessageClass = protoMessageClass;
  }

  /**
   * Returns schema information derived from {@code protoMessageClass}, computing it on the first
   * call and returning the cached value afterwards.
   *
   * @throws RuntimeException wrapping any failure while reflecting on the class or converting its
   *     descriptor
   */
  TableRowToStorageApiProto.SchemaInformation inferSchemaInformation() {
    // Read the field once so the non-null result can be returned without a second field access.
    TableRowToStorageApiProto.SchemaInformation cached = inferredSchemaInformation;
    if (cached != null) {
      return cached;
    }
    try {
      // Generated message classes expose their descriptor through a static getDescriptor().
      Descriptors.Descriptor descriptor =
          (Descriptors.Descriptor)
              org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
                      protoMessageClass.getMethod("getDescriptor"))
                  .invoke(null);
      Descriptors.Descriptor convertedDescriptor =
          TableRowToStorageApiProto.wrapDescriptorProto(
              ProtoSchemaConverter.convert(descriptor).getProtoDescriptor());
      TableSchema tableSchema =
          TableRowToStorageApiProto.protoSchemaToTableSchema(
              TableRowToStorageApiProto.tableSchemaFromDescriptor(convertedDescriptor));
      cached = TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
      this.inferredSchemaInformation = cached;
      return cached;
    } catch (Exception e) {
      // Keep the original failure as the cause, and say which class could not be processed.
      throw new RuntimeException(
          "Unable to infer schema information for proto class " + protoMessageClass.getName(), e);
    }
  }

  /** Creates a {@code FormatProto} for the given generated message class. */
  static <T extends Message> FormatProto<T> fromClass(Class<T> protoMessageClass)
      throws Exception {
    return new FormatProto<>(protoMessageClass);
  }

  /**
   * Converts {@code input} to a {@link TableRow}.
   *
   * @param schemaInformation schema information to use; when {@code null}, the schema inferred
   *     from the proto class is used instead
   * @param input the message to convert
   */
  @Override
  public TableRow apply(
      TableRowToStorageApiProto.@Nullable SchemaInformation schemaInformation, T input) {
    TableRowToStorageApiProto.SchemaInformation localSchemaInformation =
        schemaInformation != null ? schemaInformation : inferSchemaInformation();
    return TableRowToStorageApiProto.tableRowFromMessage(
        localSchemaInformation, input, false, Predicates.alwaysTrue());
  }
}

/** Implementation of {@link #write}. */
Expand Down Expand Up @@ -2354,9 +2425,9 @@ public enum Method {
abstract @Nullable SerializableFunction<ValueInSingleWindow<T>, TableDestination>
getTableFunction();

abstract @Nullable SerializableFunction<T, TableRow> getFormatFunction();
abstract @Nullable TableRowFormatFunction<T> getFormatFunction();

abstract @Nullable SerializableFunction<T, TableRow> getFormatRecordOnFailureFunction();
abstract @Nullable TableRowFormatFunction<T> getFormatRecordOnFailureFunction();

abstract RowWriterFactory.@Nullable AvroRowWriterFactory<T, ?, ?> getAvroRowWriterFactory();

Expand Down Expand Up @@ -2467,10 +2538,10 @@ abstract static class Builder<T> {
abstract Builder<T> setTableFunction(
SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);

abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
abstract Builder<T> setFormatFunction(TableRowFormatFunction<T> formatFunction);

abstract Builder<T> setFormatRecordOnFailureFunction(
SerializableFunction<T, TableRow> formatFunction);
TableRowFormatFunction<T> formatFunction);

abstract Builder<T> setAvroRowWriterFactory(
RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
Expand Down Expand Up @@ -2718,7 +2789,9 @@ public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {

/** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
return toBuilder().setFormatFunction(formatFunction).build();
return toBuilder()
.setFormatFunction(TableRowFormatFunction.fromSerializableFunction(formatFunction))
.build();
}

/**
Expand All @@ -2733,7 +2806,10 @@ public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunct
*/
public Write<T> withFormatRecordOnFailureFunction(
SerializableFunction<T, TableRow> formatFunction) {
return toBuilder().setFormatRecordOnFailureFunction(formatFunction).build();
return toBuilder()
.setFormatRecordOnFailureFunction(
TableRowFormatFunction.fromSerializableFunction(formatFunction))
.build();
}

/**
Expand Down Expand Up @@ -3599,9 +3675,8 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
private <DestinationT> WriteResult expandTyped(
PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
boolean optimizeWrites = getOptimizeWrites();
SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
SerializableFunction<T, TableRow> formatRecordOnFailureFunction =
getFormatRecordOnFailureFunction();
TableRowFormatFunction<T> formatFunction = getFormatFunction();
TableRowFormatFunction<T> formatRecordOnFailureFunction = getFormatRecordOnFailureFunction();
RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT> avroRowWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>) getAvroRowWriterFactory();

Expand All @@ -3623,7 +3698,9 @@ private <DestinationT> WriteResult expandTyped(
// If no format function set, then we will automatically convert the input type to a
// TableRow.
// TODO: it would be trivial to convert to avro records here instead.
formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
formatFunction =
TableRowFormatFunction.fromSerializableFunction(
BigQueryUtils.toTableRow(input.getToRowFunction()));
}
// Infer the TableSchema from the input Beam schema.
// TODO: If the user provided a schema, we should use that. There are things that can be
Expand Down Expand Up @@ -3769,8 +3846,8 @@ private <DestinationT> WriteResult continueExpandTyped(
getCreateDisposition(),
dynamicDestinations,
elementCoder,
tableRowWriterFactory.getToRowFn(),
tableRowWriterFactory.getToFailsafeRowFn())
tableRowWriterFactory.getToRowFn().toSerializableFunction(),
tableRowWriterFactory.getToFailsafeRowFn().toSerializableFunction())
.withInsertRetryPolicy(retryPolicy)
.withTestServices(getBigQueryServices())
.withExtendedErrorInfo(getExtendedErrorInfo())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;

import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.service.AutoService;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.DataFormat;
Expand Down Expand Up @@ -641,14 +640,14 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (formatFunctionBytes != null) {
builder =
builder.setFormatFunction(
(SerializableFunction<?, TableRow>) fromByteArray(formatFunctionBytes));
(BigQueryIO.TableRowFormatFunction<?>) fromByteArray(formatFunctionBytes));
}
byte[] formatRecordOnFailureFunctionBytes =
configRow.getBytes("format_record_on_failure_function");
if (formatRecordOnFailureFunctionBytes != null) {
builder =
builder.setFormatRecordOnFailureFunction(
(SerializableFunction<?, TableRow>)
(BigQueryIO.TableRowFormatFunction<?>)
fromByteArray(formatRecordOnFailureFunctionBytes));
}
byte[] avroRowWriterFactoryBytes = configRow.getBytes("avro_row_writer_factory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -169,11 +171,46 @@ public abstract static class Builder {
}

private static final String BIGQUERY_TIME_PATTERN = "HH:mm:ss[.SSSSSS]";
private static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
java.time.format.DateTimeFormatter.ofPattern(BIGQUERY_TIME_PATTERN);
private static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
java.time.format.DateTimeFormatter.ofPattern("uuuu-MM-dd'T'" + BIGQUERY_TIME_PATTERN);

// Custom formatter that accepts "2022-05-09 18:04:59.123456"
// The old dremel parser accepts this format, and so does insertall. We need to accept it
// for backwards compatibility, and it is based on UTC time.
//
// Layout: ISO_LOCAL_DATE, then a ' ' separator and a 'T' separator that are each wrapped in
// their own optional section, then ISO_LOCAL_TIME. The formatter is given UTC as its zone.
// NOTE(review): because the two separators are independently optional, the builder does not
// enforce "exactly one of ' ' or 'T'" -- confirm whether that leniency is intended.
static final java.time.format.DateTimeFormatter DATETIME_SPACE_FORMATTER =
new java.time.format.DateTimeFormatterBuilder()
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral(' ')
.optionalEnd()
.optionalStart()
.appendLiteral('T')
.optionalEnd()
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_TIME)
.toFormatter()
.withZone(ZoneOffset.UTC);

// Timestamp formatter built on top of DATETIME_SPACE_FORMATTER: the date-time portion may be
// followed by an optional offset (two alternative offset sections are tried in turn) and then
// by an optional space-separated zone region id.
static final java.time.format.DateTimeFormatter TIMESTAMP_FORMATTER =
new java.time.format.DateTimeFormatterBuilder()
// 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS'
.append(DATETIME_SPACE_FORMATTER)
// 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:mm:ss|Z)'
.optionalStart()
.appendOffsetId()
.optionalEnd()
.optionalStart()
.appendOffset("+HH:mm", "+00:00")
.optionalEnd()
// 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc
// if both an offset and a time zone are provided, the offset takes precedence
// This last optional section has no explicit optionalEnd(); it is left open and is closed
// when toFormatter() is called.
.optionalStart()
.appendLiteral(' ')
.parseCaseSensitive()
.appendZoneRegionId()
.toFormatter();

private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;

/**
Expand Down Expand Up @@ -747,7 +784,11 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(value);
} catch (NumberFormatException e) {
// Handle as a String, ie. "2023-02-16 12:00:00"
return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
try {
return LocalDateTime.parse(jsonBQString);
} catch (DateTimeParseException e2) {
return LocalDateTime.parse(jsonBQString, DATETIME_SPACE_FORMATTER);
}
}
} else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(jsonBQString);
Expand Down
Loading
Loading