Skip to content

Commit a3a9f9f

Browse files
committed
More tests.
1 parent adaf5a3 commit a3a9f9f

File tree

3 files changed

+428
-109
lines changed

3 files changed

+428
-109
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,23 @@ private static String getPrettyFieldName(String fullName) {
191191
.put(TableFieldSchema.Type.JSON, "JSON")
192192
.build();
193193

194+
static final DescriptorProto TIMESTAMP_PICOS_DESCRIPTOR_PROTO =
195+
DescriptorProto.newBuilder()
196+
.setName("TimestampPicos")
197+
.addField(
198+
DescriptorProtos.FieldDescriptorProto.newBuilder()
199+
.setName("seconds")
200+
.setNumber(1)
201+
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
202+
.build())
203+
.addField(
204+
DescriptorProtos.FieldDescriptorProto.newBuilder()
205+
.setName("picoseconds")
206+
.setNumber(2)
207+
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
208+
.build())
209+
.build();
210+
194211
@FunctionalInterface
195212
public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
196213
OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException;
@@ -963,10 +980,16 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor fiel
963980

964981
switch (fieldDescriptor.getType()) {
965982
case MESSAGE:
966-
tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
967-
TableSchema nestedTableField = tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
968-
tableFieldSchemaBuilder =
969-
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
983+
if (fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) {
984+
tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP);
985+
tableFieldSchemaBuilder.setPrecision(12);
986+
} else {
987+
tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
988+
TableSchema nestedTableField =
989+
tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
990+
tableFieldSchemaBuilder =
991+
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
992+
}
970993
break;
971994
default:
972995
TableFieldSchema.Type type = PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
@@ -1068,28 +1091,17 @@ private static void fieldDescriptorFromTableField(
10681091
break;
10691092
case TIMESTAMP:
10701093
if (fieldSchema.getTimestampPrecision().getValue() == 12) {
1071-
// Build nested message descriptor for picosecond precision
1072-
DescriptorProto timestampPicosDescriptor =
1073-
DescriptorProto.newBuilder()
1074-
.setName("TimestampPicos")
1075-
.addField(
1076-
DescriptorProtos.FieldDescriptorProto.newBuilder()
1077-
.setName("seconds")
1078-
.setNumber(1)
1079-
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
1080-
.build())
1081-
.addField(
1082-
DescriptorProtos.FieldDescriptorProto.newBuilder()
1083-
.setName("picoseconds")
1084-
.setNumber(2)
1085-
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
1086-
.build())
1087-
.build();
1088-
descriptorBuilder.addNestedType(timestampPicosDescriptor);
1094+
boolean typeAlreadyExists =
1095+
descriptorBuilder.getNestedTypeList().stream()
1096+
.anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName()));
1097+
1098+
if (!typeAlreadyExists) {
1099+
descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO);
1100+
}
10891101
fieldDescriptorBuilder =
10901102
fieldDescriptorBuilder
10911103
.setType(Type.TYPE_MESSAGE)
1092-
.setTypeName(timestampPicosDescriptor.getName());
1104+
.setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName());
10931105
} else {
10941106
// Microsecond precision - use simple INT64
10951107
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
@@ -1697,13 +1709,28 @@ public static Object jsonValueFromMessageValue(
16971709
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
16981710
} else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
16991711
Message message = (Message) fieldValue;
1712+
String messageName = fieldDescriptor.getMessageType().getName();
17001713
if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
17011714
fieldDescriptor.getMessageType().getName())) {
17021715
Descriptor descriptor = message.getDescriptorForType();
17031716
long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
17041717
int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
17051718
Instant instant = Instant.ofEpochSecond(seconds, nanos);
17061719
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
1720+
} else if (messageName.equals("TimestampPicos")) {
1721+
Descriptor descriptor = message.getDescriptorForType();
1722+
long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
1723+
long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds"));
1724+
1725+
// Convert to ISO timestamp string with picoseconds
1726+
Instant instant = Instant.ofEpochSecond(seconds);
1727+
String baseTimestamp = instant.toString(); // "2024-01-15T10:30:45Z"
1728+
1729+
// Format picoseconds as 12-digit string
1730+
String picosPart = String.format("%012d", picoseconds);
1731+
1732+
// Insert before 'Z': "2024-01-15T10:30:45Z" → "2024-01-15T10:30:45.123456789012Z"
1733+
return baseTimestamp.replace("Z", "." + picosPart + "Z");
17071734
} else {
17081735
throw new RuntimeException(
17091736
"Not implemented yet " + fieldDescriptor.getMessageType().getFullName());

0 commit comments

Comments
 (0)