Skip to content

Commit d46a013

Browse files
authored
Merge pull request #36425 from reuvenlax/fix_reverse_f_issue
Fix issues in tableRowFromMessage
1 parent d90b4e8 commit d46a013

16 files changed

+1896
-448
lines changed

sdks/java/extensions/protobuf/src/test/proto/proto3_schema_messages.proto

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,51 @@ import "proto3_schema_options.proto";
3333

3434
option java_package = "org.apache.beam.sdk.extensions.protobuf";
3535

36+
// Test message (this file lives under src/test/proto) whose fields are all scalar
// int64/int32/bytes values.
// NOTE(review): the field names suggest timestamp/date/numeric/datetime/time values in an
// already-encoded form; the exact encodings are not visible here - confirm against the
// tests that populate this message.
message PrimitiveEncodedFields {
37+
int64 encoded_timestamp = 1;
38+
int32 encoded_date = 2;
39+
bytes encoded_numeric = 3;
40+
bytes encoded_bignumeric = 4;
41+
int64 encoded_packed_datetime = 5;
42+
int64 encoded_packed_time = 6;
43+
}
44+
45+
// Holds PrimitiveEncodedFields both as a singular nested message and as a repeated field.
message NestedEncodedFields {
46+
PrimitiveEncodedFields nested = 1;
47+
repeated PrimitiveEncodedFields nested_list = 2;
48+
}
49+
50+
// Test message in which every field is declared as a string.
// NOTE(review): the field names mirror those of PrimitiveEncodedFields without the
// "encoded_" prefix, so this is presumably the textual counterpart - confirm against the
// tests that populate this message.
message PrimitiveUnEncodedFields {
51+
string timestamp = 1;
52+
string date = 2;
53+
string numeric = 3;
54+
string bignumeric = 4;
55+
string datetime = 5;
56+
string time = 6;
57+
}
58+
59+
// Holds PrimitiveUnEncodedFields both as a singular nested message and as a repeated field.
message NestedUnEncodedFields {
60+
PrimitiveUnEncodedFields nested = 1;
61+
repeated PrimitiveUnEncodedFields nested_list = 2;
62+
}
63+
64+
// Test message with one field per google.protobuf wrapper type (FloatValue, DoubleValue,
// BoolValue, Int32Value, Int64Value, UInt32Value, UInt64Value, BytesValue) plus a
// google.protobuf.Timestamp field.
message WrapperUnEncodedFields {
65+
google.protobuf.FloatValue float = 1;
66+
google.protobuf.DoubleValue double = 2;
67+
google.protobuf.BoolValue bool = 3;
68+
google.protobuf.Int32Value int32 = 4;
69+
google.protobuf.Int64Value int64 = 5;
70+
google.protobuf.UInt32Value uint32 = 6;
71+
google.protobuf.UInt64Value uint64 = 7;
72+
google.protobuf.BytesValue bytes = 8;
73+
google.protobuf.Timestamp timestamp = 9;
74+
}
75+
76+
// Holds WrapperUnEncodedFields both as a singular nested message and as a repeated field.
message NestedWrapperUnEncodedFields {
77+
WrapperUnEncodedFields nested = 1;
78+
repeated WrapperUnEncodedFields nested_list = 2;
79+
}
80+
3681
message Primitive {
3782
double primitive_double = 1;
3883
float primitive_float = 2;
@@ -287,4 +332,4 @@ message NoWrapPrimitive {
287332
optional bool bool = 13;
288333
optional string string = 14;
289334
optional bytes bytes = 15;
290-
}
335+
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ public ByteString mergeNewFields(
182182
public TableRow toTableRow(ByteString protoBytes, Predicate<String> includeField) {
183183
try {
184184
return TableRowToStorageApiProto.tableRowFromMessage(
185+
getSchemaInformation(),
185186
DynamicMessage.parseFrom(
186187
TableRowToStorageApiProto.wrapDescriptorProto(getDescriptor()), protoBytes),
187188
true,

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
4343
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
4444
import com.google.cloud.bigquery.storage.v1.DataFormat;
45+
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
4546
import com.google.cloud.bigquery.storage.v1.ReadSession;
4647
import com.google.cloud.bigquery.storage.v1.ReadStream;
4748
import com.google.gson.JsonArray;
@@ -119,6 +120,7 @@
119120
import org.apache.beam.sdk.transforms.PTransform;
120121
import org.apache.beam.sdk.transforms.ParDo;
121122
import org.apache.beam.sdk.transforms.Reshuffle;
123+
import org.apache.beam.sdk.transforms.SerializableBiFunction;
122124
import org.apache.beam.sdk.transforms.SerializableFunction;
123125
import org.apache.beam.sdk.transforms.SerializableFunctions;
124126
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -2297,10 +2299,79 @@ public static <T extends Message> Write<T> writeProtos(Class<T> protoMessageClas
22972299
if (DynamicMessage.class.equals(protoMessageClass)) {
22982300
throw new IllegalArgumentException("DynamicMessage is not supported.");
22992301
}
2300-
return BigQueryIO.<T>write()
2301-
.withFormatFunction(
2302-
m -> TableRowToStorageApiProto.tableRowFromMessage(m, false, Predicates.alwaysTrue()))
2303-
.withWriteProtosClass(protoMessageClass);
2302+
try {
2303+
return BigQueryIO.<T>write()
2304+
.toBuilder()
2305+
.setFormatFunction(FormatProto.fromClass(protoMessageClass))
2306+
.build()
2307+
.withWriteProtosClass(protoMessageClass);
2308+
} catch (Exception e) {
2309+
throw new RuntimeException(e);
2310+
}
2311+
}
2312+
2313+
/**
 * A serializable two-argument function that formats an element of type {@code T} into a
 * {@link TableRow}, given an optional (nullable) {@code SchemaInformation}.
 *
 * @param <T> the element type being formatted
 */
abstract static class TableRowFormatFunction<T>
2314+
implements SerializableBiFunction<
2315+
TableRowToStorageApiProto.@Nullable SchemaInformation, T, TableRow> {
/**
 * Adapts a one-argument format function to this type. The returned function ignores the
 * schema information argument and delegates to {@code serializableFunction}.
 */
2316+
static <T> TableRowFormatFunction<T> fromSerializableFunction(
2317+
SerializableFunction<T, TableRow> serializableFunction) {
2318+
return new TableRowFormatFunction<T>() {
2319+
@Override
2320+
public TableRow apply(
2321+
TableRowToStorageApiProto.@Nullable SchemaInformation schemaInformation, T t) {
2322+
return serializableFunction.apply(t);
2323+
}
2324+
};
2325+
}
2326+
2327+
/**
 * Returns a one-argument view of this function that always passes {@code null} as the
 * schema information.
 */
SerializableFunction<T, TableRow> toSerializableFunction() {
2328+
return input -> apply(null, input);
2329+
}
2330+
}
2331+
2332+
/**
 * Format function for protocol-buffer message classes. Converts a message to a {@link TableRow}
 * through {@code TableRowToStorageApiProto.tableRowFromMessage}, using the schema information
 * passed to {@link #apply} when it is non-null and otherwise schema information inferred from
 * the message class's descriptor.
 *
 * @param <T> the protocol-buffer message type
 */
private static class FormatProto<T extends Message> extends TableRowFormatFunction<T> {
2333+
// Lazily populated by inferSchemaInformation(). Being transient, it is not serialized with
// the function, so it starts out null again after deserialization and is recomputed on demand.
// NOTE(review): the lazy initialization below is unsynchronized.
transient TableRowToStorageApiProto.SchemaInformation inferredSchemaInformation;
2334+
final Class<T> protoMessageClass;
2335+
2336+
FormatProto(Class<T> protoMessageClass) {
2337+
this.protoMessageClass = protoMessageClass;
2338+
}
2339+
2340+
/**
 * Returns the cached schema information, computing it on first use: reflectively invokes the
 * message class's {@code getDescriptor} method (with a null receiver), converts the descriptor
 * with {@code ProtoSchemaConverter}, derives a {@code TableSchema} from it and builds the
 * {@code SchemaInformation} from that table schema.
 *
 * @throws RuntimeException wrapping any exception raised during reflection or conversion
 */
TableRowToStorageApiProto.SchemaInformation inferSchemaInformation() {
2341+
try {
2342+
if (inferredSchemaInformation == null) {
2343+
Descriptors.Descriptor descriptor =
2344+
(Descriptors.Descriptor)
2345+
// NOTE(review): checkStateNotNull wraps the Method returned by getMethod, not the value
// returned by invoke - confirm that this is the intended null check.
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
2346+
protoMessageClass.getMethod("getDescriptor"))
2347+
.invoke(null);
2348+
Descriptors.Descriptor convertedDescriptor =
2349+
TableRowToStorageApiProto.wrapDescriptorProto(
2350+
ProtoSchemaConverter.convert(descriptor).getProtoDescriptor());
2351+
TableSchema tableSchema =
2352+
TableRowToStorageApiProto.protoSchemaToTableSchema(
2353+
TableRowToStorageApiProto.tableSchemaFromDescriptor(convertedDescriptor));
2354+
this.inferredSchemaInformation =
2355+
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
2356+
}
2357+
return inferredSchemaInformation;
2358+
} catch (Exception e) {
2359+
throw new RuntimeException(e);
2360+
}
2361+
}
2362+
2363+
/**
 * Creates a {@code FormatProto} for the given message class. The body only calls the
 * constructor; the broad {@code throws Exception} clause is kept so existing callers compile
 * unchanged.
 */
static <T extends Message> FormatProto<T> fromClass(Class<T> protoMessageClass)
2364+
throws Exception {
2365+
return new FormatProto<>(protoMessageClass);
2366+
}
2367+
2368+
/**
 * Formats {@code input} as a {@link TableRow}.
 *
 * @param schemaInformation schema information to use, or {@code null} to fall back to the
 *     schema information inferred from the message class
 * @param input the message to convert
 * @return the result of {@code TableRowToStorageApiProto.tableRowFromMessage}, called with
 *     {@code false} and an always-true predicate as its remaining arguments
 */
@Override
2369+
public TableRow apply(TableRowToStorageApiProto.@Nullable SchemaInformation schemaInformation, T input) {
2370+
TableRowToStorageApiProto.SchemaInformation localSchemaInformation =
2371+
schemaInformation != null ? schemaInformation : inferSchemaInformation();
2372+
return TableRowToStorageApiProto.tableRowFromMessage(
2373+
localSchemaInformation, input, false, Predicates.alwaysTrue());
2374+
}
23042375
}
23052376

23062377
/** Implementation of {@link #write}. */
@@ -2354,9 +2425,9 @@ public enum Method {
23542425
abstract @Nullable SerializableFunction<ValueInSingleWindow<T>, TableDestination>
23552426
getTableFunction();
23562427

2357-
abstract @Nullable SerializableFunction<T, TableRow> getFormatFunction();
2428+
abstract @Nullable TableRowFormatFunction<T> getFormatFunction();
23582429

2359-
abstract @Nullable SerializableFunction<T, TableRow> getFormatRecordOnFailureFunction();
2430+
abstract @Nullable TableRowFormatFunction<T> getFormatRecordOnFailureFunction();
23602431

23612432
abstract RowWriterFactory.@Nullable AvroRowWriterFactory<T, ?, ?> getAvroRowWriterFactory();
23622433

@@ -2467,10 +2538,10 @@ abstract static class Builder<T> {
24672538
abstract Builder<T> setTableFunction(
24682539
SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);
24692540

2470-
abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
2541+
abstract Builder<T> setFormatFunction(TableRowFormatFunction<T> formatFunction);
24712542

24722543
abstract Builder<T> setFormatRecordOnFailureFunction(
2473-
SerializableFunction<T, TableRow> formatFunction);
2544+
TableRowFormatFunction<T> formatFunction);
24742545

24752546
abstract Builder<T> setAvroRowWriterFactory(
24762547
RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
@@ -2718,7 +2789,9 @@ public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
27182789

27192790
/** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
27202791
public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
2721-
return toBuilder().setFormatFunction(formatFunction).build();
2792+
return toBuilder()
2793+
.setFormatFunction(TableRowFormatFunction.fromSerializableFunction(formatFunction))
2794+
.build();
27222795
}
27232796

27242797
/**
@@ -2733,7 +2806,10 @@ public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunct
27332806
*/
27342807
public Write<T> withFormatRecordOnFailureFunction(
27352808
SerializableFunction<T, TableRow> formatFunction) {
2736-
return toBuilder().setFormatRecordOnFailureFunction(formatFunction).build();
2809+
return toBuilder()
2810+
.setFormatRecordOnFailureFunction(
2811+
TableRowFormatFunction.fromSerializableFunction(formatFunction))
2812+
.build();
27372813
}
27382814

27392815
/**
@@ -3599,9 +3675,8 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
35993675
private <DestinationT> WriteResult expandTyped(
36003676
PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
36013677
boolean optimizeWrites = getOptimizeWrites();
3602-
SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
3603-
SerializableFunction<T, TableRow> formatRecordOnFailureFunction =
3604-
getFormatRecordOnFailureFunction();
3678+
TableRowFormatFunction<T> formatFunction = getFormatFunction();
3679+
TableRowFormatFunction<T> formatRecordOnFailureFunction = getFormatRecordOnFailureFunction();
36053680
RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT> avroRowWriterFactory =
36063681
(RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>) getAvroRowWriterFactory();
36073682

@@ -3623,7 +3698,9 @@ private <DestinationT> WriteResult expandTyped(
36233698
// If no format function set, then we will automatically convert the input type to a
36243699
// TableRow.
36253700
// TODO: it would be trivial to convert to avro records here instead.
3626-
formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
3701+
formatFunction =
3702+
TableRowFormatFunction.fromSerializableFunction(
3703+
BigQueryUtils.toTableRow(input.getToRowFunction()));
36273704
}
36283705
// Infer the TableSchema from the input Beam schema.
36293706
// TODO: If the user provided a schema, we should use that. There are things that can be
@@ -3769,8 +3846,8 @@ private <DestinationT> WriteResult continueExpandTyped(
37693846
getCreateDisposition(),
37703847
dynamicDestinations,
37713848
elementCoder,
3772-
tableRowWriterFactory.getToRowFn(),
3773-
tableRowWriterFactory.getToFailsafeRowFn())
3849+
tableRowWriterFactory.getToRowFn().toSerializableFunction(),
3850+
tableRowWriterFactory.getToFailsafeRowFn().toSerializableFunction())
37743851
.withInsertRetryPolicy(retryPolicy)
37753852
.withTestServices(getBigQueryServices())
37763853
.withExtendedErrorInfo(getExtendedErrorInfo())

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
2121
import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;
2222

23-
import com.google.api.services.bigquery.model.TableRow;
2423
import com.google.auto.service.AutoService;
2524
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
2625
import com.google.cloud.bigquery.storage.v1.DataFormat;
@@ -641,14 +640,14 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
641640
if (formatFunctionBytes != null) {
642641
builder =
643642
builder.setFormatFunction(
644-
(SerializableFunction<?, TableRow>) fromByteArray(formatFunctionBytes));
643+
(BigQueryIO.TableRowFormatFunction<?>) fromByteArray(formatFunctionBytes));
645644
}
646645
byte[] formatRecordOnFailureFunctionBytes =
647646
configRow.getBytes("format_record_on_failure_function");
648647
if (formatRecordOnFailureFunctionBytes != null) {
649648
builder =
650649
builder.setFormatRecordOnFailureFunction(
651-
(SerializableFunction<?, TableRow>)
650+
(BigQueryIO.TableRowFormatFunction<?>)
652651
fromByteArray(formatRecordOnFailureFunctionBytes));
653652
}
654653
byte[] avroRowWriterFactoryBytes = configRow.getBytes("avro_row_writer_factory");

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.time.LocalDate;
3535
import java.time.LocalDateTime;
3636
import java.time.LocalTime;
37+
import java.time.ZoneOffset;
38+
import java.time.format.DateTimeParseException;
3739
import java.util.ArrayList;
3840
import java.util.Collections;
3941
import java.util.HashMap;
@@ -169,11 +171,46 @@ public abstract static class Builder {
169171
}
170172

171173
private static final String BIGQUERY_TIME_PATTERN = "HH:mm:ss[.SSSSSS]";
172-
private static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
174+
static final java.time.format.DateTimeFormatter BIGQUERY_TIME_FORMATTER =
173175
java.time.format.DateTimeFormatter.ofPattern(BIGQUERY_TIME_PATTERN);
174-
private static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
176+
static final java.time.format.DateTimeFormatter BIGQUERY_DATETIME_FORMATTER =
175177
java.time.format.DateTimeFormatter.ofPattern("uuuu-MM-dd'T'" + BIGQUERY_TIME_PATTERN);
176178

179+
// Custom formatter that accepts a space between the date and the time, e.g.
// "2022-05-09 18:04:59.123456"; the ISO 'T' separator is accepted as well.
180+
// The old Dremel parser accepts this format, and so does insertAll. We need to accept it
181+
// for backwards compatibility. The formatter's override zone is set to UTC.
182+
static final java.time.format.DateTimeFormatter DATETIME_SPACE_FORMATTER =
183+
new java.time.format.DateTimeFormatterBuilder()
184+
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_DATE)
185+
// NOTE(review): the ' ' and 'T' separators are two independent optional sections, so
// input with neither separator, or with " T", presumably parses too - confirm that this
// leniency is intended.
.optionalStart()
186+
.appendLiteral(' ')
187+
.optionalEnd()
188+
.optionalStart()
189+
.appendLiteral('T')
190+
.optionalEnd()
191+
.append(java.time.format.DateTimeFormatter.ISO_LOCAL_TIME)
192+
.toFormatter()
193+
.withZone(ZoneOffset.UTC);
194+
195+
// Formatter for timestamp strings: the date-time accepted by DATETIME_SPACE_FORMATTER,
// optionally followed by a UTC offset and/or a space plus a time-zone region ID.
// NOTE(review): append() adds the elements of DATETIME_SPACE_FORMATTER; its UTC override zone
// is presumably not carried over - confirm how callers treat input without offset or zone.
static final java.time.format.DateTimeFormatter TIMESTAMP_FORMATTER =
195+
new java.time.format.DateTimeFormatterBuilder()
196+
// 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS'
197+
.append(DATETIME_SPACE_FORMATTER)
198+
// 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:mm:ss|Z)'
199+
.optionalStart()
200+
.appendOffsetId()
201+
.optionalEnd()
202+
.optionalStart()
203+
.appendOffset("+HH:mm", "+00:00")
204+
.optionalEnd()
205+
// 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc
206+
// if both an offset and a time zone are provided, the offset takes precedence
207+
// This last optional section has no explicit optionalEnd(); toFormatter() ends any optional
// section that is still open.
.optionalStart()
208+
.appendLiteral(' ')
209+
.parseCaseSensitive()
210+
.appendZoneRegionId()
211+
.toFormatter();
213+
177214
private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;
178215

179216
/**
@@ -747,7 +784,11 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
747784
return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(value);
748785
} catch (NumberFormatException e) {
749786
// Handle as a String: try ISO-8601 first (e.g. "2023-02-16T12:00:00"), then fall back to the space-separated form (e.g. "2023-02-16 12:00:00").
750-
return LocalDateTime.parse(jsonBQString, BIGQUERY_DATETIME_FORMATTER);
787+
try {
788+
return LocalDateTime.parse(jsonBQString);
789+
} catch (DateTimeParseException e2) {
790+
return LocalDateTime.parse(jsonBQString, DATETIME_SPACE_FORMATTER);
791+
}
751792
}
752793
} else if (fieldType.isLogicalType(SqlTypes.DATE.getIdentifier())) {
753794
return LocalDate.parse(jsonBQString);

0 commit comments

Comments
 (0)