|
18 | 18 | package org.apache.beam.sdk.io.gcp.bigquery; |
19 | 19 |
|
20 | 20 | import static java.util.stream.Collectors.toList; |
| 21 | +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.DATETIME_SPACE_FORMATTER; |
| 22 | +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TIMESTAMP_FORMATTER; |
21 | 23 |
|
22 | 24 | import com.google.api.services.bigquery.model.TableCell; |
23 | 25 | import com.google.api.services.bigquery.model.TableRow; |
|
50 | 52 | import java.time.LocalDateTime; |
51 | 53 | import java.time.LocalTime; |
52 | 54 | import java.time.ZoneOffset; |
53 | | -import java.time.format.DateTimeFormatter; |
54 | | -import java.time.format.DateTimeFormatterBuilder; |
55 | 55 | import java.time.format.DateTimeParseException; |
56 | 56 | import java.util.AbstractMap; |
57 | 57 | import java.util.ArrayList; |
|
81 | 81 | * with the Storage write API. |
82 | 82 | */ |
83 | 83 | public class TableRowToStorageApiProto { |
84 | | - |
85 | | - // Custom formatter that accepts "2022-05-09 18:04:59.123456" |
86 | | - // The old dremel parser accepts this format, and so does insertall. We need to accept it |
87 | | - // for backwards compatibility, and it is based on UTC time. |
88 | | - static final DateTimeFormatter DATETIME_SPACE_FORMATTER = |
89 | | - new DateTimeFormatterBuilder() |
90 | | - .append(DateTimeFormatter.ISO_LOCAL_DATE) |
91 | | - .optionalStart() |
92 | | - .appendLiteral(' ') |
93 | | - .optionalEnd() |
94 | | - .optionalStart() |
95 | | - .appendLiteral('T') |
96 | | - .optionalEnd() |
97 | | - .append(DateTimeFormatter.ISO_LOCAL_TIME) |
98 | | - .toFormatter() |
99 | | - .withZone(ZoneOffset.UTC); |
100 | | - |
101 | | - static final DateTimeFormatter TIMESTAMP_FORMATTER = |
102 | | - new DateTimeFormatterBuilder() |
103 | | - // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS' |
104 | | - .append(DATETIME_SPACE_FORMATTER) |
105 | | - // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:mm:ss|Z)' |
106 | | - .optionalStart() |
107 | | - .appendOffsetId() |
108 | | - .optionalEnd() |
109 | | - .optionalStart() |
110 | | - .appendOffset("+HH:mm", "+00:00") |
111 | | - .optionalEnd() |
112 | | - // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc |
113 | | - // if both an offset and a time zone are provided, the offset takes precedence |
114 | | - .optionalStart() |
115 | | - .appendLiteral(' ') |
116 | | - .parseCaseSensitive() |
117 | | - .appendZoneRegionId() |
118 | | - .toFormatter(); |
119 | | - |
120 | 84 | abstract static class SchemaConversionException extends Exception { |
121 | 85 | SchemaConversionException(String msg) { |
122 | 86 | super(msg); |
@@ -220,8 +184,8 @@ private static String getPrettyFieldName(String fullName) { |
220 | 184 | .build(); |
221 | 185 |
|
222 | 186 | @FunctionalInterface |
223 | | - public interface ThrowingBiFunction<T, U, R> { |
224 | | - R apply(T t, U u) throws SchemaConversionException; |
| 187 | + public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> { |
| 188 | + OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException; |
225 | 189 | } |
226 | 190 |
|
227 | 191 | // Map of functions to convert json values into the value expected in the Vortex proto object. |
@@ -1333,15 +1297,15 @@ public static TableRow tableRowFromMessageUseSetF( |
1333 | 1297 |
|
1334 | 1298 | // Our process for generating descriptors modifies the names of nested descriptors for wrapper |
1335 | 1299 | // types, so we record them here. |
1336 | | - private static String FLOAT_VALUE_DESCRIPTOR_NAME = "google_protobuf_FloatValue"; |
1337 | | - private static String DOUBLE_VALUE_DESCRIPTOR_NAME = "google_protobuf_DoubleValue"; |
1338 | | - private static String BOOL_VALUE_DESCRIPTOR_NAME = "google_protobuf_BoolValue"; |
1339 | | - private static String INT32_VALUE_DESCRIPTOR_NAME = "google_protobuf_Int32Value"; |
1340 | | - private static String INT64_VALUE_DESCRIPTOR_NAME = "google_protobuf_Int64Value"; |
1341 | | - private static String UINT32_VALUE_DESCRIPTOR_NAME = "google_protobuf_UInt32Value"; |
1342 | | - private static String UINT64_VALUE_DESCRIPTOR_NAME = "google_protobuf_UInt64Value"; |
1343 | | - private static String BYTES_VALUE_DESCRIPTOR_NAME = "google_protobuf_BytesValue"; |
1344 | | - private static String TIMESTAMP_VALUE_DESCRIPTOR_NAME = "google_protobuf_Timestamp"; |
| 1300 | + private static final String FLOAT_VALUE_DESCRIPTOR_NAME = "google_protobuf_FloatValue"; |
| 1301 | + private static final String DOUBLE_VALUE_DESCRIPTOR_NAME = "google_protobuf_DoubleValue"; |
| 1302 | + private static final String BOOL_VALUE_DESCRIPTOR_NAME = "google_protobuf_BoolValue"; |
| 1303 | + private static final String INT32_VALUE_DESCRIPTOR_NAME = "google_protobuf_Int32Value"; |
| 1304 | + private static final String INT64_VALUE_DESCRIPTOR_NAME = "google_protobuf_Int64Value"; |
| 1305 | + private static final String UINT32_VALUE_DESCRIPTOR_NAME = "google_protobuf_UInt32Value"; |
| 1306 | + private static final String UINT64_VALUE_DESCRIPTOR_NAME = "google_protobuf_UInt64Value"; |
| 1307 | + private static final String BYTES_VALUE_DESCRIPTOR_NAME = "google_protobuf_BytesValue"; |
| 1308 | + private static final String TIMESTAMP_VALUE_DESCRIPTOR_NAME = "google_protobuf_Timestamp"; |
1345 | 1309 |
|
1346 | 1310 | // Translate a proto message value into a json value. If useSetF==false, this will fail with |
1347 | 1311 | // Optional.empty() if |
@@ -1551,7 +1515,7 @@ public static Object jsonValueFromMessageValue( |
1551 | 1515 | if (isProtoFieldTypeInteger(fieldDescriptor.getType())) { |
1552 | 1516 | long packedDateTime = Long.valueOf(fieldValue.toString()); |
1553 | 1517 | return CivilTimeEncoder.decodePacked64DatetimeMicrosAsJavaTime(packedDateTime) |
1554 | | - .format(DATETIME_SPACE_FORMATTER); |
| 1518 | + .format(BigQueryUtils.BIGQUERY_DATETIME_FORMATTER); |
1555 | 1519 | } else { |
1556 | 1520 | return fieldValue.toString(); |
1557 | 1521 | } |
|
0 commit comments