Skip to content

Commit d23c095

Browse files
claudevdmClaude
authored and committed
initial
1 parent cc8dc2f commit d23c095

File tree

5 files changed

+820
-10
lines changed

5 files changed

+820
-10
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public class BigQueryUtils {
101101
private static final Pattern TABLE_RESOURCE_PATTERN =
102102
Pattern.compile(
103103
"^projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)$");
104-
104+
ig
105105
// For parsing the format used to refer to tables parameters in BigQueryIO.
106106
// "{project_id}:{dataset_id}.{table_id}" or
107107
// "{project_id}.{dataset_id}.{table_id}"
@@ -380,6 +380,67 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
380380
return ret;
381381
}
382382

383+
/**
384+
* Represents a timestamp with picosecond precision, split into seconds and picoseconds
385+
* components.
386+
*/
387+
public static class TimestampPicos {
388+
final long seconds;
389+
final long picoseconds;
390+
391+
TimestampPicos(long seconds, long picoseconds) {
392+
this.seconds = seconds;
393+
this.picoseconds = picoseconds;
394+
}
395+
}
396+
397+
/**
398+
* Parses a timestamp string into seconds and picoseconds components.
399+
*
400+
* <p>Handles two formats:
401+
*
402+
* <ul>
403+
* <li>ISO format with exactly 12 fractional digits ending in Z (picosecond precision): e.g.,
404+
* "2024-01-15T10:30:45.123456789012Z"
405+
* <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision): e.g.,
406+
* "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC"
407+
* </ul>
408+
*/
409+
public static TimestampPicos parseTimestampPicosFromString(String timestampString) {
410+
// Check for ISO picosecond format up to 12 fractional digits before Z
411+
// Format: "2024-01-15T10:30:45.123456789012Z"
412+
if (timestampString.endsWith("Z")) {
413+
int dotIndex = timestampString.lastIndexOf('.');
414+
415+
if (dotIndex > 0) {
416+
String fractionalPart =
417+
timestampString.substring(dotIndex + 1, timestampString.length() - 1);
418+
419+
if (fractionalPart.length() == 12) {
420+
// ISO timestamp with 12 decimal digits (picosecond precision)
421+
// Parse the datetime part (without fractional seconds)
422+
String dateTimePart = timestampString.substring(0, dotIndex) + "Z";
423+
java.time.Instant baseInstant = java.time.Instant.parse(dateTimePart);
424+
425+
// Parse all 12 digits directly as picoseconds (subsecond portion)
426+
long picoseconds = Long.parseLong(fractionalPart);
427+
428+
return new TimestampPicos(baseInstant.getEpochSecond(), picoseconds);
429+
}
430+
}
431+
432+
// ISO format with 0-9 fractional digits - Instant.parse handles this
433+
java.time.Instant timestamp = java.time.Instant.parse(timestampString);
434+
return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L);
435+
}
436+
437+
// UTC format: "2024-01-15 10:30:45.123456789 UTC"
438+
// Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix
439+
java.time.Instant timestamp =
440+
java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString));
441+
return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L);
442+
}
443+
383444
/**
384445
* Get the Beam {@link FieldType} from a BigQuery type name.
385446
*

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,23 @@ private static String getPrettyFieldName(String fullName) {
191191
.put(TableFieldSchema.Type.JSON, "JSON")
192192
.build();
193193

194+
static final DescriptorProto TIMESTAMP_PICOS_DESCRIPTOR_PROTO =
195+
DescriptorProto.newBuilder()
196+
.setName("TimestampPicos")
197+
.addField(
198+
DescriptorProtos.FieldDescriptorProto.newBuilder()
199+
.setName("seconds")
200+
.setNumber(1)
201+
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
202+
.build())
203+
.addField(
204+
DescriptorProtos.FieldDescriptorProto.newBuilder()
205+
.setName("picoseconds")
206+
.setNumber(2)
207+
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
208+
.build())
209+
.build();
210+
194211
@FunctionalInterface
195212
public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
196213
OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException;
@@ -533,6 +550,9 @@ public static TableFieldSchema tableFieldToProtoTableField(
533550
if (field.getScale() != null) {
534551
builder.setScale(field.getScale());
535552
}
553+
if (field.getTimestampPrecision() != null) {
554+
builder.getTimestampPrecisionBuilder().setValue(field.getTimestampPrecision());
555+
}
536556
builder.setType(typeToProtoType(field.getType()));
537557
if (builder.getType().equals(TableFieldSchema.Type.STRUCT)) {
538558
for (com.google.api.services.bigquery.model.TableFieldSchema subField : field.getFields()) {
@@ -587,6 +607,10 @@ public boolean isRepeated() {
587607
return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.REPEATED);
588608
}
589609

610+
public long getTimestampPrecision() {
611+
return tableFieldSchema.getTimestampPrecision().getValue();
612+
}
613+
590614
public SchemaInformation getSchemaForField(String name) {
591615
SchemaInformation schemaInformation = subFieldsByName.get(name.toLowerCase());
592616
if (schemaInformation == null) {
@@ -631,7 +655,6 @@ static SchemaInformation fromTableSchema(
631655
.put(TableFieldSchema.Type.DATE, Type.TYPE_INT32)
632656
.put(TableFieldSchema.Type.TIME, Type.TYPE_INT64)
633657
.put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64)
634-
.put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64)
635658
.put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
636659
.build();
637660

@@ -957,10 +980,16 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor fiel
957980

958981
switch (fieldDescriptor.getType()) {
959982
case MESSAGE:
960-
tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
961-
TableSchema nestedTableField = tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
962-
tableFieldSchemaBuilder =
963-
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
983+
if (fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) {
984+
tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP);
985+
tableFieldSchemaBuilder.setPrecision(12);
986+
} else {
987+
tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
988+
TableSchema nestedTableField =
989+
tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
990+
tableFieldSchemaBuilder =
991+
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
992+
}
964993
break;
965994
default:
966995
TableFieldSchema.Type type = PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
@@ -1060,6 +1089,25 @@ private static void fieldDescriptorFromTableField(
10601089
fieldDescriptorBuilder =
10611090
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
10621091
break;
1092+
case TIMESTAMP:
1093+
if (fieldSchema.getTimestampPrecision().getValue() == 12) {
1094+
boolean typeAlreadyExists =
1095+
descriptorBuilder.getNestedTypeList().stream()
1096+
.anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName()));
1097+
1098+
if (!typeAlreadyExists) {
1099+
descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO);
1100+
}
1101+
fieldDescriptorBuilder =
1102+
fieldDescriptorBuilder
1103+
.setType(Type.TYPE_MESSAGE)
1104+
.setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName());
1105+
} else {
1106+
// Microsecond precision - use simple INT64
1107+
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
1108+
}
1109+
break;
1110+
10631111
default:
10641112
@Nullable Type type = PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType());
10651113
if (type == null) {
@@ -1313,6 +1361,34 @@ public static ByteString mergeNewFields(
13131361
null,
13141362
null);
13151363
}
1364+
} else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP
1365+
&& schemaInformation.getTimestampPrecision() == 12) {
1366+
1367+
long seconds;
1368+
long picoseconds;
1369+
1370+
if (value instanceof String) {
1371+
BigQueryUtils.TimestampPicos parsed =
1372+
BigQueryUtils.parseTimestampPicosFromString((String) value);
1373+
seconds = parsed.seconds;
1374+
picoseconds = parsed.picoseconds;
1375+
1376+
} else if (value instanceof Instant) {
1377+
Instant timestamp = (Instant) value;
1378+
seconds = timestamp.getEpochSecond();
1379+
picoseconds = timestamp.getNano() * 1000L;
1380+
} else {
1381+
throw new IllegalArgumentException(
1382+
"Unsupported timestamp value type: " + value.getClass().getName());
1383+
}
1384+
1385+
converted =
1386+
DynamicMessage.newBuilder(fieldDescriptor.getMessageType())
1387+
.setField(fieldDescriptor.getMessageType().findFieldByName("seconds"), seconds)
1388+
.setField(
1389+
fieldDescriptor.getMessageType().findFieldByName("picoseconds"), picoseconds)
1390+
.build();
1391+
13161392
} else {
13171393
@Nullable
13181394
ThrowingBiFunction<String, Object, @Nullable Object> converter =
@@ -1633,13 +1709,28 @@ public static Object jsonValueFromMessageValue(
16331709
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
16341710
} else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
16351711
Message message = (Message) fieldValue;
1712+
String messageName = fieldDescriptor.getMessageType().getName();
16361713
if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
16371714
fieldDescriptor.getMessageType().getName())) {
16381715
Descriptor descriptor = message.getDescriptorForType();
16391716
long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
16401717
int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
16411718
Instant instant = Instant.ofEpochSecond(seconds, nanos);
16421719
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
1720+
} else if (messageName.equals("TimestampPicos")) {
1721+
Descriptor descriptor = message.getDescriptorForType();
1722+
long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
1723+
long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds"));
1724+
1725+
// Convert to ISO timestamp string with picoseconds
1726+
Instant instant = Instant.ofEpochSecond(seconds);
1727+
String baseTimestamp = instant.toString(); // "2024-01-15T10:30:45Z"
1728+
1729+
// Format picoseconds as 12-digit string
1730+
String picosPart = String.format("%012d", picoseconds);
1731+
1732+
// Insert before 'Z': "2024-01-15T10:30:45Z" → "2024-01-15T10:30:45.123456789012Z"
1733+
return baseTimestamp.replace("Z", "." + picosPart + "Z");
16431734
} else {
16441735
throw new RuntimeException(
16451736
"Not implemented yet " + fieldDescriptor.getMessageType().getFullName());

0 commit comments

Comments
 (0)