Skip to content

Commit aad5864

Browse files
claudevdmClaude
andauthored
Support picosecond tiemstamps when writing GenericRecord and Beam Rows. (#37294)
Co-authored-by: Claude <cvandermerwe@google.com>
1 parent 4dcc27d commit aad5864

File tree

4 files changed

+57
-0
lines changed

4 files changed

+57
-0
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,9 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch
348348
fieldDescriptorFromAvroField(
349349
new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
350350
builder = builder.setType(elementFieldSchema.getType());
351+
if (elementFieldSchema.hasTimestampPrecision()) {
352+
builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
353+
}
351354
builder.addAllFields(elementFieldSchema.getFieldsList());
352355
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
353356
break;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
237237
TableFieldSchema elementFieldSchema =
238238
fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
239239
builder = builder.setType(elementFieldSchema.getType());
240+
if (elementFieldSchema.hasTimestampPrecision()) {
241+
builder = builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
242+
}
240243
builder.addAllFields(elementFieldSchema.getFieldsList());
241244
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
242245
break;

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,23 @@ private static Schema createTimestampNanosSchema() {
348348
.endRecord();
349349
}
350350

351+
private static Schema createRepeatedTimestampNanosSchema() {
352+
Schema longSchema = Schema.create(Schema.Type.LONG);
353+
longSchema.addProp("logicalType", "timestamp-nanos");
354+
355+
Schema arraySchema = Schema.createArray(longSchema);
356+
357+
return SchemaBuilder.record("RepeatedTimestampNanosRecord")
358+
.fields()
359+
.name("timestampNanosArray")
360+
.type(arraySchema)
361+
.noDefault()
362+
.endRecord();
363+
}
364+
351365
private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema();
366+
private static final Schema REPEATED_TIMESTAMP_NANOS_SCHEMA =
367+
createRepeatedTimestampNanosSchema();
352368

353369
private static GenericRecord baseRecord;
354370
private static GenericRecord rawLogicalTypesRecord;
@@ -885,4 +901,22 @@ public void testProtoTableSchemaFromAvroSchemaTimestampNanos() {
885901
assertTrue(field.hasTimestampPrecision());
886902
assertEquals(12L, field.getTimestampPrecision().getValue());
887903
}
904+
905+
@Test
906+
public void testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos() {
907+
com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
908+
AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
909+
REPEATED_TIMESTAMP_NANOS_SCHEMA);
910+
911+
assertEquals(1, protoSchema.getFieldsCount());
912+
com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0);
913+
914+
assertEquals("timestampnanosarray", field.getName());
915+
assertEquals(
916+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType());
917+
assertEquals(
918+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
919+
920+
assertEquals(12L, field.getTimestampPrecision().getValue());
921+
}
888922
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ public class BeamRowToStorageApiProtoTest {
6969
Schema.builder()
7070
.addField("timestampNanos", FieldType.logicalType(Timestamp.NANOS).withNullable(true))
7171
.build();
72+
private static final Schema TIMESTAMP_NANOS_ARRAY_SCHEMA =
73+
Schema.builder()
74+
.addField("timestampNanosArray", FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
75+
.build();
7276
private static final EnumerationType TEST_ENUM =
7377
EnumerationType.create("ONE", "TWO", "RED", "BLUE");
7478
private static final Schema BASE_SCHEMA =
@@ -614,6 +618,19 @@ public void testTimestampNanosSchema() {
614618
assertEquals(12L, field.getTimestampPrecision().getValue());
615619
}
616620

621+
@Test
622+
public void testTimestampNanosArraySchema() {
623+
com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
624+
BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_ARRAY_SCHEMA);
625+
626+
assertEquals(1, protoSchema.getFieldsCount());
627+
TableFieldSchema field = protoSchema.getFields(0);
628+
assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType());
629+
assertEquals(
630+
com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
631+
assertEquals(12L, field.getTimestampPrecision().getValue());
632+
}
633+
617634
@Test
618635
public void testTimestampNanosDescriptor() throws Exception {
619636
DescriptorProto descriptor =

0 commit comments

Comments
 (0)